acl: API cleanup
[vpp.git] / src / plugins / acl / test / vpp_acl.py
1 from ipaddress import IPv4Network
2
3 from vpp_object import VppObject
4 from vpp_papi import VppEnum
5 from vpp_ip import INVALID_INDEX
6 from vpp_papi_provider import UnexpectedApiReturnValueError
7
8
class VppAclPlugin(VppObject):
    """ VPP ACL plugin-wide settings

    Only wraps the per-interface statistics counters switch; there is no
    per-object configuration to add, remove or query.
    """

    def __init__(self, test, enable_intf_counters=False):
        """
        :param test: test case object; its ``vapi`` attribute is used to
            talk to VPP.
        :param enable_intf_counters: initial state of the interface counters;
            it is pushed to VPP immediately through the property setter.
        """
        self._test = test
        self.enable_intf_counters = enable_intf_counters

    @property
    def enable_intf_counters(self):
        """ Last value sent to VPP with acl_stats_intf_counters_enable. """
        return self._enable_intf_counters

    @enable_intf_counters.setter
    def enable_intf_counters(self, enable):
        # The API handle lives on the test case (as in every other class of
        # this module), not on the object itself.
        self._test.vapi.acl_stats_intf_counters_enable(enable=enable)
        # Remember the value only after the API call went through, so the
        # getter reflects what VPP was actually told.
        self._enable_intf_counters = enable

    def add_vpp_config(self):
        pass

    def remove_vpp_config(self):
        pass

    def query_vpp_config(self):
        pass

    def object_id(self):
        # The plugin object is not bound to any interface, so there is no
        # index to embed in the identifier.
        return "acl-plugin"
