5 from socket import AF_INET, AF_INET6
8 from dataclasses import dataclass
10 from framework import VppTestCase
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
32 class TestClassifier(VppTestCase):
35 def _resolve_mask_match(mask_match):
36 mask_match = binascii.unhexlify(mask_match)
37 mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
38 mask_match = mask_match + b'\0' * \
39 (mask_match_len - len(mask_match))
40 return mask_match, mask_match_len
45 Perform standard class setup (defined by class method setUpClass in
46 class VppTestCase) before running the test case, set test case related
47 variables and configure VPP.
49 super(TestClassifier, cls).setUpClass()
50 cls.acl_active_table = ''
55 Perform test setup before test case.
58 - create 4 pg interfaces
59 - untagged pg0/pg1/pg2 interface
60 pg0 -------> pg1 (IP ACL)
66 - put it into UP state
67 - set IPv4/6 addresses
68 - resolve neighbor address using ARP
70 :ivar list interfaces: pg interfaces.
71 :ivar list pg_if_packet_sizes: packet sizes in test.
72 :ivar dict acl_tbl_idx: ACL table index.
73 :ivar int pbr_vrfid: VRF id for PBR test.
75 self.reset_packet_infos()
76 super(TestClassifier, self).setUp()
77 if self.af is None: # l2_acl test case
80 # create 4 pg interfaces
81 self.create_pg_interfaces(range(4))
83 # packet sizes to test
84 self.pg_if_packet_sizes = [64, 9018]
86 self.interfaces = list(self.pg_interfaces)
92 # setup all interfaces
93 for intf in self.interfaces:
95 if self.af == AF_INET:
98 elif self.af == AF_INET6:
103 """Run standard test teardown and acl related log."""
104 if self.af is not None and not self.vpp_dead:
105 if self.af == AF_INET:
106 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
107 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
108 elif self.af == AF_INET6:
109 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
110 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
112 self.logger.info(self.vapi.cli("show classify table verbose"))
113 self.logger.info(self.vapi.cli("show ip fib"))
114 self.logger.info(self.vapi.cli("show error"))
116 if self.acl_active_table.endswith('out'):
117 self.output_acl_set_interface(
118 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
119 elif self.acl_active_table != '':
120 self.input_acl_set_interface(
121 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
122 self.acl_active_table = ''
124 for intf in self.interfaces:
125 if self.af == AF_INET:
127 elif self.af == AF_INET6:
131 super(TestClassifier, self).tearDown()
134 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
135 """Build MAC ACL match data with hexstring format.
137 :param str dst_mac: source MAC address <x:x:x:x:x:x>
138 :param str src_mac: destination MAC address <x:x:x:x:x:x>
139 :param str ether_type: ethernet type <0-ffff>
142 dst_mac = dst_mac.replace(':', '')
144 src_mac = src_mac.replace(':', '')
146 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
147 dst_mac, src_mac, ether_type)).rstrip()
150 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
151 """Build MAC ACL mask data with hexstring format.
153 :param str dst_mac: source MAC address <0-ffffffffffff>
154 :param str src_mac: destination MAC address <0-ffffffffffff>
155 :param str ether_type: ethernet type <0-ffff>
158 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
159 dst_mac, src_mac, ether_type)).rstrip()
162 def build_ip_mask(proto='', src_ip='', dst_ip='',
163 src_port='', dst_port=''):
164 """Build IP ACL mask data with hexstring format.
166 :param str proto: protocol number <0-ff>
167 :param str src_ip: source ip address <0-ffffffff>
168 :param str dst_ip: destination ip address <0-ffffffff>
169 :param str src_port: source port number <0-ffff>
170 :param str dst_port: destination port number <0-ffff>
173 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
174 proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
177 def build_ip6_mask(nh='', src_ip='', dst_ip='',
178 src_port='', dst_port=''):
179 """Build IPv6 ACL mask data with hexstring format.
181 :param str nh: next header number <0-ff>
182 :param str src_ip: source ip address <0-ffffffff>
183 :param str dst_ip: destination ip address <0-ffffffff>
184 :param str src_port: source port number <0-ffff>
185 :param str dst_port: destination port number <0-ffff>
188 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
189 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
192 def build_payload_mask(masks):
196 # offset is specified in bytes, convert to hex format.
197 length = (mask.offset * 2) + len(mask.spec)
198 format_spec = '{!s:0>' + str(length) + '}'
199 payload_mask += format_spec.format(mask.spec)
201 return payload_mask.rstrip('0')
204 def build_ip_match(proto=0, src_ip='', dst_ip='',
205 src_port=0, dst_port=0):
206 """Build IP ACL match data with hexstring format.
208 :param int proto: protocol number with valid option "x"
209 :param str src_ip: source ip address with format of "x.x.x.x"
210 :param str dst_ip: destination ip address with format of "x.x.x.x"
211 :param int src_port: source port number "x"
212 :param int dst_port: destination port number "x"
215 src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
217 dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
219 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
220 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
221 hex(dst_port)[2:])).rstrip('0')
224 def build_ip6_match(nh=0, src_ip='', dst_ip='',
225 src_port=0, dst_port=0):
226 """Build IPv6 ACL match data with hexstring format.
228 :param int nh: next header number with valid option "x"
229 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
230 :param str dst_ip: destination ip6 address with format of
232 :param int src_port: source port number "x"
233 :param int dst_port: destination port number "x"
236 if sys.version_info[0] == 2:
237 src_ip = binascii.hexlify(socket.inet_pton(
238 socket.AF_INET6, src_ip))
240 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
243 if sys.version_info[0] == 2:
244 dst_ip = binascii.hexlify(socket.inet_pton(
245 socket.AF_INET6, dst_ip))
247 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
249 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
250 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
251 hex(dst_port)[2:])).rstrip('0')
254 def build_payload_match(matches):
257 for match in matches:
258 sval = str(hex(match.value)[2:])
259 # offset is specified in bytes, convert to hex format.
260 length = (match.offset + match.length) * 2
262 format_spec = '{!s:0>' + str(length) + '}'
263 payload_match += format_spec.format(sval)
265 return payload_match.rstrip('0')
267 def create_stream(self, src_if, dst_if, packet_sizes,
268 proto_l=UDP(sport=1234, dport=5678),
270 """Create input packet stream for defined interfaces.
272 :param VppInterface src_if: Source Interface for packet stream.
273 :param VppInterface dst_if: Destination Interface for packet stream.
274 :param list packet_sizes: packet size to test.
275 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
279 for size in packet_sizes:
280 info = self.create_packet_info(src_if, dst_if)
281 payload = self.info_to_payload(info)
283 # append any additional payload after info
284 if payload_ex is not None:
285 payload += payload_ex
287 if self.af == AF_INET:
288 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
289 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
292 elif self.af == AF_INET6:
293 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
294 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
298 self.extend_packet(p, size)
302 def verify_capture(self, dst_if, capture, proto_l=UDP):
303 """Verify captured input packet stream for defined interface.
305 :param VppInterface dst_if: Interface to verify captured packet stream.
306 :param list capture: Captured packet stream.
307 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
310 if self.af == AF_INET6:
312 self.logger.info("Verifying capture on interface %s" % dst_if.name)
314 for i in self.interfaces:
315 last_info[i.sw_if_index] = None
316 dst_sw_if_index = dst_if.sw_if_index
317 for packet in capture:
319 ip_received = packet[ip_proto]
320 proto_received = packet[proto_l]
321 payload_info = self.payload_to_info(packet[Raw])
322 packet_index = payload_info.index
323 self.assertEqual(payload_info.dst, dst_sw_if_index)
325 "Got packet on port %s: src=%u (id=%u)" %
326 (dst_if.name, payload_info.src, packet_index))
327 next_info = self.get_next_packet_info_for_interface2(
328 payload_info.src, dst_sw_if_index,
329 last_info[payload_info.src])
330 last_info[payload_info.src] = next_info
331 self.assertTrue(next_info is not None)
332 self.assertEqual(packet_index, next_info.index)
333 saved_packet = next_info.data
334 ip_saved = saved_packet[ip_proto]
335 proto_saved = saved_packet[proto_l]
336 # Check standard fields
337 self.assertEqual(ip_received.src, ip_saved.src)
338 self.assertEqual(ip_received.dst, ip_saved.dst)
339 self.assertEqual(proto_received.sport, proto_saved.sport)
340 self.assertEqual(proto_received.dport, proto_saved.dport)
341 except BaseException:
342 self.logger.error(ppp("Unexpected or invalid packet:", packet))
344 for i in self.interfaces:
345 remaining_packet = self.get_next_packet_info_for_interface2(
346 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
347 self.assertTrue(remaining_packet is None,
348 "Interface %s: Packet expected from interface %s "
349 "didn't arrive" % (dst_if.name, i.name))
351 def create_classify_table(self, key, mask, data_offset=0,
352 next_table_index=None):
353 """Create Classify Table
355 :param str key: key for classify table (ex, ACL name).
356 :param str mask: mask value for interested traffic.
357 :param int data_offset:
358 :param str next_table_index
360 mask_match, mask_match_len = self._resolve_mask_match(mask)
361 r = self.vapi.classify_add_del_table(
364 mask_len=mask_match_len,
365 match_n_vectors=(len(mask) - 1) // 32 + 1,
368 current_data_offset=data_offset,
369 next_table_index=next_table_index)
370 self.assertIsNotNone(r, 'No response msg for add_del_table')
371 self.acl_tbl_idx[key] = r.new_table_index
373 def create_classify_session(self, table_index, match, pbr_option=0,
375 """Create Classify Session
377 :param int table_index: table index to identify classify table.
378 :param str match: matched value for interested traffic.
379 :param int pbr_option: enable/disable PBR feature.
380 :param int vrfid: VRF id.
381 :param int is_add: option to configure classify session.
382 - create(1) or delete(0)
384 mask_match, mask_match_len = self._resolve_mask_match(match)
385 r = self.vapi.classify_add_del_session(
387 table_index=table_index,
389 match_len=mask_match_len,
393 self.assertIsNotNone(r, 'No response msg for add_del_session')
395 def input_acl_set_interface(self, intf, table_index, is_add=1):
396 """Configure Input ACL interface
398 :param VppInterface intf: Interface to apply Input ACL feature.
399 :param int table_index: table index to identify classify table.
400 :param int is_add: option to configure classify session.
401 - enable(1) or disable(0)
404 if self.af == AF_INET:
405 r = self.vapi.input_acl_set_interface(
408 ip4_table_index=table_index)
409 elif self.af == AF_INET6:
410 r = self.vapi.input_acl_set_interface(
413 ip6_table_index=table_index)
415 r = self.vapi.input_acl_set_interface(
418 l2_table_index=table_index)
419 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
421 def output_acl_set_interface(self, intf, table_index, is_add=1):
422 """Configure Output ACL interface
424 :param VppInterface intf: Interface to apply Output ACL feature.
425 :param int table_index: table index to identify classify table.
426 :param int is_add: option to configure classify session.
427 - enable(1) or disable(0)
430 if self.af == AF_INET:
431 r = self.vapi.output_acl_set_interface(
434 ip4_table_index=table_index)
435 elif self.af == AF_INET6:
436 r = self.vapi.output_acl_set_interface(
439 ip6_table_index=table_index)
441 r = self.vapi.output_acl_set_interface(
444 l2_table_index=table_index)
445 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
447 def config_pbr_fib_entry(self, intf, is_add=1):
448 """Configure fib entry to route traffic toward PBR VRF table
450 :param VppInterface intf: destination interface to be routed for PBR.
454 self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
455 dst_address_length=addr_len,
456 next_hop_address=intf.remote_ip4,
457 table_id=self.pbr_vrfid, is_add=is_add)
459 def verify_vrf(self, vrf_id):
461 Check if the FIB table / VRF ID is configured.
463 :param int vrf_id: The FIB table / VRF ID to be verified.
464 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
466 ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
467 vrf_count = len(ip_fib_dump)
469 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
472 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)