5 from socket import AF_INET, AF_INET6
7 from dataclasses import dataclass
9 from framework import VppTestCase
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP, TCP
14 from scapy.layers.inet6 import IPv6
31 class TestClassifier(VppTestCase):
33 def _resolve_mask_match(mask_match):
34 mask_match = binascii.unhexlify(mask_match)
35 mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
36 mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match))
37 return mask_match, mask_match_len
42 Perform standard class setup (defined by class method setUpClass in
43 class VppTestCase) before running the test case, set test case related
44 variables and configure VPP.
46 super(TestClassifier, cls).setUpClass()
47 cls.acl_active_table = ""
52 Perform test setup before test case.
55 - create 4 pg interfaces
56 - untagged pg0/pg1/pg2 interface
57 pg0 -------> pg1 (IP ACL)
63 - put it into UP state
64 - set IPv4/6 addresses
65 - resolve neighbor address using ARP
67 :ivar list interfaces: pg interfaces.
68 :ivar list pg_if_packet_sizes: packet sizes in test.
69 :ivar dict acl_tbl_idx: ACL table index.
70 :ivar int pbr_vrfid: VRF id for PBR test.
72 self.reset_packet_infos()
73 super(TestClassifier, self).setUp()
74 if self.af is None: # l2_acl test case
77 # create 4 pg interfaces
78 self.create_pg_interfaces(range(4))
80 # packet sizes to test
81 self.pg_if_packet_sizes = [64, 9018]
83 self.interfaces = list(self.pg_interfaces)
89 # setup all interfaces
90 for intf in self.interfaces:
92 if self.af == AF_INET:
95 elif self.af == AF_INET6:
100 """Run standard test teardown and acl related log."""
101 if self.af is not None and not self.vpp_dead:
102 if self.af == AF_INET:
103 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
104 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
105 elif self.af == AF_INET6:
106 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
107 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
109 self.logger.info(self.vapi.cli("show classify table verbose"))
110 self.logger.info(self.vapi.cli("show ip fib"))
111 self.logger.info(self.vapi.cli("show error"))
113 if self.acl_active_table.endswith("out"):
114 self.output_acl_set_interface(
115 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
117 elif self.acl_active_table != "":
118 self.input_acl_set_interface(
119 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
121 self.acl_active_table = ""
123 for intf in self.interfaces:
124 if self.af == AF_INET:
126 elif self.af == AF_INET6:
130 super(TestClassifier, self).tearDown()
133 def build_mac_match(dst_mac="", src_mac="", ether_type=""):
134 """Build MAC ACL match data with hexstring format.
136 :param str dst_mac: source MAC address <x:x:x:x:x:x>
137 :param str src_mac: destination MAC address <x:x:x:x:x:x>
138 :param str ether_type: ethernet type <0-ffff>
141 dst_mac = dst_mac.replace(":", "")
143 src_mac = src_mac.replace(":", "")
146 "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
150 def build_mac_mask(dst_mac="", src_mac="", ether_type=""):
151 """Build MAC ACL mask data with hexstring format.
153 :param str dst_mac: source MAC address <0-ffffffffffff>
154 :param str src_mac: destination MAC address <0-ffffffffffff>
155 :param str ether_type: ethernet type <0-ffff>
159 "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
163 def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""):
164 """Build IP ACL mask data with hexstring format.
166 :param str proto: protocol number <0-ff>
167 :param str src_ip: source ip address <0-ffffffff>
168 :param str dst_ip: destination ip address <0-ffffffff>
169 :param str src_port: source port number <0-ffff>
170 :param str dst_port: destination port number <0-ffff>
174 "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
175 proto, src_ip, dst_ip, src_port, dst_port
180 def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""):
181 """Build IPv6 ACL mask data with hexstring format.
183 :param str nh: next header number <0-ff>
184 :param str src_ip: source ip address <0-ffffffff>
185 :param str dst_ip: destination ip address <0-ffffffff>
186 :param str src_port: source port number <0-ffff>
187 :param str dst_port: destination port number <0-ffff>
191 "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
192 nh, src_ip, dst_ip, src_port, dst_port
197 def build_payload_mask(masks):
201 # offset is specified in bytes, convert to hex format.
202 length = (mask.offset * 2) + len(mask.spec)
203 format_spec = "{!s:0>" + str(length) + "}"
204 payload_mask += format_spec.format(mask.spec)
206 return payload_mask.rstrip("0")
209 def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
210 """Build IP ACL match data with hexstring format.
212 :param int proto: protocol number with valid option "x"
213 :param str src_ip: source ip address with format of "x.x.x.x"
214 :param str dst_ip: destination ip address with format of "x.x.x.x"
215 :param int src_port: source port number "x"
216 :param int dst_port: destination port number "x"
219 src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii")
221 dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii")
224 "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
225 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
230 def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
231 """Build IPv6 ACL match data with hexstring format.
233 :param int nh: next header number with valid option "x"
234 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
235 :param str dst_ip: destination ip6 address with format of
237 :param int src_port: source port number "x"
238 :param int dst_port: destination port number "x"
241 if sys.version_info[0] == 2:
242 src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip))
244 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
247 if sys.version_info[0] == 2:
248 dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip))
250 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
253 "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
254 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
259 def build_payload_match(matches):
262 for match in matches:
263 sval = str(hex(match.value)[2:])
264 # offset is specified in bytes, convert to hex format.
265 length = (match.offset + match.length) * 2
267 format_spec = "{!s:0>" + str(length) + "}"
268 payload_match += format_spec.format(sval)
270 return payload_match.rstrip("0")
277 proto_l=UDP(sport=1234, dport=5678),
280 """Create input packet stream for defined interfaces.
282 :param VppInterface src_if: Source Interface for packet stream.
283 :param VppInterface dst_if: Destination Interface for packet stream.
284 :param list packet_sizes: packet size to test.
285 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
289 for size in packet_sizes:
290 info = self.create_packet_info(src_if, dst_if)
291 payload = self.info_to_payload(info)
293 # append any additional payload after info
294 if payload_ex is not None:
295 payload += payload_ex
297 if self.af == AF_INET:
299 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
300 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
304 elif self.af == AF_INET6:
306 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
307 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
312 self.extend_packet(p, size)
316 def verify_capture(self, dst_if, capture, proto_l=UDP):
317 """Verify captured input packet stream for defined interface.
319 :param VppInterface dst_if: Interface to verify captured packet stream.
320 :param list capture: Captured packet stream.
321 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
324 if self.af == AF_INET6:
326 self.logger.info("Verifying capture on interface %s" % dst_if.name)
328 for i in self.interfaces:
329 last_info[i.sw_if_index] = None
330 dst_sw_if_index = dst_if.sw_if_index
331 for packet in capture:
333 ip_received = packet[ip_proto]
334 proto_received = packet[proto_l]
335 payload_info = self.payload_to_info(packet[Raw])
336 packet_index = payload_info.index
337 self.assertEqual(payload_info.dst, dst_sw_if_index)
339 "Got packet on port %s: src=%u (id=%u)"
340 % (dst_if.name, payload_info.src, packet_index)
342 next_info = self.get_next_packet_info_for_interface2(
343 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
345 last_info[payload_info.src] = next_info
346 self.assertTrue(next_info is not None)
347 self.assertEqual(packet_index, next_info.index)
348 saved_packet = next_info.data
349 ip_saved = saved_packet[ip_proto]
350 proto_saved = saved_packet[proto_l]
351 # Check standard fields
352 self.assertEqual(ip_received.src, ip_saved.src)
353 self.assertEqual(ip_received.dst, ip_saved.dst)
354 self.assertEqual(proto_received.sport, proto_saved.sport)
355 self.assertEqual(proto_received.dport, proto_saved.dport)
356 except BaseException:
357 self.logger.error(ppp("Unexpected or invalid packet:", packet))
359 for i in self.interfaces:
360 remaining_packet = self.get_next_packet_info_for_interface2(
361 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
364 remaining_packet is None,
365 "Interface %s: Packet expected from interface %s "
366 "didn't arrive" % (dst_if.name, i.name),
369 def create_classify_table(self, key, mask, data_offset=0, next_table_index=None):
370 """Create Classify Table
372 :param str key: key for classify table (ex, ACL name).
373 :param str mask: mask value for interested traffic.
374 :param int data_offset:
375 :param str next_table_index
377 mask_match, mask_match_len = self._resolve_mask_match(mask)
378 r = self.vapi.classify_add_del_table(
381 mask_len=mask_match_len,
382 match_n_vectors=(len(mask) - 1) // 32 + 1,
385 current_data_offset=data_offset,
386 next_table_index=next_table_index,
388 self.assertIsNotNone(r, "No response msg for add_del_table")
389 self.acl_tbl_idx[key] = r.new_table_index
391 def create_classify_session(
392 self, table_index, match, pbr_option=0, vrfid=0, is_add=1
394 """Create Classify Session
396 :param int table_index: table index to identify classify table.
397 :param str match: matched value for interested traffic.
398 :param int pbr_option: enable/disable PBR feature.
399 :param int vrfid: VRF id.
400 :param int is_add: option to configure classify session.
401 - create(1) or delete(0)
403 mask_match, mask_match_len = self._resolve_mask_match(match)
404 r = self.vapi.classify_add_del_session(
406 table_index=table_index,
408 match_len=mask_match_len,
413 self.assertIsNotNone(r, "No response msg for add_del_session")
415 def input_acl_set_interface(self, intf, table_index, is_add=1):
416 """Configure Input ACL interface
418 :param VppInterface intf: Interface to apply Input ACL feature.
419 :param int table_index: table index to identify classify table.
420 :param int is_add: option to configure classify session.
421 - enable(1) or disable(0)
424 if self.af == AF_INET:
425 r = self.vapi.input_acl_set_interface(
426 is_add, intf.sw_if_index, ip4_table_index=table_index
428 elif self.af == AF_INET6:
429 r = self.vapi.input_acl_set_interface(
430 is_add, intf.sw_if_index, ip6_table_index=table_index
433 r = self.vapi.input_acl_set_interface(
434 is_add, intf.sw_if_index, l2_table_index=table_index
436 self.assertIsNotNone(r, "No response msg for acl_set_interface")
438 def output_acl_set_interface(self, intf, table_index, is_add=1):
439 """Configure Output ACL interface
441 :param VppInterface intf: Interface to apply Output ACL feature.
442 :param int table_index: table index to identify classify table.
443 :param int is_add: option to configure classify session.
444 - enable(1) or disable(0)
447 if self.af == AF_INET:
448 r = self.vapi.output_acl_set_interface(
449 is_add, intf.sw_if_index, ip4_table_index=table_index
451 elif self.af == AF_INET6:
452 r = self.vapi.output_acl_set_interface(
453 is_add, intf.sw_if_index, ip6_table_index=table_index
456 r = self.vapi.output_acl_set_interface(
457 is_add, intf.sw_if_index, l2_table_index=table_index
459 self.assertIsNotNone(r, "No response msg for acl_set_interface")
461 def config_pbr_fib_entry(self, intf, is_add=1):
462 """Configure fib entry to route traffic toward PBR VRF table
464 :param VppInterface intf: destination interface to be routed for PBR.
468 self.vapi.ip_add_del_route(
469 dst_address=intf.local_ip4,
470 dst_address_length=addr_len,
471 next_hop_address=intf.remote_ip4,
472 table_id=self.pbr_vrfid,
476 def verify_vrf(self, vrf_id):
478 Check if the FIB table / VRF ID is configured.
480 :param int vrf_id: The FIB table / VRF ID to be verified.
481 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
483 ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
484 vrf_count = len(ip_fib_dump)
486 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
489 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)