5 from socket import AF_INET, AF_INET6
8 from dataclasses import dataclass
10 from framework import VppTestCase
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
32 class TestClassifier(VppTestCase):
34 def _resolve_mask_match(mask_match):
35 mask_match = binascii.unhexlify(mask_match)
36 mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
37 mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match))
38 return mask_match, mask_match_len
43 Perform standard class setup (defined by class method setUpClass in
44 class VppTestCase) before running the test case, set test case related
45 variables and configure VPP.
47 super(TestClassifier, cls).setUpClass()
48 cls.acl_active_table = ""
53 Perform test setup before test case.
56 - create 4 pg interfaces
57 - untagged pg0/pg1/pg2 interface
58 pg0 -------> pg1 (IP ACL)
64 - put it into UP state
65 - set IPv4/6 addresses
66 - resolve neighbor address using ARP
68 :ivar list interfaces: pg interfaces.
69 :ivar list pg_if_packet_sizes: packet sizes in test.
70 :ivar dict acl_tbl_idx: ACL table index.
71 :ivar int pbr_vrfid: VRF id for PBR test.
73 self.reset_packet_infos()
74 super(TestClassifier, self).setUp()
75 if self.af is None: # l2_acl test case
78 # create 4 pg interfaces
79 self.create_pg_interfaces(range(4))
81 # packet sizes to test
82 self.pg_if_packet_sizes = [64, 9018]
84 self.interfaces = list(self.pg_interfaces)
90 # setup all interfaces
91 for intf in self.interfaces:
93 if self.af == AF_INET:
96 elif self.af == AF_INET6:
101 """Run standard test teardown and acl related log."""
102 if self.af is not None and not self.vpp_dead:
103 if self.af == AF_INET:
104 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
105 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
106 elif self.af == AF_INET6:
107 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
108 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
110 self.logger.info(self.vapi.cli("show classify table verbose"))
111 self.logger.info(self.vapi.cli("show ip fib"))
112 self.logger.info(self.vapi.cli("show error"))
114 if self.acl_active_table.endswith("out"):
115 self.output_acl_set_interface(
116 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
118 elif self.acl_active_table != "":
119 self.input_acl_set_interface(
120 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
122 self.acl_active_table = ""
124 for intf in self.interfaces:
125 if self.af == AF_INET:
127 elif self.af == AF_INET6:
131 super(TestClassifier, self).tearDown()
134 def build_mac_match(dst_mac="", src_mac="", ether_type=""):
135 """Build MAC ACL match data with hexstring format.
137 :param str dst_mac: destination MAC address <x:x:x:x:x:x>
138 :param str src_mac: source MAC address <x:x:x:x:x:x>
139 :param str ether_type: ethernet type <0-ffff>
142 dst_mac = dst_mac.replace(":", "")
144 src_mac = src_mac.replace(":", "")
147 "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
151 def build_mac_mask(dst_mac="", src_mac="", ether_type=""):
152 """Build MAC ACL mask data with hexstring format.
154 :param str dst_mac: destination MAC address <0-ffffffffffff>
155 :param str src_mac: source MAC address <0-ffffffffffff>
156 :param str ether_type: ethernet type <0-ffff>
160 "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
164 def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""):
165 """Build IP ACL mask data with hexstring format.
167 :param str proto: protocol number <0-ff>
168 :param str src_ip: source ip address <0-ffffffff>
169 :param str dst_ip: destination ip address <0-ffffffff>
170 :param str src_port: source port number <0-ffff>
171 :param str dst_port: destination port number <0-ffff>
175 "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
176 proto, src_ip, dst_ip, src_port, dst_port
181 def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""):
182 """Build IPv6 ACL mask data with hexstring format.
184 :param str nh: next header number <0-ff>
185 :param str src_ip: source ip6 address <0-ffffffffffffffffffffffffffffffff>
186 :param str dst_ip: destination ip6 address <0-ffffffffffffffffffffffffffffffff>
187 :param str src_port: source port number <0-ffff>
188 :param str dst_port: destination port number <0-ffff>
192 "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
193 nh, src_ip, dst_ip, src_port, dst_port
def build_payload_mask(masks):
    """Build payload ACL mask data with hexstring format.

    Every entry contributes its ``spec`` left-padded with "0" to a width of
    ``offset * 2 + len(spec)`` hex digits. The pieces are concatenated in
    the order given and trailing "0" characters are stripped from the result.

    :param masks: iterable of objects with an ``offset`` attribute (in
        bytes) and a ``spec`` attribute (hex string).
    :return: the combined mask as a hex string ("" when there is nothing
        left after stripping).
    """
    pieces = []
    for mask in masks:
        # offset is specified in bytes, convert to hex format.
        width = (mask.offset * 2) + len(mask.spec)
        pieces.append(str(mask.spec).rjust(width, "0"))

    return "".join(pieces).rstrip("0")
210 def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
211 """Build IP ACL match data with hexstring format.
213 :param int proto: protocol number with valid option "x"
214 :param str src_ip: source ip address with format of "x.x.x.x"
215 :param str dst_ip: destination ip address with format of "x.x.x.x"
216 :param int src_port: source port number "x"
217 :param int dst_port: destination port number "x"
220 src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii")
222 dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii")
225 "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
226 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
231 def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
232 """Build IPv6 ACL match data with hexstring format.
234 :param int nh: next header number with valid option "x"
235 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
236 :param str dst_ip: destination ip6 address with format of
238 :param int src_port: source port number "x"
239 :param int dst_port: destination port number "x"
242 if sys.version_info[0] == 2:
243 src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip))
245 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
248 if sys.version_info[0] == 2:
249 dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip))
251 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
254 "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
255 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
def build_payload_match(matches):
    """Build payload ACL match data with hexstring format.

    Every entry contributes the hexadecimal digits of ``value`` left-padded
    with "0" to ``(offset + length) * 2`` hex digits. The pieces are
    concatenated in the order given and trailing "0" characters are stripped
    from the result.

    :param matches: iterable of objects with integer attributes ``offset``
        and ``length`` (both in bytes) and ``value``.
    :return: the combined match data as a hex string ("" when there is
        nothing left after stripping).
    """
    pieces = []
    for match in matches:
        digits = hex(match.value)[2:]
        # offset is specified in bytes, convert to hex format.
        width = (match.offset + match.length) * 2
        pieces.append(digits.rjust(width, "0"))

    return "".join(pieces).rstrip("0")