34
35
class AclRule():
    """ ACL Rule

    Holds one rule (action, prefixes, protocol, port ranges) and encodes it
    into the dictionary layout used when the rule is sent to VPP.

    ``ports`` selects how the port ranges are derived:

    * ``PORTS_ALL`` (-1): the full range, 0..65535, or 0..255 when the
      protocol number is 1 or 58 (ICMP / ICMPv6 per IANA).
    * ``PORTS_RANGE`` (0) / ``PORTS_RANGE_2`` (1): one of the two predefined
      per-protocol ranges declared below.
    * any other value: used verbatim as a single source and destination port.

    For ICMP and ICMPv6 the "source port" fields carry the ICMP type and the
    "destination port" fields carry the ICMP code.
    """

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
                 dst_prefix=IPv4Network('0.0.0.0/0'),
                 proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
                 dport_from=None, dport_to=None):
        """
        :param is_permit: rule action, passed through unchanged to encode().
        :param src_prefix: source prefix to match.
        :param dst_prefix: destination prefix to match.
        :param proto: IP protocol number.
        :param ports: PORTS_ALL, PORTS_RANGE, PORTS_RANGE_2 or a single port.
        :param sport_from: explicit override of the first source port /
            ICMP type; None keeps the value derived from ``ports``.
        :param sport_to: explicit override of the last source port /
            ICMP type; None keeps the derived value.
        :param dport_from: explicit override of the first destination port /
            ICMP code; None keeps the derived value.
        :param dport_to: explicit override of the last destination port /
            ICMP code; None keeps the derived value.
        """
        self.is_permit = is_permit
        self.src_prefix = src_prefix
        self.dst_prefix = dst_prefix
        self._proto = proto
        self._ports = ports
        # assign ports by range
        self.update_ports()
        # assign specified ports
        # Compare against None rather than relying on truthiness: 0 is a
        # legitimate port / ICMP type / ICMP code and must not be dropped.
        if sport_from is not None:
            self.sport_from = sport_from
        if sport_to is not None:
            self.sport_to = sport_to
        if dport_from is not None:
            self.dport_from = dport_from
        if dport_to is not None:
            self.dport_to = dport_to

    def __copy__(self):
        """ Return a new rule carrying the same settings and port ranges. """
        new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
                           self._proto, self._ports, self.sport_from,
                           self.sport_to, self.dport_from, self.dport_to)
        return new_rule

    def update_ports(self):
        """ Recompute the four port bounds from ``ports`` and ``proto``.

        With PORTS_RANGE / PORTS_RANGE_2 and a protocol other than ICMP,
        ICMPv6, TCP or UDP no branch matches and the bounds are left as
        they were.
        """
        if self._ports == self.PORTS_ALL:
            self.sport_from = 0
            self.dport_from = 0
            self.sport_to = 65535
            # Protocols 1 and 58 get the narrower 0..255 upper bound.
            if self._proto in (1, 58):
                self.sport_to = 255
            self.dport_to = self.sport_to
        elif self._ports == self.PORTS_RANGE:
            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
                self.sport_from = self.icmp4_type
                self.sport_to = self.icmp4_type
                self.dport_from = self.icmp4_code
                self.dport_to = self.icmp4_code
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
                self.sport_from = self.icmp6_type
                self.sport_to = self.icmp6_type
                self.dport_from = self.icmp6_code
                self.dport_to = self.icmp6_code
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
                self.sport_from = self.tcp_sport_from
                self.sport_to = self.tcp_sport_to
                self.dport_from = self.tcp_dport_from
                self.dport_to = self.tcp_dport_to
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
                self.sport_from = self.udp_sport_from
                self.sport_to = self.udp_sport_to
                self.dport_from = self.udp_dport_from
                self.dport_to = self.udp_dport_to
        elif self._ports == self.PORTS_RANGE_2:
            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
                self.sport_from = self.icmp4_type_2
                self.sport_to = self.icmp4_type_2
                self.dport_from = self.icmp4_code_from_2
                self.dport_to = self.icmp4_code_to_2
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
                self.sport_from = self.icmp6_type_2
                self.sport_to = self.icmp6_type_2
                self.dport_from = self.icmp6_code_from_2
                self.dport_to = self.icmp6_code_to_2
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
                self.sport_from = self.tcp_sport_from_2
                self.sport_to = self.tcp_sport_to_2
                self.dport_from = self.tcp_dport_from_2
                self.dport_to = self.tcp_dport_to_2
            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
                self.sport_from = self.udp_sport_from_2
                self.sport_to = self.udp_sport_to_2
                self.dport_from = self.udp_dport_from_2
                self.dport_to = self.udp_dport_to_2
        else:
            # Any other value is a literal port used for all four bounds.
            self.sport_from = self._ports
            self.sport_to = self._ports
            self.dport_from = self._ports
            self.dport_to = self._ports

    @property
    def proto(self):
        """ IP protocol number of the rule. """
        return self._proto

    @proto.setter
    def proto(self, proto):
        # Changing the protocol re-derives the port bounds, discarding any
        # bounds that update_ports() assigns for the new combination.
        self._proto = proto
        self.update_ports()

    @property
    def ports(self):
        """ Port selector the bounds were derived from. """
        return self._ports

    @ports.setter
    def ports(self, ports):
        # Same as for proto: the bounds follow the selector.
        self._ports = ports
        self.update_ports()

    def encode(self):
        """ Return the rule as a dict keyed by the API field names. """
        return {'is_permit': self.is_permit, 'proto': self.proto,
                'srcport_or_icmptype_first': self.sport_from,
                'srcport_or_icmptype_last': self.sport_to,
                'src_prefix': self.src_prefix,
                'dstport_or_icmpcode_first': self.dport_from,
                'dstport_or_icmpcode_last': self.dport_to,
                'dst_prefix': self.dst_prefix}
