from vpp_object import VppObject
from vpp_ip import INVALID_INDEX
+from enum import Enum
-class PolicerAction():
- """ sse2 qos action """
+class Dir(Enum):
+ RX = 0
+ TX = 1
+
+
+class PolicerAction:
+ """sse2 qos action"""
def __init__(self, type, dscp):
self.type = type
self.dscp = dscp
def encode(self):
- return {'type': self.type, 'dscp': self.dscp}
+ return {"type": self.type, "dscp": self.dscp}
class VppPolicer(VppObject):
- """ Policer """
+ """Policer"""
- def __init__(self, test, name, cir, eir, commited_burst, excess_burst,
- rate_type=0, round_type=0, type=0, color_aware=False,
- conform_action=PolicerAction(1, 0),
- exceed_action=PolicerAction(0, 0),
- violate_action=PolicerAction(0, 0)):
+ def __init__(
+ self,
+ test,
+ name,
+ cir,
+ eir,
+ commited_burst,
+ excess_burst,
+ rate_type=0,
+ round_type=0,
+ type=0,
+ color_aware=False,
+ conform_action=PolicerAction(1, 0),
+ exceed_action=PolicerAction(0, 0),
+ violate_action=PolicerAction(0, 0),
+ ):
self._test = test
self.name = name
self.cir = cir
def add_vpp_config(self):
r = self._test.vapi.policer_add_del(
- name=self.name, cir=self.cir,
- eir=self.eir, cb=self.commited_burst, eb=self.excess_burst,
- rate_type=self.rate_type, round_type=self.round_type,
- type=self.type, color_aware=self.color_aware,
+ name=self.name,
+ cir=self.cir,
+ eir=self.eir,
+ cb=self.commited_burst,
+ eb=self.excess_burst,
+ rate_type=self.rate_type,
+ round_type=self.round_type,
+ type=self.type,
+ color_aware=self.color_aware,
conform_action=self.conform_action.encode(),
exceed_action=self.exceed_action.encode(),
- violate_action=self.violate_action.encode())
+ violate_action=self.violate_action.encode(),
+ )
self._test.registry.register(self, self._test.logger)
self._policer_index = r.policer_index
return self
self._test.vapi.policer_add_del(is_add=False, name=self.name)
self._policer_index = INVALID_INDEX
+ def bind_vpp_config(self, worker, bind):
+ self._test.vapi.policer_bind(
+ name=self.name, worker_index=worker, bind_enable=bind
+ )
+
+ def apply_vpp_config(self, if_index, dir: Dir, apply):
+ if dir == Dir.RX:
+ self._test.vapi.policer_input(
+ name=self.name, sw_if_index=if_index, apply=apply
+ )
+ else:
+ self._test.vapi.policer_output(
+ name=self.name, sw_if_index=if_index, apply=apply
+ )
+
def query_vpp_config(self):
- dump = self._test.vapi.policer_dump(
- match_name_valid=True, match_name=self.name)
+ dump = self._test.vapi.policer_dump(match_name_valid=True, match_name=self.name)
for policer in dump:
if policer.name == self.name:
return True
return False
def object_id(self):
- return ("policer-%s" % (self.name))
+ return "policer-%s" % (self.name)
+
+ def get_stats(self, worker=None):
+ conform = self._test.statistics.get_counter("/net/policer/conform")
+ exceed = self._test.statistics.get_counter("/net/policer/exceed")
+ violate = self._test.statistics.get_counter("/net/policer/violate")
+
+ counters = {"conform": conform, "exceed": exceed, "violate": violate}
+
+ total = {}
+ for name, c in counters.items():
+ total[f"{name}_packets"] = 0
+ total[f"{name}_bytes"] = 0
+ for i in range(len(c)):
+ t = c[i]
+ if worker is not None and i != worker + 1:
+ continue
+ stat_index = self._policer_index
+ total[f"{name}_packets"] += t[stat_index]["packets"]
+ total[f"{name}_bytes"] += t[stat_index]["bytes"]
+
+ return total