1 from ipaddress import IPv4Network
3 from vpp_object import VppObject
4 from vpp_papi import VppEnum
5 from vpp_ip import INVALID_INDEX
6 from vpp_papi_provider import UnexpectedApiReturnValueError
# Test-side handle for ACL-plugin-wide settings; derives from VppObject.
9 class VppAclPlugin(VppObject):
# `enable_intf_counters` defaults to False.
10 def __init__(self, test, enable_intf_counters=False):
# This assignment targets the same name that has a setter defined below, so it
# is expected to be routed through that setter.
12 self.enable_intf_counters = enable_intf_counters
# Getter: hands back self._enable_intf_counters.
# NOTE(review): no visible line assigns _enable_intf_counters -- confirm it is
# set somewhere, otherwise reading this attribute raises AttributeError.
15 def enable_intf_counters(self):
16 return self._enable_intf_counters
# Setter: forwards the flag to acl_stats_intf_counters_enable.
# NOTE(review): this uses self.vapi while the other classes in this file call
# self._test.vapi -- verify that `vapi` is actually available on this object.
18 @enable_intf_counters.setter
19 def enable_intf_counters(self, enable):
20 self.vapi.acl_stats_intf_counters_enable(enable=enable)
# Methods named like those implemented by the other VppObject subclasses in
# this file; their bodies are not visible here.
22 def add_vpp_config(self):
25 def remove_vpp_config(self):
28 def query_vpp_config(self):
# Identifier string of the form "acl-plugin-<n>", formatted from
# self._sw_if_index.
# NOTE(review): nothing visible in this class sets _sw_if_index -- confirm.
32 return "acl-plugin-%d" % (self._sw_if_index)
# Port-range and ICMP-type constants; update_ports() reads these through
# `self.<name>` when it fills in sport_*/dport_* bounds.
# First set, UDP: the source upper bound is udp_sport_from + 5; destination
# ports span 20000..25000.
43 udp_sport_to = udp_sport_from + 5
44 udp_dport_from = 20000
45 udp_dport_to = udp_dport_from + 5000
# First set, TCP: the source upper bound is tcp_sport_from + 5; destination
# ports span 40000..45000.
47 tcp_sport_to = tcp_sport_from + 5
48 tcp_dport_from = 40000
49 tcp_dport_to = tcp_dport_from + 5000
# Second set ("_2" suffix), UDP: destination ports span 30000..35000.
52 udp_sport_to_2 = udp_sport_from_2 + 5
53 udp_dport_from_2 = 30000
54 udp_dport_to_2 = udp_dport_from_2 + 5000
# Second set, TCP: source ports 130..135, destination ports 20000..25000.
55 tcp_sport_from_2 = 130
56 tcp_sport_to_2 = tcp_sport_from_2 + 5
57 tcp_dport_from_2 = 20000
58 tcp_dport_to_2 = tcp_dport_from_2 + 5000
# ICMP type values used for the first set (IPv4 and IPv6 respectively).
60 icmp4_type = 8 # echo request
62 icmp6_type = 128 # echo request
# NOTE(review): these lines look like part of the rule constructor (its `def`
# line is not visible here) -- confirm against the full file.
# Both prefixes default to the IPv4 network 0.0.0.0/0.
75 src_prefix=IPv4Network("0.0.0.0/0"),
76 dst_prefix=IPv4Network("0.0.0.0/0"),
# Keep the rule action and the two prefixes on the instance.
84 self.is_permit = is_permit
85 self.src_prefix = src_prefix
86 self.dst_prefix = dst_prefix
89 # assign ports by range
91 # assign specified ports
# The four port bounds are stored unchanged from the same-named values.
93 self.sport_from = sport_from
95 self.sport_to = sport_to
97 self.dport_from = dport_from
99 self.dport_to = dport_to
# Fill in sport_from/sport_to/dport_from/dport_to based on the `_ports`
# selector and the `_proto` value. The source bounds double as ICMP type and
# the destination bounds as ICMP code when the protocol is ICMP/ICMPv6.
115 def update_ports(self):
# Selector PORTS_ALL: the source upper bound is set to 65535.
116 if self._ports == self.PORTS_ALL:
119 self.sport_to = 65535
# 1 and 58 are the IANA protocol numbers of ICMP and ICMPv6.
120 if self._proto == 1 or self._proto == 58:
# The destination upper bound is copied from the source upper bound.
122 self.dport_to = self.sport_to
# Selector PORTS_RANGE: use the first set of class-level constants.
123 elif self._ports == self.PORTS_RANGE:
# ICMP: both source bounds are the ICMP type, both destination bounds the code.
124 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
125 self.sport_from = self.icmp4_type
126 self.sport_to = self.icmp4_type
127 self.dport_from = self.icmp4_code
128 self.dport_to = self.icmp4_code
# ICMPv6: same layout using the icmp6_* constants.
129 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
130 self.sport_from = self.icmp6_type
131 self.sport_to = self.icmp6_type
132 self.dport_from = self.icmp6_code
133 self.dport_to = self.icmp6_code
# TCP: source/destination bounds come from the tcp_* constants.
134 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
135 self.sport_from = self.tcp_sport_from
136 self.sport_to = self.tcp_sport_to
137 self.dport_from = self.tcp_dport_from
138 self.dport_to = self.tcp_dport_to
# UDP: source/destination bounds come from the udp_* constants.
139 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
140 self.sport_from = self.udp_sport_from
141 self.sport_to = self.udp_sport_to
142 self.dport_from = self.udp_dport_from
143 self.dport_to = self.udp_dport_to
# Selector PORTS_RANGE_2: use the second ("_2") set of constants.
144 elif self._ports == self.PORTS_RANGE_2:
# ICMP: a single type, but the code is a from/to range in this set.
145 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
146 self.sport_from = self.icmp4_type_2
147 self.sport_to = self.icmp4_type_2
148 self.dport_from = self.icmp4_code_from_2
149 self.dport_to = self.icmp4_code_to_2
# ICMPv6: single type, code range from the icmp6_*_2 constants.
150 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
151 self.sport_from = self.icmp6_type_2
152 self.sport_to = self.icmp6_type_2
153 self.dport_from = self.icmp6_code_from_2
154 self.dport_to = self.icmp6_code_to_2
# TCP: bounds from the tcp_*_2 constants.
155 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
156 self.sport_from = self.tcp_sport_from_2
157 self.sport_to = self.tcp_sport_to_2
158 self.dport_from = self.tcp_dport_from_2
159 self.dport_to = self.tcp_dport_to_2
# UDP: bounds from the udp_*_2 constants.
160 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
161 self.sport_from = self.udp_sport_from_2
162 self.sport_to = self.udp_sport_to_2
163 self.dport_from = self.udp_dport_from_2
164 self.dport_to = self.udp_dport_to_2
# All four bounds are set to the `_ports` value itself.
# NOTE(review): presumably the fallback for a literal port number when no
# selector constant matched -- confirm the guarding branch in the full file.
166 self.sport_from = self._ports
167 self.sport_to = self._ports
168 self.dport_from = self._ports
169 self.dport_to = self._ports
# Takes a new `proto` value.
# NOTE(review): body not visible here; presumably stores it in `_proto` (the
# attribute update_ports() reads) -- confirm.
176 def proto(self, proto):
# Takes a new `ports` value.
# NOTE(review): body not visible here; presumably stores it in `_ports` (the
# selector update_ports() reads) -- confirm.
185 def ports(self, ports):
# Key/value entries describing the rule: its action, both prefixes, and the
# port bounds. The "srcport_or_icmptype_*" keys take sport_from/sport_to and
# the "dstport_or_icmpcode_*" keys take dport_from/dport_to.
# NOTE(review): presumably the dict returned by the rule's encode() method
# (VppAcl.encode_rules calls rule.encode()) -- confirm against the full file.
191 "is_permit": self.is_permit,
193 "srcport_or_icmptype_first": self.sport_from,
194 "srcport_or_icmptype_last": self.sport_to,
195 "src_prefix": self.src_prefix,
196 "dstport_or_icmpcode_first": self.dport_from,
197 "dstport_or_icmpcode_last": self.dport_to,
198 "dst_prefix": self.dst_prefix,
# VppObject subclass representing one ACL, tracked by `_acl_index`.
202 class VppAcl(VppObject):
# `acl_index` defaults to INVALID_INDEX; `tag` defaults to None.
205 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
207 self._acl_index = acl_index
# Accessor for the stored ACL index.
217 return self._acl_index
# Number of rules held in `_rules`.
221 return len(self._rules)
# Collects rule.encode() for every rule in `_rules`.
223 def encode_rules(self):
225 for rule in self._rules:
226 rules.append(rule.encode())
# Sends the ACL via acl_add_replace (passing the current index and the encoded
# rules), adopts the index from the reply, and registers this object with the
# test registry.
# NOTE(review): the control flow around the two fail("Unexpected api reply")
# calls is not visible; presumably they fire when the outcome disagrees with
# `expect_error` -- confirm.
229 def add_vpp_config(self, expect_error=False):
231 reply = self._test.vapi.acl_add_replace(
232 acl_index=self._acl_index,
235 r=self.encode_rules(),
# Keep the index reported by the reply.
237 self._acl_index = reply.acl_index
238 self._test.registry.register(self, self._test.logger)
240 self._test.fail("Unexpected api reply")
242 except UnexpectedApiReturnValueError:
244 self._test.fail("Unexpected api reply")
# Re-applies the configuration by calling add_vpp_config().
247 def modify_vpp_config(self, rules):
249 self.add_vpp_config()
# Deletes the ACL with acl_del, addressed by the stored index.
251 def remove_vpp_config(self, expect_error=False):
253 self._test.vapi.acl_del(acl_index=self._acl_index)
255 self._test.fail("Unexpected api reply")
256 except UnexpectedApiReturnValueError:
258 self._test.fail("Unexpected api reply")
# Raw acl_dump result for this ACL index.
261 return self._test.vapi.acl_dump(acl_index=self._acl_index)
# Compares the acl_index of dumped entries against this object's index.
263 def query_vpp_config(self):
266 if rule.acl_index == self._acl_index:
# Identifier string of the form "acl-<tag>-<index>".
271 return "acl-%s-%d" % (self.tag, self._acl_index)
# VppObject subclass holding an ethertype whitelist for one interface.
274 class VppEtypeWhitelist(VppObject):
275 """VPP Etype Whitelist"""
# `n_input` defaults to 0.
277 def __init__(self, test, sw_if_index, whitelist, n_input=0):
279 self.whitelist = whitelist
280 self.n_input = n_input
281 self._sw_if_index = sw_if_index
# Accessor for the stored interface index.
284 def sw_if_index(self):
285 return self._sw_if_index
# Number of entries in the whitelist.
289 return len(self.whitelist)
# Applies the whitelist with acl_interface_set_etype_whitelist, then registers
# this object with the test registry.
291 def add_vpp_config(self):
292 self._test.vapi.acl_interface_set_etype_whitelist(
293 sw_if_index=self._sw_if_index,
295 n_input=self.n_input,
296 whitelist=self.whitelist,
298 self._test.registry.register(self, self._test.logger)
# Issues the same API call with count=0, n_input=0 and an empty whitelist.
301 def remove_vpp_config(self):
302 self._test.vapi.acl_interface_set_etype_whitelist(
303 sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]
# Calls acl_interface_etype_whitelist_dump for this interface.
# NOTE(review): the dump result is not assigned on the visible line -- confirm
# that it is actually inspected before a value is returned.
306 def query_vpp_config(self):
307 self._test.vapi.acl_interface_etype_whitelist_dump(
308 sw_if_index=self._sw_if_index
# Identifier string of the form "acl-etype_wl-<sw_if_index>".
313 return "acl-etype_wl-%d" % (self._sw_if_index)
# VppObject subclass binding a list of ACLs to one interface.
316 class VppAclInterface(VppObject):
317 """VPP ACL Interface"""
# `n_input` defaults to 0.
319 def __init__(self, test, sw_if_index, acls, n_input=0):
321 self._sw_if_index = sw_if_index
322 self.n_input = n_input
# Accessor for the stored interface index.
326 def sw_if_index(self):
327 return self._sw_if_index
# Number of ACLs in `acls`.
331 return len(self.acls)
# Collects the acl_index of every ACL object in `acls`.
333 def encode_acls(self):
335 for acl in self.acls:
336 acls.append(acl.acl_index)
# Applies the list with acl_interface_set_acl_list and registers this object
# with the test registry.
# NOTE(review): the control flow around the two fail("Unexpected api reply")
# calls is not visible; presumably they fire when the outcome disagrees with
# `expect_error` -- confirm.
339 def add_vpp_config(self, expect_error=False):
341 reply = self._test.vapi.acl_interface_set_acl_list(
342 sw_if_index=self._sw_if_index,
343 n_input=self.n_input,
345 acls=self.encode_acls(),
347 self._test.registry.register(self, self._test.logger)
349 self._test.fail("Unexpected api reply")
351 except UnexpectedApiReturnValueError:
353 self._test.fail("Unexpected api reply")
# Issues the same API call with n_input=0, count=0 and an empty ACL list.
356 def remove_vpp_config(self, expect_error=False):
358 reply = self._test.vapi.acl_interface_set_acl_list(
359 sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
362 self._test.fail("Unexpected api reply")
363 except UnexpectedApiReturnValueError:
365 self._test.fail("Unexpected api reply")
# Dumps the interface's ACL list and checks each entry for a positive count.
367 def query_vpp_config(self):
368 dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index)
369 for acl_list in dump:
370 if acl_list.count > 0:
# Identifier string of the form "acl-if-list-<sw_if_index>".
375 return "acl-if-list-%d" % (self._sw_if_index)
# NOTE(review): parameter list and attribute stores that look like the MACIP
# rule constructor (its `def` line is not visible here) -- confirm.
# src_mac and src_mac_mask default to 0; src_prefix defaults to the IPv4
# network 0.0.0.0/0.
382 self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
# Each argument is stored unchanged under the same attribute name.
384 self.is_permit = is_permit
385 self.src_mac = src_mac
386 self.src_mac_mask = src_mac_mask
387 self.src_prefix = src_prefix
# Key/value entries mirroring the four stored rule attributes one-to-one.
# NOTE(review): presumably the dict returned by the rule's encode() method
# (VppMacipAcl.encode_rules calls rule.encode()) -- confirm.
391 "is_permit": self.is_permit,
392 "src_mac": self.src_mac,
393 "src_mac_mask": self.src_mac_mask,
394 "src_prefix": self.src_prefix,
# VppObject subclass representing one MACIP ACL, tracked by `_acl_index`.
398 class VppMacipAcl(VppObject):
# `acl_index` defaults to INVALID_INDEX; `tag` defaults to None.
401 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
403 self._acl_index = acl_index
# Accessor for the stored ACL index.
409 return self._acl_index
# Number of rules held in `_rules`.
417 return len(self._rules)
# Collects rule.encode() for every rule in `_rules`.
419 def encode_rules(self):
421 for rule in self._rules:
422 rules.append(rule.encode())
# Sends the ACL via macip_acl_add_replace (passing the current index and the
# encoded rules), adopts the index from the reply, and registers this object
# with the test registry.
# NOTE(review): the control flow around the two fail("Unexpected api reply")
# calls is not visible; presumably they fire when the outcome disagrees with
# `expect_error` -- confirm.
425 def add_vpp_config(self, expect_error=False):
427 reply = self._test.vapi.macip_acl_add_replace(
428 acl_index=self._acl_index,
431 r=self.encode_rules(),
# Keep the index reported by the reply.
433 self._acl_index = reply.acl_index
434 self._test.registry.register(self, self._test.logger)
436 self._test.fail("Unexpected api reply")
438 except UnexpectedApiReturnValueError:
440 self._test.fail("Unexpected api reply")
# Re-applies the configuration by calling add_vpp_config().
443 def modify_vpp_config(self, rules):
445 self.add_vpp_config()
# Deletes the ACL with macip_acl_del, addressed by the stored index.
447 def remove_vpp_config(self, expect_error=False):
449 self._test.vapi.macip_acl_del(acl_index=self._acl_index)
451 self._test.fail("Unexpected api reply")
452 except UnexpectedApiReturnValueError:
454 self._test.fail("Unexpected api reply")
# Raw macip_acl_dump result for this ACL index.
457 return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
# Compares the acl_index of dumped entries against this object's index.
459 def query_vpp_config(self):
462 if rule.acl_index == self._acl_index:
# Identifier string of the form "macip-acl-<tag>-<index>".
467 return "macip-acl-%s-%d" % (self.tag, self._acl_index)
# VppObject subclass attaching MACIP ACLs to one interface.
470 class VppMacipAclInterface(VppObject):
471 """VPP Mac Ip ACL Interface"""
473 def __init__(self, test, sw_if_index, acls):
475 self._sw_if_index = sw_if_index
# Accessor for the stored interface index.
479 def sw_if_index(self):
480 return self._sw_if_index
# Number of ACLs in `acls`.
484 return len(self.acls)
# Calls macip_acl_interface_add_del with is_add=True for each ACL in `acls`
# and registers this object with the test registry.
486 def add_vpp_config(self):
487 for acl in self.acls:
488 self._test.vapi.macip_acl_interface_add_del(
489 is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
491 self._test.registry.register(self, self._test.logger)
# Mirror of add_vpp_config: same per-ACL call with is_add=False.
493 def remove_vpp_config(self):
494 for acl in self.acls:
495 self._test.vapi.macip_acl_interface_add_del(
496 is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
# Raw macip_acl_interface_list_dump result for this interface.
500 return self._test.vapi.macip_acl_interface_list_dump(
501 sw_if_index=self._sw_if_index
# Walks every ACL index in the dumped lists and tests it against
# INVALID_INDEX.
504 def query_vpp_config(self):
506 for acl_list in dump:
507 for acl_index in acl_list.acls:
508 if acl_index != INVALID_INDEX:
# Identifier string of the form "macip-acl-if-list-<sw_if_index>".
513 return "macip-acl-if-list-%d" % (self._sw_if_index)