278 proto_l=UDP(sport=1234, dport=5678),
281 """Create input packet stream for defined interfaces.
283 :param VppInterface src_if: Source Interface for packet stream.
284 :param VppInterface dst_if: Destination Interface for packet stream.
285 :param list packet_sizes: packet size to test.
286 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
290 for size in packet_sizes:
291 info = self.create_packet_info(src_if, dst_if)
292 payload = self.info_to_payload(info)
294 # append any additional payload after info
295 if payload_ex is not None:
296 payload += payload_ex
298 if self.af == AF_INET:
300 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
301 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
305 elif self.af == AF_INET6:
307 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
308 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
313 self.extend_packet(p, size)
317 def verify_capture(self, dst_if, capture, proto_l=UDP):
318 """Verify captured input packet stream for defined interface.
320 :param VppInterface dst_if: Interface to verify captured packet stream.
321 :param list capture: Captured packet stream.
322 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
325 if self.af == AF_INET6:
327 self.logger.info("Verifying capture on interface %s" % dst_if.name)
329 for i in self.interfaces:
330 last_info[i.sw_if_index] = None
331 dst_sw_if_index = dst_if.sw_if_index
332 for packet in capture:
334 ip_received = packet[ip_proto]
335 proto_received = packet[proto_l]
336 payload_info = self.payload_to_info(packet[Raw])
337 packet_index = payload_info.index
338 self.assertEqual(payload_info.dst, dst_sw_if_index)
340 "Got packet on port %s: src=%u (id=%u)"
341 % (dst_if.name, payload_info.src, packet_index)
343 next_info = self.get_next_packet_info_for_interface2(
344 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
346 last_info[payload_info.src] = next_info
347 self.assertTrue(next_info is not None)
348 self.assertEqual(packet_index, next_info.index)
349 saved_packet = next_info.data
350 ip_saved = saved_packet[ip_proto]
351 proto_saved = saved_packet[proto_l]
352 # Check standard fields
353 self.assertEqual(ip_received.src, ip_saved.src)
354 self.assertEqual(ip_received.dst, ip_saved.dst)
355 self.assertEqual(proto_received.sport, proto_saved.sport)
356 self.assertEqual(proto_received.dport, proto_saved.dport)
357 except BaseException:
358 self.logger.error(ppp("Unexpected or invalid packet:", packet))
360 for i in self.interfaces:
361 remaining_packet = self.get_next_packet_info_for_interface2(
362 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
365 remaining_packet is None,
366 "Interface %s: Packet expected from interface %s "
367 "didn't arrive" % (dst_if.name, i.name),
370 def create_classify_table(self, key, mask, data_offset=0, next_table_index=None):
371 """Create Classify Table
373 :param str key: key for classify table (ex, ACL name).
374 :param str mask: mask value for interested traffic.
375 :param int data_offset: value passed to the API as current_data_offset.
376 :param str next_table_index: value passed to the API as next_table_index.
378 mask_match, mask_match_len = self._resolve_mask_match(mask)
379 r = self.vapi.classify_add_del_table(
382 mask_len=mask_match_len,
383 match_n_vectors=(len(mask) - 1) // 32 + 1,
386 current_data_offset=data_offset,
387 next_table_index=next_table_index,
389 self.assertIsNotNone(r, "No response msg for add_del_table")
390 self.acl_tbl_idx[key] = r.new_table_index
392 def create_classify_session(
393 self, table_index, match, pbr_option=0, vrfid=0, is_add=1
395 """Create Classify Session
397 :param int table_index: table index to identify classify table.
398 :param str match: matched value for interested traffic.
399 :param int pbr_option: enable/disable PBR feature.
400 :param int vrfid: VRF id.
401 :param int is_add: option to configure classify session.
402 - create(1) or delete(0)
404 mask_match, mask_match_len = self._resolve_mask_match(match)
405 r = self.vapi.classify_add_del_session(
407 table_index=table_index,
409 match_len=mask_match_len,
414 self.assertIsNotNone(r, "No response msg for add_del_session")
416 def input_acl_set_interface(self, intf, table_index, is_add=1):
417 """Configure Input ACL interface
419 :param VppInterface intf: Interface to apply Input ACL feature.
420 :param int table_index: table index to identify classify table.
421 :param int is_add: option to configure the Input ACL on the interface.
422 - enable(1) or disable(0)
425 if self.af == AF_INET:
426 r = self.vapi.input_acl_set_interface(
427 is_add, intf.sw_if_index, ip4_table_index=table_index
429 elif self.af == AF_INET6:
430 r = self.vapi.input_acl_set_interface(
431 is_add, intf.sw_if_index, ip6_table_index=table_index
434 r = self.vapi.input_acl_set_interface(
435 is_add, intf.sw_if_index, l2_table_index=table_index
437 self.assertIsNotNone(r, "No response msg for acl_set_interface")
439 def output_acl_set_interface(self, intf, table_index, is_add=1):
440 """Configure Output ACL interface
442 :param VppInterface intf: Interface to apply Output ACL feature.
443 :param int table_index: table index to identify classify table.
444 :param int is_add: option to configure the Output ACL on the interface.
445 - enable(1) or disable(0)
448 if self.af == AF_INET:
449 r = self.vapi.output_acl_set_interface(
450 is_add, intf.sw_if_index, ip4_table_index=table_index
452 elif self.af == AF_INET6:
453 r = self.vapi.output_acl_set_interface(
454 is_add, intf.sw_if_index, ip6_table_index=table_index
457 r = self.vapi.output_acl_set_interface(
458 is_add, intf.sw_if_index, l2_table_index=table_index
460 self.assertIsNotNone(r, "No response msg for acl_set_interface")
462 def config_pbr_fib_entry(self, intf, is_add=1):
463 """Configure fib entry to route traffic toward PBR VRF table
465 :param VppInterface intf: destination interface to be routed for PBR.
469 self.vapi.ip_add_del_route(
470 dst_address=intf.local_ip4,
471 dst_address_length=addr_len,
472 next_hop_address=intf.remote_ip4,
473 table_id=self.pbr_vrfid,
477 def verify_vrf(self, vrf_id):
479 Check if the FIB table / VRF ID is configured.
481 :param int vrf_id: The FIB table / VRF ID to be verified.
482 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
484 ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
485 vrf_count = len(ip_fib_dump)
487 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
490 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)