182
183
class VppAcl(VppObject):
    """ VPP ACL: a list of rules pushed to VPP under one ACL index """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def rules(self):
        """ Rules currently held by this object. """
        return self._rules

    @property
    def acl_index(self):
        """ ACL index; INVALID_INDEX until a reply from VPP assigns one. """
        return self._acl_index

    @property
    def count(self):
        """ Number of rules. """
        return len(self._rules)

    def encode_rules(self):
        """ Return the encode() result of every rule, in order. """
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """ Send the rules with acl_add_replace.

        :param expect_error: when True the API call is expected to be
            rejected; the test is failed if the outcome is the opposite of
            what was expected.
        :returns: self when the call succeeded, None when it was rejected.
        """
        try:
            response = self._test.vapi.acl_add_replace(
                acl_index=self._acl_index, tag=self.tag, count=self.count,
                r=self.encode_rules())
            # Adopt the index VPP answered with before registering.
            self._acl_index = response.acl_index
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            self._test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """ Swap in a new rule list and send it to VPP. """
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """ Delete the ACL with acl_del.

        :param expect_error: when True the deletion is expected to be
            rejected; a mismatch with the actual outcome fails the test.
        """
        try:
            self._test.vapi.acl_del(acl_index=self._acl_index)
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            self._test.fail("Unexpected api reply")

    def dump(self):
        """ Return the acl_dump reply for this ACL index. """
        return self._test.vapi.acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """ True when the dump contains an entry with this ACL index. """
        wanted = self._acl_index
        return any(entry.acl_index == wanted for entry in self.dump())

    def object_id(self):
        return "acl-%s-%d" % (self.tag, self._acl_index)
251
252
class VppEtypeWhitelist(VppObject):
    """ VPP Etype Whitelist

    Ethertype whitelist attached to one interface.
    """

    def __init__(self, test, sw_if_index, whitelist, n_input=0):
        """
        :param test: test case object providing ``vapi``, ``registry`` and
            ``logger``.
        :param sw_if_index: interface the whitelist is applied to.
        :param whitelist: sequence of ethertypes sent to VPP.
        :param n_input: forwarded as ``n_input`` to the API
            (presumably the number of leading entries that apply on input;
            confirm against the ACL plugin API definition).
        """
        self._test = test
        self.whitelist = whitelist
        self.n_input = n_input
        self._sw_if_index = sw_if_index

    @property
    def sw_if_index(self):
        """ Interface index the whitelist is bound to. """
        return self._sw_if_index

    @property
    def count(self):
        """ Number of ethertypes in the whitelist. """
        return len(self.whitelist)

    def add_vpp_config(self):
        """ Apply the whitelist to the interface and register the object.

        :returns: self
        """
        self._test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index, count=self.count,
            n_input=self.n_input, whitelist=self.whitelist)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """ Clear the whitelist by sending an empty one. """
        self._test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])

    def query_vpp_config(self):
        """ Report whether the interface still has whitelist entries.

        The dump result used to be thrown away and False returned
        unconditionally, so the object always looked unconfigured.  It is
        now inspected the same way VppAclInterface inspects its list dump:
        any entry with a non-zero count means the whitelist is present.
        """
        dump = self._test.vapi.acl_interface_etype_whitelist_dump(
            sw_if_index=self._sw_if_index)
        return any(entry.count > 0 for entry in dump)

    def object_id(self):
        return ("acl-etype_wl-%d" % (self._sw_if_index))
288
289
class VppAclInterface(VppObject):
    """ VPP ACL Interface: the list of ACLs applied to one interface """

    def __init__(self, test, sw_if_index, acls, n_input=0):
        self._test = test
        self.acls = acls
        self.n_input = n_input
        self._sw_if_index = sw_if_index

    @property
    def sw_if_index(self):
        """ Interface index the ACL list is bound to. """
        return self._sw_if_index

    @property
    def count(self):
        """ Number of ACLs in the list. """
        return len(self.acls)

    def encode_acls(self):
        """ Return the acl_index of every ACL, in order. """
        return [item.acl_index for item in self.acls]

    def add_vpp_config(self, expect_error=False):
        """ Send the ACL list with acl_interface_set_acl_list.

        :param expect_error: when True the API call is expected to be
            rejected; a mismatch with the actual outcome fails the test.
        :returns: self when the call succeeded, None when it was rejected.
        """
        try:
            self._test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index, n_input=self.n_input,
                count=self.count, acls=self.encode_acls())
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            self._test.fail("Unexpected api reply")
        return None

    def remove_vpp_config(self, expect_error=False):
        """ Detach all ACLs by sending an empty list.

        :param expect_error: when True the API call is expected to be
            rejected; a mismatch with the actual outcome fails the test.
        """
        try:
            self._test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            self._test.fail("Unexpected api reply")

    def query_vpp_config(self):
        """ True when the interface list dump has an entry with count > 0. """
        listing = self._test.vapi.acl_interface_list_dump(
            sw_if_index=self._sw_if_index)
        return any(entry.count > 0 for entry in listing)

    def object_id(self):
        return "acl-if-list-%d" % (self._sw_if_index)
