tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / vpp_qos.py
"""
  QoS

  object abstractions for representing QoS configuration in VPP
"""
6
7 from vpp_object import VppObject
8
9
class VppQosRecord(VppObject):
    """QoS recording configuration bound to one interface and one source.

    Wraps the ``qos_record_enable_disable`` / ``qos_record_dump`` API calls
    reached through ``test.vapi``.
    """

    def __init__(self, test, intf, source):
        """Remember the owning test case, the interface and the QoS source.

        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param intf: interface object exposing ``sw_if_index``
        :param source: QoS source sent to VPP as ``input_source``
        """
        self._test = test
        self.intf = intf
        self.source = source

    def add_vpp_config(self):
        """Enable recording in VPP, register with the test registry.

        :returns: this object, so calls can be chained
        """
        record = {"sw_if_index": self.intf.sw_if_index, "input_source": self.source}
        self._test.vapi.qos_record_enable_disable(enable=1, record=record)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Disable recording in VPP for this interface/source pair."""
        record = {"sw_if_index": self.intf.sw_if_index, "input_source": self.source}
        self._test.vapi.qos_record_enable_disable(enable=0, record=record)

    def query_vpp_config(self):
        """Return True if the record dump has an entry for this interface and source."""
        dumped = self._test.vapi.qos_record_dump()
        return any(
            self.intf.sw_if_index == entry.record.sw_if_index
            and self.source == entry.record.input_source
            for entry in dumped
        )

    def object_id(self):
        """Return the identifier string built from the interface and source."""
        parts = (self.intf, self.source)
        return "qos-record-%s-%d" % parts
45
46
class VppQosStore(VppObject):
    """QoS store configuration: interface, source and the value to store.

    Wraps the ``qos_store_enable_disable`` / ``qos_store_dump`` API calls
    reached through ``test.vapi``.
    """

    def __init__(self, test, intf, source, value):
        """Remember the owning test case and the store parameters.

        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param intf: interface object exposing ``sw_if_index``
        :param source: QoS source sent to VPP as ``input_source``
        :param value: value sent to VPP as ``value``
        """
        self._test = test
        self.intf = intf
        self.source = source
        self.value = value

    def add_vpp_config(self):
        """Enable the store in VPP, register with the test registry.

        :returns: this object, so calls can be chained
        """
        store = {
            "sw_if_index": self.intf.sw_if_index,
            "input_source": self.source,
            "value": self.value,
        }
        self._test.vapi.qos_store_enable_disable(enable=1, store=store)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Disable the store in VPP; the value is not sent on disable."""
        store = {"sw_if_index": self.intf.sw_if_index, "input_source": self.source}
        self._test.vapi.qos_store_enable_disable(enable=0, store=store)

    def query_vpp_config(self):
        """Return True if the store dump has an entry with this interface, source and value."""
        dumped = self._test.vapi.qos_store_dump()
        return any(
            self.intf.sw_if_index == entry.store.sw_if_index
            and self.source == entry.store.input_source
            and self.value == entry.store.value
            for entry in dumped
        )

    def object_id(self):
        """Return the identifier string built from the interface and source."""
        parts = (self.intf, self.source)
        return "qos-store-%s-%d" % parts
88
89
class VppQosEgressMap(VppObject):
    """QoS egress map configuration, identified by a map id.

    Wraps the ``qos_egress_map_update`` / ``qos_egress_map_delete`` /
    ``qos_egress_map_dump`` API calls reached through ``test.vapi``.
    """

    def __init__(self, test, id, rows):
        """Remember the owning test case, the map id and its rows.

        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param id: map identifier sent to VPP as ``id``
        :param rows: map rows, passed through to VPP unchanged as ``rows``
        """
        self._test = test
        self.id = id
        self.rows = rows

    def add_vpp_config(self):
        """Create/update the map in VPP, register with the test registry.

        :returns: this object, so calls can be chained
        """
        egress_map = {"id": self.id, "rows": self.rows}
        self._test.vapi.qos_egress_map_update(map=egress_map)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the map from VPP by its id."""
        self._test.vapi.qos_egress_map_delete(id=self.id)

    def query_vpp_config(self):
        """Return True if the egress map dump has an entry with this id."""
        dumped = self._test.vapi.qos_egress_map_dump()
        return any(self.id == entry.map.id for entry in dumped)

    def object_id(self):
        """Return the identifier string built from the map id."""
        return "qos-map-%d" % self.id
116
117
class VppQosMark(VppObject):
    """QoS marking configuration: interface, egress map and output source.

    Wraps the ``qos_mark_enable_disable`` / ``qos_mark_dump`` API calls
    reached through ``test.vapi``.
    """

    def __init__(self, test, intf, map, source):
        """Remember the owning test case and the marking parameters.

        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param intf: interface object exposing ``sw_if_index``
        :param map: egress map object exposing ``id``
        :param source: QoS source sent to VPP as ``output_source``
        """
        self._test = test
        self.intf = intf
        self.source = source
        self.map = map

    def add_vpp_config(self):
        """Enable marking in VPP, register with the test registry.

        :returns: this object, so calls can be chained
        """
        mark = {
            "sw_if_index": self.intf.sw_if_index,
            "map_id": self.map.id,
            "output_source": self.source,
        }
        self._test.vapi.qos_mark_enable_disable(enable=1, mark=mark)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Disable marking in VPP; the map id is not sent on disable."""
        mark = {"sw_if_index": self.intf.sw_if_index, "output_source": self.source}
        self._test.vapi.qos_mark_enable_disable(enable=0, mark=mark)

    def query_vpp_config(self):
        """Return True if the mark dump has an entry with this interface, source and map id."""
        dumped = self._test.vapi.qos_mark_dump()
        return any(
            self.intf.sw_if_index == entry.mark.sw_if_index
            and self.source == entry.mark.output_source
            and self.map.id == entry.mark.map_id
            for entry in dumped
        )

    def object_id(self):
        """Return the identifier string built from the interface and source."""
        parts = (self.intf, self.source)
        return "qos-mark-%s-%d" % parts