1 from ipaddress import IPv4Network
3 from vpp_object import VppObject
4 from vpp_papi import VppEnum
5 from vpp_ip import INVALID_INDEX
6 from vpp_papi_provider import UnexpectedApiReturnValueError
# Test-framework object for the ACL plugin's per-interface counter switch.
# NOTE(review): this listing skips source lines (the embedded numbers are not
# contiguous), so decorators and parts of method bodies are not visible here.
9 class VppAclPlugin(VppObject):
# Takes the test case and the initial state of the interface counters.
11 def __init__(self, test, enable_intf_counters=False):
# enable_intf_counters has a setter (see the @...setter decorator below), so
# this assignment already issues the VPP API call during construction.
13 self.enable_intf_counters = enable_intf_counters
# Getter for the counter flag.
# NOTE(review): no visible line assigns self._enable_intf_counters (the setter
# below only calls the API) - confirm it is set somewhere, otherwise this read
# raises AttributeError.
16 def enable_intf_counters(self):
17 return self._enable_intf_counters
19 @enable_intf_counters.setter
20 def enable_intf_counters(self, enable):
# NOTE(review): this uses self.vapi, while every other class in this file goes
# through self._test.vapi - confirm VppObject actually provides `vapi`.
21 self.vapi.acl_stats_intf_counters_enable(enable=enable)
# The bodies of the three methods below are not shown in this listing.
23 def add_vpp_config(self):
26 def remove_vpp_config(self):
29 def query_vpp_config(self):
# Return of a method whose `def` line is not shown.
# NOTE(review): _sw_if_index is not assigned on any visible line of this
# class - confirm before relying on this identifier string.
33 return ("acl-plugin-%d" % (self._sw_if_index))
# Port-range and ICMP constants that update_ports() reads through `self`.
# Only part of the table is visible in this listing (for example the
# udp_sport_from / tcp_sport_from values of the first set are not shown).
# On the visible lines every source upper bound is the lower bound + 5 and
# every destination upper bound is the lower bound + 5000.
44 udp_sport_to = udp_sport_from + 5
45 udp_dport_from = 20000
46 udp_dport_to = udp_dport_from + 5000
48 tcp_sport_to = tcp_sport_from + 5
49 tcp_dport_from = 40000
50 tcp_dport_to = tcp_dport_from + 5000
# Second set (suffix _2); update_ports() selects it for PORTS_RANGE_2.
53 udp_sport_to_2 = udp_sport_from_2 + 5
54 udp_dport_from_2 = 30000
55 udp_dport_to_2 = udp_dport_from_2 + 5000
56 tcp_sport_from_2 = 130
57 tcp_sport_to_2 = tcp_sport_from_2 + 5
58 tcp_dport_from_2 = 20000
59 tcp_dport_to_2 = tcp_dport_from_2 + 5000
# ICMP type values; the matching icmp*_code values are not shown here.
61 icmp4_type = 8 # echo request
63 icmp6_type = 128 # echo request
# Rule constructor (the enclosing class line is not shown in this listing).
# Only is_permit is required. The defaults are the 0.0.0.0/0 prefix for both
# source and destination, proto=0 and ports=PORTS_ALL, so a rule built as
# AclRule(is_permit=...) carries no address, protocol or port restriction of
# its own. Tests that mean to check a narrow permit should pass the prefixes,
# proto and ports explicitly.
# NOTE(review): how VPP interprets proto=0 is not visible here - confirm.
73 def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
74 dst_prefix=IPv4Network('0.0.0.0/0'),
75 proto=0, ports=PORTS_ALL):
76 self.is_permit = is_permit
77 self.src_prefix = src_prefix
78 self.dst_prefix = dst_prefix
89 ports are assigned implicitly based on _proto and _ports values,
90 so we need to set them manually in case they were user defined
92 new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
93 self._proto, self._ports)
# The constructor call above only passes _proto and _ports; the four port
# bounds are copied one by one so that values set by hand on the original
# rule are carried over to the new rule.
94 new_rule.sport_from = self.sport_from
95 new_rule.sport_to = self.sport_to
96 new_rule.dport_from = self.dport_from
97 new_rule.dport_to = self.dport_to
# Derives sport_from/sport_to/dport_from/dport_to from self._ports and
# self._proto. For ICMP and ICMPv6 the "sport" pair carries the ICMP type and
# the "dport" pair carries the ICMP code.
# NOTE(review): some lines of this method are missing from the listing (the
# embedded numbers skip 102-103, 106 and 150).
100 def update_ports(self):
101 if self._ports == self.PORTS_ALL:
104 self.sport_to = 65535
# 1 and 58 are the IP protocol numbers of ICMP and ICMPv6; the branches
# further down name the same protocols through VppEnum.vl_api_ip_proto_t.
# The statement under this `if` is not shown in this listing.
105 if self._proto == 1 or self._proto == 58:
107 self.dport_to = self.sport_to
# First range set. This protocol chain has no final else (numbers 109-129 are
# contiguous), so for any protocol other than the four listed this branch
# assigns nothing and the bounds keep whatever values they had before.
108 elif self._ports == self.PORTS_RANGE:
109 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
110 self.sport_from = self.icmp4_type
111 self.sport_to = self.icmp4_type
112 self.dport_from = self.icmp4_code
113 self.dport_to = self.icmp4_code
114 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
115 self.sport_from = self.icmp6_type
116 self.sport_to = self.icmp6_type
117 self.dport_from = self.icmp6_code
118 self.dport_to = self.icmp6_code
119 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
120 self.sport_from = self.tcp_sport_from
121 self.sport_to = self.tcp_sport_to
122 self.dport_from = self.tcp_dport_from
123 self.dport_to = self.tcp_dport_to
124 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
125 self.sport_from = self.udp_sport_from
126 self.sport_to = self.udp_sport_to
127 self.dport_from = self.udp_dport_from
128 self.dport_to = self.udp_dport_to
# Second range set: same layout, using the *_2 constants. Here the ICMP code
# is a from/to range, while the ICMP type is still a single value.
129 elif self._ports == self.PORTS_RANGE_2:
130 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
131 self.sport_from = self.icmp4_type_2
132 self.sport_to = self.icmp4_type_2
133 self.dport_from = self.icmp4_code_from_2
134 self.dport_to = self.icmp4_code_to_2
135 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
136 self.sport_from = self.icmp6_type_2
137 self.sport_to = self.icmp6_type_2
138 self.dport_from = self.icmp6_code_from_2
139 self.dport_to = self.icmp6_code_to_2
140 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
141 self.sport_from = self.tcp_sport_from_2
142 self.sport_to = self.tcp_sport_to_2
143 self.dport_from = self.tcp_dport_from_2
144 self.dport_to = self.tcp_dport_to_2
145 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
146 self.sport_from = self.udp_sport_from_2
147 self.sport_to = self.udp_sport_to_2
148 self.dport_from = self.udp_dport_from_2
149 self.dport_to = self.udp_dport_to_2
# Fallback: all four bounds are set to the literal value of self._ports.
# NOTE(review): the line that introduces this branch (number 150) is not
# shown; presumably it is the else of the outer _ports chain - confirm.
151 self.sport_from = self._ports
152 self.sport_to = self._ports
153 self.dport_from = self._ports
154 self.dport_to = self._ports
# Takes a new protocol value; the body and any decorator are not shown in
# this listing.
# NOTE(review): the docstring in the copy code says ports are assigned
# implicitly from _proto and _ports, so presumably this method and ports()
# below re-run update_ports() - confirm against the full file.
161 def proto(self, proto):
# Takes a new ports selector; the body and any decorator are not shown.
170 def ports(self, ports):
# Builds the dict form of the rule (the enclosing `def` line is not shown).
# The key names show that one pair of fields serves two purposes: source port
# or ICMP type, and destination port or ICMP code. This matches how
# update_ports() fills sport_* and dport_*.
175 return {'is_permit': self.is_permit, 'proto': self.proto,
176 'srcport_or_icmptype_first': self.sport_from,
177 'srcport_or_icmptype_last': self.sport_to,
178 'src_prefix': self.src_prefix,
179 'dstport_or_icmpcode_first': self.dport_from,
180 'dstport_or_icmpcode_last': self.dport_to,
181 'dst_prefix': self.dst_prefix}
# Test helper for one VPP ACL: a list of rules, an optional tag and the ACL
# index used in the API calls.
# NOTE(review): this listing skips source lines, so property decorators and
# the try/if lines of the config methods are not visible.
184 class VppAcl(VppObject):
# acl_index defaults to INVALID_INDEX; add_vpp_config() replaces it with the
# index found in the API reply.
187 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
189 self._acl_index = acl_index
# Return of an accessor for the stored index (its `def` line is not shown).
195 return self._acl_index
# Number of rules (the `def` line is not shown).
199 return len(self.rules)
# Appends rule.encode() for every rule in self.rules.
201 def encode_rules(self):
203 for rule in self.rules:
204 rules.append(rule.encode())
# Sends the rules with acl_add_replace, stores reply.acl_index and registers
# this object with the test registry.
# NOTE(review): the lines that test expect_error are not shown. From the two
# fail() calls it looks like the test fails on success when an error was
# expected, and fails on UnexpectedApiReturnValueError when none was
# expected - confirm. Both paths log the same "Unexpected api reply" text,
# so the message alone does not say which case fired.
207 def add_vpp_config(self, expect_error=False):
209 reply = self._test.vapi.acl_add_replace(
210 acl_index=self._acl_index, tag=self.tag, count=self.count,
211 r=self.encode_rules())
212 self._acl_index = reply.acl_index
213 self._test.registry.register(self, self._test.logger)
215 self._test.fail("Unexpected api reply")
217 except UnexpectedApiReturnValueError:
219 self._test.fail("Unexpected api reply")
# Deletes the ACL by its stored index with acl_del; it has the same two
# fail() paths as add_vpp_config().
222 def remove_vpp_config(self, expect_error=False):
224 self._test.vapi.acl_del(acl_index=self._acl_index)
226 self._test.fail("Unexpected api reply")
227 except UnexpectedApiReturnValueError:
229 self._test.fail("Unexpected api reply")
# Returns the acl_dump reply for this index (the `def` line is not shown).
232 return self._test.vapi.acl_dump(acl_index=self._acl_index)
# Compares the acl_index of a dumped entry with this object's index; the loop
# and return lines around the comparison are not shown.
234 def query_vpp_config(self):
237 if rule.acl_index == self._acl_index:
# Identifier string built from the tag and the ACL index.
242 return ("acl-%s-%d" % (self.tag, self._acl_index))
# Test helper that sets the ethertype whitelist of one interface.
# NOTE(review): this listing skips source lines, so decorators and some
# return statements are not visible.
245 class VppEtypeWhitelist(VppObject):
246 """ VPP Etype Whitelist """
# Stores the whitelist, n_input and the interface index for later API calls.
248 def __init__(self, test, sw_if_index, whitelist, n_input=0):
250 self.whitelist = whitelist
251 self.n_input = n_input
252 self._sw_if_index = sw_if_index
255 def sw_if_index(self):
256 return self._sw_if_index
# Number of whitelist entries (the `def` line is not shown).
260 return len(self.whitelist)
# Applies the whitelist with acl_interface_set_etype_whitelist and registers
# this object with the test registry.
262 def add_vpp_config(self):
263 self._test.vapi.acl_interface_set_etype_whitelist(
264 sw_if_index=self._sw_if_index, count=self.count,
265 n_input=self.n_input, whitelist=self.whitelist)
266 self._test.registry.register(self, self._test.logger)
# Removal uses the same API call with an empty list (count=0, n_input=0),
# not a separate delete call.
# NOTE(review): whether VPP treats an empty whitelist as "no filtering" or as
# "nothing allowed" is not visible here - confirm before asserting on traffic
# after removal.
269 def remove_vpp_config(self):
270 self._test.vapi.acl_interface_set_etype_whitelist(
271 sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
# NOTE(review): the dump reply is not bound to a name by this statement, and
# the return lines are not shown - confirm that the method really inspects
# the reply and does not report a fixed value.
273 def query_vpp_config(self):
274 self._test.vapi.acl_interface_etype_whitelist_dump(
275 sw_if_index=self._sw_if_index)
# Identifier string built from the interface index.
279 return ("acl-etype_wl-%d" % (self._sw_if_index))
# Test helper that binds a list of ACL objects to one interface.
# NOTE(review): this listing skips source lines, so decorators and the try/if
# lines of the config methods are not visible.
282 class VppAclInterface(VppObject):
283 """ VPP ACL Interface """
# n_input is passed unchanged to acl_interface_set_acl_list.
# NOTE(review): presumably it is the number of leading entries of `acls`
# applied on input, with the remainder applied on output - confirm against
# the API definition.
285 def __init__(self, test, sw_if_index, acls, n_input=0):
287 self._sw_if_index = sw_if_index
288 self.n_input = n_input
292 def sw_if_index(self):
293 return self._sw_if_index
# Number of ACLs in the list (the `def` line is not shown).
297 return len(self.acls)
# Collects acl.acl_index for every ACL object in self.acls.
299 def encode_acls(self):
301 for acl in self.acls:
302 acls.append(acl.acl_index)
# Applies the list with acl_interface_set_acl_list and registers this object.
# NOTE(review): the lines that test expect_error are not shown; both fail()
# calls use the same "Unexpected api reply" text, so the message alone does
# not say whether a success or an API error was the unexpected outcome.
305 def add_vpp_config(self, expect_error=False):
307 reply = self._test.vapi.acl_interface_set_acl_list(
308 sw_if_index=self._sw_if_index, n_input=self.n_input,
309 count=self.count, acls=self.encode_acls())
310 self._test.registry.register(self, self._test.logger)
312 self._test.fail("Unexpected api reply")
314 except UnexpectedApiReturnValueError:
316 self._test.fail("Unexpected api reply")
# Removal sets an empty ACL list on the interface (n_input=0, count=0,
# acls=[]), so after it no ACL from this object is bound to the interface.
319 def remove_vpp_config(self, expect_error=False):
321 reply = self._test.vapi.acl_interface_set_acl_list(
322 sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
324 self._test.fail("Unexpected api reply")
325 except UnexpectedApiReturnValueError:
327 self._test.fail("Unexpected api reply")
# Dumps the ACL list of this interface and tests whether any entry has
# count > 0; the statements under the `if` and the final return are not
# shown.
329 def query_vpp_config(self):
330 dump = self._test.vapi.acl_interface_list_dump(
331 sw_if_index=self._sw_if_index)
332 for acl_list in dump:
333 if acl_list.count > 0:
# Identifier string built from the interface index.
338 return ("acl-if-list-%d" % (self._sw_if_index))
# MAC/IP rule constructor (the enclosing class line is not shown).
# Only is_permit is required; the defaults are src_mac=0, src_mac_mask=0 and
# the 0.0.0.0/0 source prefix.
# NOTE(review): presumably a zero mask together with a /0 prefix matches any
# source MAC and any IPv4 source - confirm, and pass explicit values in tests
# that are meant to check a narrow permit.
344 def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
345 src_prefix=IPv4Network('0.0.0.0/0')):
346 self.is_permit = is_permit
347 self.src_mac = src_mac
348 self.src_mac_mask = src_mac_mask
349 self.src_prefix = src_prefix
# Builds the dict form of the MAC/IP rule from the four stored fields (the
# enclosing `def` line is not shown).
352 return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
353 'src_mac_mask': self.src_mac_mask,
354 'src_prefix': self.src_prefix}
# Test helper for one MAC/IP ACL; it has the same structure as VppAcl but
# uses the macip_acl_* API calls.
# NOTE(review): this listing skips source lines, so decorators and the try/if
# lines of the config methods are not visible.
357 class VppMacipAcl(VppObject):
358 """ Vpp Mac Ip ACL """
# acl_index defaults to INVALID_INDEX; add_vpp_config() replaces it with the
# index found in the API reply.
360 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
362 self._acl_index = acl_index
# Return of an accessor for the stored index (its `def` line is not shown).
368 return self._acl_index
# Number of rules (the `def` line is not shown).
372 return len(self.rules)
# Appends rule.encode() for every rule in self.rules.
374 def encode_rules(self):
376 for rule in self.rules:
377 rules.append(rule.encode())
# Sends the rules with macip_acl_add_replace, stores reply.acl_index and
# registers this object with the test registry.
# NOTE(review): the lines that test expect_error are not shown; both fail()
# calls use the same "Unexpected api reply" text, so the message alone does
# not say whether a success or an API error was the unexpected outcome.
380 def add_vpp_config(self, expect_error=False):
382 reply = self._test.vapi.macip_acl_add_replace(
383 acl_index=self._acl_index, tag=self.tag, count=self.count,
384 r=self.encode_rules())
385 self._acl_index = reply.acl_index
386 self._test.registry.register(self, self._test.logger)
388 self._test.fail("Unexpected api reply")
390 except UnexpectedApiReturnValueError:
392 self._test.fail("Unexpected api reply")
# Deletes the MAC/IP ACL by its stored index with macip_acl_del.
395 def remove_vpp_config(self, expect_error=False):
397 self._test.vapi.macip_acl_del(acl_index=self._acl_index)
399 self._test.fail("Unexpected api reply")
400 except UnexpectedApiReturnValueError:
402 self._test.fail("Unexpected api reply")
# Returns the macip_acl_dump reply for this index (the `def` line is not
# shown).
405 return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
# Compares the acl_index of a dumped entry with this object's index; the loop
# and return lines around the comparison are not shown.
407 def query_vpp_config(self):
410 if rule.acl_index == self._acl_index:
# Identifier string built from the tag and the ACL index.
415 return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
# Test helper that attaches MAC/IP ACL objects to one interface.
# NOTE(review): this listing skips source lines and has lost its indentation,
# so decorators and the nesting of the statements are not visible.
418 class VppMacipAclInterface(VppObject):
419 """ VPP Mac Ip ACL Interface """
421 def __init__(self, test, sw_if_index, acls):
423 self._sw_if_index = sw_if_index
427 def sw_if_index(self):
428 return self._sw_if_index
# Number of ACLs in the list (the `def` line is not shown).
432 return len(self.acls)
# Calls macip_acl_interface_add_del with is_add=True once per ACL in
# self.acls, then registers this object with the test registry.
# Unlike VppAclInterface.add_vpp_config there is no expect_error parameter,
# and no exception handling is visible.
434 def add_vpp_config(self):
435 for acl in self.acls:
436 self._test.vapi.macip_acl_interface_add_del(
437 is_add=True, sw_if_index=self._sw_if_index,
438 acl_index=acl.acl_index)
439 self._test.registry.register(self, self._test.logger)
# Detaches each ACL with the same call and is_add=False.
441 def remove_vpp_config(self):
442 for acl in self.acls:
443 self._test.vapi.macip_acl_interface_add_del(
444 is_add=False, sw_if_index=self._sw_if_index,
445 acl_index=acl.acl_index)
# Returns the macip_acl_interface_list_dump reply for this interface (the
# `def` line is not shown).
448 return self._test.vapi.macip_acl_interface_list_dump(
449 sw_if_index=self._sw_if_index)
# Walks the `acls` field of every dumped entry and tests each index against
# INVALID_INDEX; the assignment of `dump` and the return lines are not shown.
451 def query_vpp_config(self):
453 for acl_list in dump:
454 for acl_index in acl_list.acls:
455 if acl_index != INVALID_INDEX:
# Identifier string built from the interface index.
460 return ("macip-acl-if-list-%d" % (self._sw_if_index))