347
348
class MacipRule():
    """ MACIP rule: action plus source MAC, MAC mask and source prefix """

    def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
                 src_prefix=IPv4Network('0.0.0.0/0')):
        self.is_permit = is_permit
        self.src_prefix = src_prefix
        self.src_mac = src_mac
        self.src_mac_mask = src_mac_mask

    def encode(self):
        """ Return the rule as a dict keyed by the API field names. """
        field_names = ('is_permit', 'src_mac', 'src_mac_mask', 'src_prefix')
        return {name: getattr(self, name) for name in field_names}
363
364
class VppMacipAcl(VppObject):
    """ VPP MACIP ACL: a list of MACIP rules under one ACL index """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def acl_index(self):
        """ ACL index; INVALID_INDEX until a reply from VPP assigns one. """
        return self._acl_index

    @property
    def rules(self):
        """ Rules currently held by this object. """
        return self._rules

    @property
    def count(self):
        """ Number of rules. """
        return len(self._rules)

    def encode_rules(self):
        """ Return the encode() result of every rule, in order. """
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """ Send the rules with macip_acl_add_replace.

        :param expect_error: when True the API call is expected to be
            rejected; a mismatch with the actual outcome fails the test.
        :returns: self when the call succeeded, None when it was rejected.
        """
        try:
            response = self._test.vapi.macip_acl_add_replace(
                acl_index=self._acl_index, tag=self.tag, count=self.count,
                r=self.encode_rules())
            # Adopt the index VPP answered with before registering.
            self._acl_index = response.acl_index
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            self._test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """ Swap in a new rule list and send it to VPP. """
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """ Delete the ACL with macip_acl_del.

        :param expect_error: when True the deletion is expected to be
            rejected; a mismatch with the actual outcome fails the test.
        """
        try:
            self._test.vapi.macip_acl_del(acl_index=self._acl_index)
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            self._test.fail("Unexpected api reply")

    def dump(self):
        """ Return the macip_acl_dump reply for this ACL index. """
        return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """ True when the dump contains an entry with this ACL index. """
        wanted = self._acl_index
        return any(entry.acl_index == wanted for entry in self.dump())

    def object_id(self):
        return "macip-acl-%s-%d" % (self.tag, self._acl_index)
432
433
class VppMacipAclInterface(VppObject):
    """ VPP Mac Ip ACL Interface

    Binding of one or more MACIP ACLs to an interface.
    """

    def __init__(self, test, sw_if_index, acls):
        """
        :param test: test case object providing ``vapi``, ``registry`` and
            ``logger``.
        :param sw_if_index: interface the ACLs are applied to.
        :param acls: iterable of objects exposing ``acl_index``.
        """
        self._test = test
        self._sw_if_index = sw_if_index
        self.acls = acls

    @property
    def sw_if_index(self):
        """ Interface index the ACLs are bound to. """
        return self._sw_if_index

    @property
    def count(self):
        """ Number of ACLs handled by this object. """
        return len(self.acls)

    def add_vpp_config(self):
        """ Attach every ACL to the interface and register the object.

        :returns: self, like the add_vpp_config of the other classes in
            this module, so the call can be chained.
        """
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=True, sw_if_index=self._sw_if_index,
                acl_index=acl.acl_index)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """ Detach every ACL from the interface. """
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=False, sw_if_index=self._sw_if_index,
                acl_index=acl.acl_index)

    def dump(self):
        """ Return the macip_acl_interface_list_dump reply. """
        return self._test.vapi.macip_acl_interface_list_dump(
            sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        """ True when any dumped entry lists a valid ACL index. """
        return any(acl_index != INVALID_INDEX
                   for acl_list in self.dump()
                   for acl_index in acl_list.acls)

    def object_id(self):
        return ("macip-acl-if-list-%d" % (self._sw_if_index))