+ error_string.append("{:<60}{:>10}".format(k, error_counters[k]))
+ return "%s\n" % "\n".join(error_string)
+
+ def get_counter(self, name, blocking=True):
+ """Alternative call to __getitem__"""
+ return self.__getitem__(name, blocking)
+
+ def get_err_counter(self, name, blocking=True):
+ """Alternative call to __getitem__"""
+ return self.__getitem__(name, blocking).sum()
+
+ def ls(self, patterns):
+ """Returns list of counters matching pattern"""
+ # pylint: disable=invalid-name
+ if not self.connected:
+ self.connect()
+ if not isinstance(patterns, list):
+ patterns = [patterns]
+ regex = [re.compile(i) for i in patterns]
+ if self.last_epoch != self.epoch:
+ self.refresh()
+
+ return [
+ k
+ for k, v in self.directory.items()
+ if any(re.match(pattern, k) for pattern in regex)
+ ]
+
+ def dump(self, counters, blocking=True):
+ """Given a list of counters return a dictionary of results"""
+ if not self.connected:
+ self.connect()
+ result = {}
+ for cnt in counters:
+ result[cnt] = self.__getitem__(cnt, blocking)
+ return result
+
+
+class StatsLock:
+ """Stat segment optimistic locking"""
+
+ def __init__(self, stats):
+ self.stats = stats
+ self.epoch = 0
+
+ def __enter__(self):
+ acquired = self.acquire(blocking=True)
+ assert acquired, "Lock wasn't acquired, but blocking=True"
+ return self
+
+ def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+ self.release()
+
+ def acquire(self, blocking=True, timeout=-1):
+ """Acquire the lock. Await in progress to go false. Record epoch."""
+ self.epoch = self.stats.epoch
+ if timeout > 0:
+ start = time.monotonic()
+ while self.stats.in_progress:
+ if not blocking:
+ time.sleep(0.01)
+ if timeout > 0:
+ if start + time.monotonic() > timeout:
+ return False
+ return True
+
+ def release(self):
+ """Check if data read while locked is valid"""
+ if self.stats.in_progress or self.stats.epoch != self.epoch:
+ raise IOError("Optimistic lock failed, retry")
+
+ def locked(self):
+ """Not used"""
+
+
+class StatsCombinedList(list):
+ """Column slicing for Combined counters list"""
+
+ def __getitem__(self, item):
+ """Supports partial numpy style 2d support. Slice by column [:,1]"""
+ if isinstance(item, int):
+ return list.__getitem__(self, item)
+ return CombinedList([row[item[1]] for row in self])
+
+
+class CombinedList(list):
+ """Combined Counters 2-dimensional by thread by index of packets/octets"""
+
+ def packets(self):
+ """Return column (2nd dimension). Packets for all threads"""
+ return [pair[0] for pair in self]
+
+ def octets(self):
+ """Return column (2nd dimension). Octets for all threads"""
+ return [pair[1] for pair in self]
+
+ def sum_packets(self):
+ """Return column (2nd dimension). Sum of all packets for all threads"""
+ return sum(self.packets())
+
+ def sum_octets(self):
+ """Return column (2nd dimension). Sum of all octets for all threads"""
+ return sum(self.octets())
+
+
+class StatsTuple(tuple):
+ """A Combined vector tuple (packets, octets)"""
+
+ def __init__(self, data):
+ self.dictionary = {"packets": data[0], "bytes": data[1]}
+ super().__init__()
+
+ def __repr__(self):
+ return dict.__repr__(self.dictionary)
+
+ def __getitem__(self, item):
+ if isinstance(item, int):
+ return tuple.__getitem__(self, item)
+ if item == "packets":
+ return tuple.__getitem__(self, 0)
+ return tuple.__getitem__(self, 1)
+
+
+class StatsSimpleList(list):
+ """Simple Counters 2-dimensional by thread by index of packets"""
+
+ def __getitem__(self, item):
+ """Supports partial numpy style 2d support. Slice by column [:,1]"""
+ if isinstance(item, int):
+ return list.__getitem__(self, item)
+ return SimpleList([row[item[1]] for row in self])
+
+
+class SimpleList(list):
+ """Simple counter"""
+
+ def sum(self):
+ """Sum the vector"""
+ return sum(self)
+
+
+class StatsEntry:
+ """An individual stats entry"""
+
+ # pylint: disable=unused-argument,no-self-use
+
+ def __init__(self, stattype, statvalue):
+ self.type = stattype
+ self.value = statvalue
+
+ if stattype == 1:
+ self.function = self.scalar
+ elif stattype == 2:
+ self.function = self.simple
+ elif stattype == 3:
+ self.function = self.combined
+ elif stattype == 4:
+ self.function = self.name
+ elif stattype == 6:
+ self.function = self.symlink
+ else:
+ self.function = self.illegal
+
+ def illegal(self, stats):
+ """Invalid or unknown counter type"""
+ return None
+
+ def scalar(self, stats):
+ """Scalar counter"""
+ return self.value
+
+ def simple(self, stats):
+ """Simple counter"""
+ counter = StatsSimpleList()
+ for threads in StatsVector(stats, self.value, "P"):
+ clist = [v[0] for v in StatsVector(stats, threads[0], "Q")]
+ counter.append(clist)
+ return counter
+
+ def combined(self, stats):
+ """Combined counter"""
+ counter = StatsCombinedList()
+ for threads in StatsVector(stats, self.value, "P"):
+ clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], "QQ")]
+ counter.append(clist)
+ return counter
+
+ def name(self, stats):
+ """Name counter"""
+ counter = []
+ for name in StatsVector(stats, self.value, "P"):
+ if name[0]:
+ counter.append(get_string(stats, name[0]))
+ return counter
+
+ SYMLINK_FMT1 = Struct("II")
+ SYMLINK_FMT2 = Struct("Q")
+
+ def symlink(self, stats):
+ """Symlink counter"""
+ b = self.SYMLINK_FMT2.pack(self.value)
+ index1, index2 = self.SYMLINK_FMT1.unpack(b)
+ name = stats.directory_by_idx[index1]
+ return stats[name][:, index2]
+
+ def get_counter(self, stats):
+ """Return a list of counters"""
+ if stats:
+ return self.function(stats)
+
+
+class TestStats(unittest.TestCase):
+ """Basic statseg tests"""
+
+ def setUp(self):
+ """Connect to statseg"""
+ self.stat = VPPStats()
+ self.stat.connect()
+ self.profile = cProfile.Profile()
+ self.profile.enable()
+
+ def tearDown(self):
+ """Disconnect from statseg"""
+ self.stat.disconnect()
+ profile = Stats(self.profile)
+ profile.strip_dirs()
+ profile.sort_stats("cumtime")
+ profile.print_stats()
+ print("\n--->>>")
+
+ def test_counters(self):
+ """Test access to statseg"""
+
+ print("/err/abf-input-ip4/missed", self.stat["/err/abf-input-ip4/missed"])
+ print("/sys/heartbeat", self.stat["/sys/heartbeat"])
+ print("/if/names", self.stat["/if/names"])
+ print("/if/rx-miss", self.stat["/if/rx-miss"])
+ print("/if/rx-miss", self.stat["/if/rx-miss"][1])
+ print(
+ "/nat44-ed/out2in/slowpath/drops",
+ self.stat["/nat44-ed/out2in/slowpath/drops"],
+ )
+ with self.assertRaises(KeyError):
+ print("NO SUCH COUNTER", self.stat["foobar"])
+ print("/if/rx", self.stat.get_counter("/if/rx"))
+ print(
+ "/err/ethernet-input/no_error",
+ self.stat.get_counter("/err/ethernet-input/no_error"),
+ )
+
+ def test_column(self):
+ """Test column slicing"""
+
+ print("/if/rx-miss", self.stat["/if/rx-miss"])
+ print("/if/rx", self.stat["/if/rx"]) # All interfaces for thread #1
+ print(
+ "/if/rx thread #1", self.stat["/if/rx"][0]
+ ) # All interfaces for thread #1
+ print(
+ "/if/rx thread #1, interface #1", self.stat["/if/rx"][0][1]
+ ) # All interfaces for thread #1
+ print("/if/rx if_index #1", self.stat["/if/rx"][:, 1])
+ print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].packets())
+ print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].sum_packets())
+ print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].octets())
+ print("/if/rx-miss", self.stat["/if/rx-miss"])
+ print("/if/rx-miss if_index #1 packets", self.stat["/if/rx-miss"][:, 1].sum())
+ print("/if/rx if_index #1 packets", self.stat["/if/rx"][0][1]["packets"])
+
+ def test_nat44(self):
+ """Test the nat counters"""
+
+ print("/nat44-ei/ha/del-event-recv", self.stat["/nat44-ei/ha/del-event-recv"])
+ print(
+ "/err/nat44-ei-ha/pkts-processed",
+ self.stat["/err/nat44-ei-ha/pkts-processed"].sum(),
+ )
+
+ def test_legacy(self):
+ """Legacy interface"""
+ directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
+ data = self.stat.dump(directory)
+ print(data)
+ print("Looking up sys node")
+ directory = self.stat.ls(["^/sys/node"])
+ print("Dumping sys node")
+ data = self.stat.dump(directory)
+ print(data)
+ directory = self.stat.ls(["^/foobar"])
+ data = self.stat.dump(directory)
+ print(data)
+
+ def test_sys_nodes(self):
+ """Test /sys/nodes"""
+ counters = self.stat.ls("^/sys/node")
+ print("COUNTERS:", counters)
+ print("/sys/node", self.stat.dump(counters))
+ print("/net/route/to", self.stat["/net/route/to"])
+
+ def test_symlink(self):
+ """Symbolic links"""
+ print("/interface/local0/rx", self.stat["/interfaces/local0/rx"])
+ print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"])
+
+
+if __name__ == "__main__":
+ import cProfile
+ from pstats import Stats
+
+ unittest.main()