tests: replace pycodestyle with black
[vpp.git] / test / vpp_acl.py
1 from ipaddress import IPv4Network
2
3 from vpp_object import VppObject
4 from vpp_papi import VppEnum
5 from vpp_ip import INVALID_INDEX
6 from vpp_papi_provider import UnexpectedApiReturnValueError
7
8
9 class VppAclPlugin(VppObject):
10     def __init__(self, test, enable_intf_counters=False):
11         self._test = test
12         self.enable_intf_counters = enable_intf_counters
13
14     @property
15     def enable_intf_counters(self):
16         return self._enable_intf_counters
17
18     @enable_intf_counters.setter
19     def enable_intf_counters(self, enable):
20         self.vapi.acl_stats_intf_counters_enable(enable=enable)
21
22     def add_vpp_config(self):
23         pass
24
25     def remove_vpp_config(self):
26         pass
27
28     def query_vpp_config(self):
29         pass
30
31     def object_id(self):
32         return "acl-plugin-%d" % (self._sw_if_index)
33
34
35 class AclRule:
36     """ACL Rule"""
37
38     # port ranges
39     PORTS_ALL = -1
40     PORTS_RANGE = 0
41     PORTS_RANGE_2 = 1
42     udp_sport_from = 10
43     udp_sport_to = udp_sport_from + 5
44     udp_dport_from = 20000
45     udp_dport_to = udp_dport_from + 5000
46     tcp_sport_from = 30
47     tcp_sport_to = tcp_sport_from + 5
48     tcp_dport_from = 40000
49     tcp_dport_to = tcp_dport_from + 5000
50
51     udp_sport_from_2 = 90
52     udp_sport_to_2 = udp_sport_from_2 + 5
53     udp_dport_from_2 = 30000
54     udp_dport_to_2 = udp_dport_from_2 + 5000
55     tcp_sport_from_2 = 130
56     tcp_sport_to_2 = tcp_sport_from_2 + 5
57     tcp_dport_from_2 = 20000
58     tcp_dport_to_2 = tcp_dport_from_2 + 5000
59
60     icmp4_type = 8  # echo request
61     icmp4_code = 3
62     icmp6_type = 128  # echo request
63     icmp6_code = 3
64
65     icmp4_type_2 = 8
66     icmp4_code_from_2 = 5
67     icmp4_code_to_2 = 20
68     icmp6_type_2 = 128
69     icmp6_code_from_2 = 8
70     icmp6_code_to_2 = 42
71
72     def __init__(
73         self,
74         is_permit,
75         src_prefix=IPv4Network("0.0.0.0/0"),
76         dst_prefix=IPv4Network("0.0.0.0/0"),
77         proto=0,
78         ports=PORTS_ALL,
79         sport_from=None,
80         sport_to=None,
81         dport_from=None,
82         dport_to=None,
83     ):
84         self.is_permit = is_permit
85         self.src_prefix = src_prefix
86         self.dst_prefix = dst_prefix
87         self._proto = proto
88         self._ports = ports
89         # assign ports by range
90         self.update_ports()
91         # assign specified ports
92         if sport_from:
93             self.sport_from = sport_from
94         if sport_to:
95             self.sport_to = sport_to
96         if dport_from:
97             self.dport_from = dport_from
98         if dport_to:
99             self.dport_to = dport_to
100
101     def __copy__(self):
102         new_rule = AclRule(
103             self.is_permit,
104             self.src_prefix,
105             self.dst_prefix,
106             self._proto,
107             self._ports,
108             self.sport_from,
109             self.sport_to,
110             self.dport_from,
111             self.dport_to,
112         )
113         return new_rule
114
115     def update_ports(self):
116         if self._ports == self.PORTS_ALL:
117             self.sport_from = 0
118             self.dport_from = 0
119             self.sport_to = 65535
120             if self._proto == 1 or self._proto == 58:
121                 self.sport_to = 255
122             self.dport_to = self.sport_to
123         elif self._ports == self.PORTS_RANGE:
124             if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
125                 self.sport_from = self.icmp4_type
126                 self.sport_to = self.icmp4_type
127                 self.dport_from = self.icmp4_code
128                 self.dport_to = self.icmp4_code
129             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
130                 self.sport_from = self.icmp6_type
131                 self.sport_to = self.icmp6_type
132                 self.dport_from = self.icmp6_code
133                 self.dport_to = self.icmp6_code
134             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
135                 self.sport_from = self.tcp_sport_from
136                 self.sport_to = self.tcp_sport_to
137                 self.dport_from = self.tcp_dport_from
138                 self.dport_to = self.tcp_dport_to
139             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
140                 self.sport_from = self.udp_sport_from
141                 self.sport_to = self.udp_sport_to
142                 self.dport_from = self.udp_dport_from
143                 self.dport_to = self.udp_dport_to
144         elif self._ports == self.PORTS_RANGE_2:
145             if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
146                 self.sport_from = self.icmp4_type_2
147                 self.sport_to = self.icmp4_type_2
148                 self.dport_from = self.icmp4_code_from_2
149                 self.dport_to = self.icmp4_code_to_2
150             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
151                 self.sport_from = self.icmp6_type_2
152                 self.sport_to = self.icmp6_type_2
153                 self.dport_from = self.icmp6_code_from_2
154                 self.dport_to = self.icmp6_code_to_2
155             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
156                 self.sport_from = self.tcp_sport_from_2
157                 self.sport_to = self.tcp_sport_to_2
158                 self.dport_from = self.tcp_dport_from_2
159                 self.dport_to = self.tcp_dport_to_2
160             elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
161                 self.sport_from = self.udp_sport_from_2
162                 self.sport_to = self.udp_sport_to_2
163                 self.dport_from = self.udp_dport_from_2
164                 self.dport_to = self.udp_dport_to_2
165         else:
166             self.sport_from = self._ports
167             self.sport_to = self._ports
168             self.dport_from = self._ports
169             self.dport_to = self._ports
170
171     @property
172     def proto(self):
173         return self._proto
174
175     @proto.setter
176     def proto(self, proto):
177         self._proto = proto
178         self.update_ports()
179
180     @property
181     def ports(self):
182         return self._ports
183
184     @ports.setter
185     def ports(self, ports):
186         self._ports = ports
187         self.update_ports()
188
189     def encode(self):
190         return {
191             "is_permit": self.is_permit,
192             "proto": self.proto,
193             "srcport_or_icmptype_first": self.sport_from,
194             "srcport_or_icmptype_last": self.sport_to,
195             "src_prefix": self.src_prefix,
196             "dstport_or_icmpcode_first": self.dport_from,
197             "dstport_or_icmpcode_last": self.dport_to,
198             "dst_prefix": self.dst_prefix,
199         }
200
201
202 class VppAcl(VppObject):
203     """VPP ACL"""
204
205     def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
206         self._test = test
207         self._acl_index = acl_index
208         self.tag = tag
209         self._rules = rules
210
211     @property
212     def rules(self):
213         return self._rules
214
215     @property
216     def acl_index(self):
217         return self._acl_index
218
219     @property
220     def count(self):
221         return len(self._rules)
222
223     def encode_rules(self):
224         rules = []
225         for rule in self._rules:
226             rules.append(rule.encode())
227         return rules
228
229     def add_vpp_config(self, expect_error=False):
230         try:
231             reply = self._test.vapi.acl_add_replace(
232                 acl_index=self._acl_index,
233                 tag=self.tag,
234                 count=self.count,
235                 r=self.encode_rules(),
236             )
237             self._acl_index = reply.acl_index
238             self._test.registry.register(self, self._test.logger)
239             if expect_error:
240                 self._test.fail("Unexpected api reply")
241             return self
242         except UnexpectedApiReturnValueError:
243             if not expect_error:
244                 self._test.fail("Unexpected api reply")
245         return None
246
247     def modify_vpp_config(self, rules):
248         self._rules = rules
249         self.add_vpp_config()
250
251     def remove_vpp_config(self, expect_error=False):
252         try:
253             self._test.vapi.acl_del(acl_index=self._acl_index)
254             if expect_error:
255                 self._test.fail("Unexpected api reply")
256         except UnexpectedApiReturnValueError:
257             if not expect_error:
258                 self._test.fail("Unexpected api reply")
259
260     def dump(self):
261         return self._test.vapi.acl_dump(acl_index=self._acl_index)
262
263     def query_vpp_config(self):
264         dump = self.dump()
265         for rule in dump:
266             if rule.acl_index == self._acl_index:
267                 return True
268         return False
269
270     def object_id(self):
271         return "acl-%s-%d" % (self.tag, self._acl_index)
272
273
274 class VppEtypeWhitelist(VppObject):
275     """VPP Etype Whitelist"""
276
277     def __init__(self, test, sw_if_index, whitelist, n_input=0):
278         self._test = test
279         self.whitelist = whitelist
280         self.n_input = n_input
281         self._sw_if_index = sw_if_index
282
283     @property
284     def sw_if_index(self):
285         return self._sw_if_index
286
287     @property
288     def count(self):
289         return len(self.whitelist)
290
291     def add_vpp_config(self):
292         self._test.vapi.acl_interface_set_etype_whitelist(
293             sw_if_index=self._sw_if_index,
294             count=self.count,
295             n_input=self.n_input,
296             whitelist=self.whitelist,
297         )
298         self._test.registry.register(self, self._test.logger)
299         return self
300
301     def remove_vpp_config(self):
302         self._test.vapi.acl_interface_set_etype_whitelist(
303             sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]
304         )
305
306     def query_vpp_config(self):
307         self._test.vapi.acl_interface_etype_whitelist_dump(
308             sw_if_index=self._sw_if_index
309         )
310         return False
311
312     def object_id(self):
313         return "acl-etype_wl-%d" % (self._sw_if_index)
314
315
316 class VppAclInterface(VppObject):
317     """VPP ACL Interface"""
318
319     def __init__(self, test, sw_if_index, acls, n_input=0):
320         self._test = test
321         self._sw_if_index = sw_if_index
322         self.n_input = n_input
323         self.acls = acls
324
325     @property
326     def sw_if_index(self):
327         return self._sw_if_index
328
329     @property
330     def count(self):
331         return len(self.acls)
332
333     def encode_acls(self):
334         acls = []
335         for acl in self.acls:
336             acls.append(acl.acl_index)
337         return acls
338
339     def add_vpp_config(self, expect_error=False):
340         try:
341             reply = self._test.vapi.acl_interface_set_acl_list(
342                 sw_if_index=self._sw_if_index,
343                 n_input=self.n_input,
344                 count=self.count,
345                 acls=self.encode_acls(),
346             )
347             self._test.registry.register(self, self._test.logger)
348             if expect_error:
349                 self._test.fail("Unexpected api reply")
350             return self
351         except UnexpectedApiReturnValueError:
352             if not expect_error:
353                 self._test.fail("Unexpected api reply")
354         return None
355
356     def remove_vpp_config(self, expect_error=False):
357         try:
358             reply = self._test.vapi.acl_interface_set_acl_list(
359                 sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
360             )
361             if expect_error:
362                 self._test.fail("Unexpected api reply")
363         except UnexpectedApiReturnValueError:
364             if not expect_error:
365                 self._test.fail("Unexpected api reply")
366
367     def query_vpp_config(self):
368         dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index)
369         for acl_list in dump:
370             if acl_list.count > 0:
371                 return True
372         return False
373
374     def object_id(self):
375         return "acl-if-list-%d" % (self._sw_if_index)
376
377
378 class MacipRule:
379     """Mac Ip rule"""
380
381     def __init__(
382         self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
383     ):
384         self.is_permit = is_permit
385         self.src_mac = src_mac
386         self.src_mac_mask = src_mac_mask
387         self.src_prefix = src_prefix
388
389     def encode(self):
390         return {
391             "is_permit": self.is_permit,
392             "src_mac": self.src_mac,
393             "src_mac_mask": self.src_mac_mask,
394             "src_prefix": self.src_prefix,
395         }
396
397
398 class VppMacipAcl(VppObject):
399     """Vpp Mac Ip ACL"""
400
401     def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
402         self._test = test
403         self._acl_index = acl_index
404         self.tag = tag
405         self._rules = rules
406
407     @property
408     def acl_index(self):
409         return self._acl_index
410
411     @property
412     def rules(self):
413         return self._rules
414
415     @property
416     def count(self):
417         return len(self._rules)
418
419     def encode_rules(self):
420         rules = []
421         for rule in self._rules:
422             rules.append(rule.encode())
423         return rules
424
425     def add_vpp_config(self, expect_error=False):
426         try:
427             reply = self._test.vapi.macip_acl_add_replace(
428                 acl_index=self._acl_index,
429                 tag=self.tag,
430                 count=self.count,
431                 r=self.encode_rules(),
432             )
433             self._acl_index = reply.acl_index
434             self._test.registry.register(self, self._test.logger)
435             if expect_error:
436                 self._test.fail("Unexpected api reply")
437             return self
438         except UnexpectedApiReturnValueError:
439             if not expect_error:
440                 self._test.fail("Unexpected api reply")
441         return None
442
443     def modify_vpp_config(self, rules):
444         self._rules = rules
445         self.add_vpp_config()
446
447     def remove_vpp_config(self, expect_error=False):
448         try:
449             self._test.vapi.macip_acl_del(acl_index=self._acl_index)
450             if expect_error:
451                 self._test.fail("Unexpected api reply")
452         except UnexpectedApiReturnValueError:
453             if not expect_error:
454                 self._test.fail("Unexpected api reply")
455
456     def dump(self):
457         return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
458
459     def query_vpp_config(self):
460         dump = self.dump()
461         for rule in dump:
462             if rule.acl_index == self._acl_index:
463                 return True
464         return False
465
466     def object_id(self):
467         return "macip-acl-%s-%d" % (self.tag, self._acl_index)
468
469
470 class VppMacipAclInterface(VppObject):
471     """VPP Mac Ip ACL Interface"""
472
473     def __init__(self, test, sw_if_index, acls):
474         self._test = test
475         self._sw_if_index = sw_if_index
476         self.acls = acls
477
478     @property
479     def sw_if_index(self):
480         return self._sw_if_index
481
482     @property
483     def count(self):
484         return len(self.acls)
485
486     def add_vpp_config(self):
487         for acl in self.acls:
488             self._test.vapi.macip_acl_interface_add_del(
489                 is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
490             )
491         self._test.registry.register(self, self._test.logger)
492
493     def remove_vpp_config(self):
494         for acl in self.acls:
495             self._test.vapi.macip_acl_interface_add_del(
496                 is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
497             )
498
499     def dump(self):
500         return self._test.vapi.macip_acl_interface_list_dump(
501             sw_if_index=self._sw_if_index
502         )
503
504     def query_vpp_config(self):
505         dump = self.dump()
506         for acl_list in dump:
507             for acl_index in acl_list.acls:
508                 if acl_index != INVALID_INDEX:
509                     return True
510         return False
511
512     def object_id(self):
513         return "macip-acl-if-list-%d" % (self._sw_if_index)