tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / vpp_policer.py
1 from vpp_object import VppObject
2 from vpp_ip import INVALID_INDEX
3 from enum import Enum
4
5
class Dir(Enum):
    """Direction selector used when applying a policer to an interface."""

    # VppPolicer.apply_vpp_config calls policer_input_v2 for this member.
    RX = 0
    # Any value other than RX makes apply_vpp_config call policer_output_v2.
    TX = 1
9
10
class PolicerAction:
    """sse2 qos action: a ``type`` value paired with a ``dscp`` value."""

    def __init__(self, type, dscp):
        # Both values are stored unmodified and handed back by encode().
        self.type, self.dscp = type, dscp

    def encode(self):
        """Return the action as a dict with "type" and "dscp" keys."""
        return dict(type=self.type, dscp=self.dscp)
20
21
class VppPolicer(VppObject):
    """Test-side handle for one policer configured through the VPP API.

    The object stores the policer parameters, pushes them to VPP with
    add_vpp_config() / update(), and remembers the index VPP returned so
    that later bind / apply / dump / statistics calls address that policer.
    """

    def __init__(
        self,
        test,
        name,
        cir,
        eir,
        commited_burst,
        excess_burst,
        rate_type=0,
        round_type=0,
        type=0,
        color_aware=False,
        conform_action=None,
        exceed_action=None,
        violate_action=None,
    ):
        """Record the policer parameters; nothing is sent to VPP here.

        Args:
            test: Test case object; its ``vapi``, ``registry``, ``logger``
                and ``statistics`` attributes are used by the other methods.
            name: Policer name, sent on creation and used to match dump
                entries.
            cir: Sent to VPP as ``infos["cir"]``.
            eir: Sent to VPP as ``infos["eir"]``.
            commited_burst: Sent to VPP as ``infos["cb"]``.
            excess_burst: Sent to VPP as ``infos["eb"]``.
            rate_type: Sent to VPP as ``infos["rate_type"]``.
            round_type: Sent to VPP as ``infos["round_type"]``.
            type: Sent to VPP as ``infos["type"]``.
            color_aware: Sent to VPP as ``infos["color_aware"]``.
            conform_action: PolicerAction for conforming traffic; defaults
                to a new ``PolicerAction(1, 0)``.
            exceed_action: PolicerAction for exceeding traffic; defaults to
                a new ``PolicerAction(0, 0)``.
            violate_action: PolicerAction for violating traffic; defaults to
                a new ``PolicerAction(0, 0)``.
        """
        self._test = test
        self.name = name
        self.cir = cir
        self.eir = eir
        self.commited_burst = commited_burst
        self.excess_burst = excess_burst
        self.rate_type = rate_type
        self.round_type = round_type
        self.type = type
        self.color_aware = color_aware
        # Default actions are built per instance. Instances placed directly
        # in the signature would be created once and shared by every policer,
        # so editing one policer's action would silently change the others.
        self.conform_action = (
            PolicerAction(1, 0) if conform_action is None else conform_action
        )
        self.exceed_action = (
            PolicerAction(0, 0) if exceed_action is None else exceed_action
        )
        self.violate_action = (
            PolicerAction(0, 0) if violate_action is None else violate_action
        )
        # Stays INVALID_INDEX until add_vpp_config() stores the real index.
        self._policer_index = INVALID_INDEX

    @property
    def policer_index(self):
        """Index returned by VPP on creation, or INVALID_INDEX if not added."""
        return self._policer_index

    @property
    def config(self):
        """Current parameters as the ``infos`` dict passed to the policer API."""
        return {
            "cir": self.cir,
            "eir": self.eir,
            "cb": self.commited_burst,
            "eb": self.excess_burst,
            "rate_type": self.rate_type,
            "round_type": self.round_type,
            "type": self.type,
            "color_aware": self.color_aware,
            "conform_action": self.conform_action.encode(),
            "exceed_action": self.exceed_action.encode(),
            "violate_action": self.violate_action.encode(),
        }

    def add_vpp_config(self):
        """Create the policer in VPP, register it with the test, return self."""
        reply = self._test.vapi.policer_add(name=self.name, infos=self.config)
        self._test.registry.register(self, self._test.logger)
        self._policer_index = reply.policer_index
        return self

    def update(self):
        """Send the current ``config`` to VPP for the already-created policer."""
        self._test.vapi.policer_update(
            policer_index=self._policer_index, infos=self.config
        )

    def remove_vpp_config(self):
        """Delete the policer from VPP and reset the stored index."""
        self._test.vapi.policer_del(policer_index=self._policer_index)
        self._policer_index = INVALID_INDEX

    def bind_vpp_config(self, worker, bind):
        """Call policer_bind_v2 with ``worker`` as worker_index and ``bind`` as bind_enable."""
        self._test.vapi.policer_bind_v2(
            policer_index=self._policer_index, worker_index=worker, bind_enable=bind
        )

    def apply_vpp_config(self, if_index, dir: Dir, apply):
        """Apply or remove the policer on an interface in one direction.

        Args:
            if_index: Passed to VPP as ``sw_if_index``.
            dir: ``Dir.RX`` uses policer_input_v2; anything else uses
                policer_output_v2.
            apply: Passed through to the API as ``apply``.
        """
        vapi = self._test.vapi
        api_call = vapi.policer_input_v2 if dir == Dir.RX else vapi.policer_output_v2
        api_call(policer_index=self._policer_index, sw_if_index=if_index, apply=apply)

    def _find_in_dump(self):
        """Return the policer_dump_v2 entry whose name equals ours, or None."""
        dump = self._test.vapi.policer_dump_v2(policer_index=self._policer_index)
        return next((entry for entry in dump if entry.name == self.name), None)

    def query_vpp_config(self):
        """Return True if a dump entry with this policer's name exists."""
        return self._find_in_dump() is not None

    def object_id(self):
        """Return the identifier string ``policer-<name>``."""
        return f"policer-{self.name}"

    def get_details(self):
        """Return the dump entry for this policer.

        Raises:
            VPPValueError: (taken from ``self._test.vapi``) when no dump
                entry carries this policer's name.
        """
        details = self._find_in_dump()
        if details is None:
            raise self._test.vapi.VPPValueError("Missing policer")
        return details

    def get_stats(self, worker=None):
        """Sum this policer's conform/exceed/violate counters.

        Args:
            worker: When None, every row of each counter is summed. Otherwise
                only row ``worker + 1`` contributes.

        Returns:
            Dict with ``<kind>_packets`` and ``<kind>_bytes`` totals for each
            of "conform", "exceed" and "violate".
        """
        # Fetch all three counters up front, in the same order as the paths.
        counters = {
            kind: self._test.statistics.get_counter(f"/net/policer/{kind}")
            for kind in ("conform", "exceed", "violate")
        }
        # Loop-invariant: every row is indexed by this policer's index.
        stat_index = self._policer_index

        total = {}
        for kind, counter in counters.items():
            packets_key = f"{kind}_packets"
            bytes_key = f"{kind}_bytes"
            total[packets_key] = 0
            total[bytes_key] = 0
            # Index explicitly: the counter object is only known here to
            # support len() and integer indexing.
            for row_index in range(len(counter)):
                # NOTE(review): worker N maps to row N + 1, presumably because
                # row 0 belongs to the main thread — confirm.
                if worker is not None and row_index != worker + 1:
                    continue
                entry = counter[row_index][stat_index]
                total[packets_key] += entry["packets"]
                total[bytes_key] += entry["bytes"]

        return total
141
142         return total