5 from socket import AF_INET, AF_INET6
8 from dataclasses import dataclass
10 from framework import VppTestCase
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
32 class TestClassifier(VppTestCase):
35 def _resolve_mask_match(mask_match):
36 mask_match = binascii.unhexlify(mask_match)
37 mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
38 mask_match = mask_match + b'\0' * \
39 (mask_match_len - len(mask_match))
40 return mask_match, mask_match_len
45 Perform standard class setup (defined by class method setUpClass in
46 class VppTestCase) before running the test case, set test case related
47 variables and configure VPP.
49 super(TestClassifier, cls).setUpClass()
50 cls.acl_active_table = ''
55 Perform test setup before test case.
58 - create 4 pg interfaces
59 - untagged pg0/pg1/pg2 interface
60 pg0 -------> pg1 (IP ACL)
66 - put it into UP state
67 - set IPv4/6 addresses
68 - resolve neighbor address using ARP
70 :ivar list interfaces: pg interfaces.
71 :ivar list pg_if_packet_sizes: packet sizes in test.
72 :ivar dict acl_tbl_idx: ACL table index.
73 :ivar int pbr_vrfid: VRF id for PBR test.
75 self.reset_packet_infos()
76 super(TestClassifier, self).setUp()
77 if self.af is None: # l2_acl test case
80 # create 4 pg interfaces
81 self.create_pg_interfaces(range(4))
83 # packet sizes to test
84 self.pg_if_packet_sizes = [64, 9018]
86 self.interfaces = list(self.pg_interfaces)
92 # setup all interfaces
93 for intf in self.interfaces:
95 if self.af == AF_INET:
98 elif self.af == AF_INET6:
103 """Run standard test teardown and acl related log."""
104 if self.af is not None and not self.vpp_dead:
105 if self.af == AF_INET:
106 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
107 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
108 elif self.af == AF_INET6:
109 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
110 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
112 self.logger.info(self.vapi.cli("show classify table verbose"))
113 self.logger.info(self.vapi.cli("show ip fib"))
114 self.logger.info(self.vapi.cli("show error"))
116 if self.acl_active_table.endswith('out'):
117 self.output_acl_set_interface(
118 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
119 self.acl_active_table = ''
120 elif self.acl_active_table != '':
121 self.input_acl_set_interface(
122 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
123 self.acl_active_table = ''
125 for intf in self.interfaces:
126 if self.af == AF_INET:
128 elif self.af == AF_INET6:
132 super(TestClassifier, self).tearDown()
135 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
136 """Build MAC ACL match data with hexstring format.
138 :param str dst_mac: source MAC address <x:x:x:x:x:x>
139 :param str src_mac: destination MAC address <x:x:x:x:x:x>
140 :param str ether_type: ethernet type <0-ffff>
143 dst_mac = dst_mac.replace(':', '')
145 src_mac = src_mac.replace(':', '')
147 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
148 dst_mac, src_mac, ether_type)).rstrip()
151 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
152 """Build MAC ACL mask data with hexstring format.
154 :param str dst_mac: source MAC address <0-ffffffffffff>
155 :param str src_mac: destination MAC address <0-ffffffffffff>
156 :param str ether_type: ethernet type <0-ffff>
159 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
160 dst_mac, src_mac, ether_type)).rstrip()
163 def build_ip_mask(proto='', src_ip='', dst_ip='',
164 src_port='', dst_port=''):
165 """Build IP ACL mask data with hexstring format.
167 :param str proto: protocol number <0-ff>
168 :param str src_ip: source ip address <0-ffffffff>
169 :param str dst_ip: destination ip address <0-ffffffff>
170 :param str src_port: source port number <0-ffff>
171 :param str dst_port: destination port number <0-ffff>
174 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
175 proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
178 def build_ip6_mask(nh='', src_ip='', dst_ip='',
179 src_port='', dst_port=''):
180 """Build IPv6 ACL mask data with hexstring format.
182 :param str nh: next header number <0-ff>
183 :param str src_ip: source ip address <0-ffffffff>
184 :param str dst_ip: destination ip address <0-ffffffff>
185 :param str src_port: source port number <0-ffff>
186 :param str dst_port: destination port number <0-ffff>
189 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
190 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
193 def build_payload_mask(masks):
197 # offset is specified in bytes, convert to hex format.
198 length = (mask.offset * 2) + len(mask.spec)
199 format_spec = '{!s:0>' + str(length) + '}'
200 payload_mask += format_spec.format(mask.spec)
202 return payload_mask.rstrip('0')
205 def build_ip_match(proto=0, src_ip='', dst_ip='',
206 src_port=0, dst_port=0):
207 """Build IP ACL match data with hexstring format.
209 :param int proto: protocol number with valid option "x"
210 :param str src_ip: source ip address with format of "x.x.x.x"
211 :param str dst_ip: destination ip address with format of "x.x.x.x"
212 :param int src_port: source port number "x"
213 :param int dst_port: destination port number "x"
216 src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
218 dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
220 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
221 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
222 hex(dst_port)[2:])).rstrip('0')
225 def build_ip6_match(nh=0, src_ip='', dst_ip='',
226 src_port=0, dst_port=0):
227 """Build IPv6 ACL match data with hexstring format.
229 :param int nh: next header number with valid option "x"
230 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
231 :param str dst_ip: destination ip6 address with format of
233 :param int src_port: source port number "x"
234 :param int dst_port: destination port number "x"
237 if sys.version_info[0] == 2:
238 src_ip = binascii.hexlify(socket.inet_pton(
239 socket.AF_INET6, src_ip))
241 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
244 if sys.version_info[0] == 2:
245 dst_ip = binascii.hexlify(socket.inet_pton(
246 socket.AF_INET6, dst_ip))
248 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
250 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
251 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
252 hex(dst_port)[2:])).rstrip('0')
255 def build_payload_match(matches):
258 for match in matches:
259 sval = str(hex(match.value)[2:])
260 # offset is specified in bytes, convert to hex format.
261 length = (match.offset + match.length) * 2
263 format_spec = '{!s:0>' + str(length) + '}'
264 payload_match += format_spec.format(sval)
266 return payload_match.rstrip('0')
268 def create_stream(self, src_if, dst_if, packet_sizes,
269 proto_l=UDP(sport=1234, dport=5678),
271 """Create input packet stream for defined interfaces.
273 :param VppInterface src_if: Source Interface for packet stream.
274 :param VppInterface dst_if: Destination Interface for packet stream.
275 :param list packet_sizes: packet size to test.
276 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
280 for size in packet_sizes:
281 info = self.create_packet_info(src_if, dst_if)
282 payload = self.info_to_payload(info)
284 # append any additional payload after info
285 if payload_ex is not None:
286 payload += payload_ex
288 if self.af == AF_INET:
289 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
290 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
293 elif self.af == AF_INET6:
294 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
295 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
299 self.extend_packet(p, size)
303 def verify_capture(self, dst_if, capture, proto_l=UDP):
304 """Verify captured input packet stream for defined interface.
306 :param VppInterface dst_if: Interface to verify captured packet stream.
307 :param list capture: Captured packet stream.
308 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
311 if self.af == AF_INET6:
313 self.logger.info("Verifying capture on interface %s" % dst_if.name)
315 for i in self.interfaces:
316 last_info[i.sw_if_index] = None
317 dst_sw_if_index = dst_if.sw_if_index
318 for packet in capture:
320 ip_received = packet[ip_proto]
321 proto_received = packet[proto_l]
322 payload_info = self.payload_to_info(packet[Raw])
323 packet_index = payload_info.index
324 self.assertEqual(payload_info.dst, dst_sw_if_index)
326 "Got packet on port %s: src=%u (id=%u)" %
327 (dst_if.name, payload_info.src, packet_index))
328 next_info = self.get_next_packet_info_for_interface2(
329 payload_info.src, dst_sw_if_index,
330 last_info[payload_info.src])
331 last_info[payload_info.src] = next_info
332 self.assertTrue(next_info is not None)
333 self.assertEqual(packet_index, next_info.index)
334 saved_packet = next_info.data
335 ip_saved = saved_packet[ip_proto]
336 proto_saved = saved_packet[proto_l]
337 # Check standard fields
338 self.assertEqual(ip_received.src, ip_saved.src)
339 self.assertEqual(ip_received.dst, ip_saved.dst)
340 self.assertEqual(proto_received.sport, proto_saved.sport)
341 self.assertEqual(proto_received.dport, proto_saved.dport)
342 except BaseException:
343 self.logger.error(ppp("Unexpected or invalid packet:", packet))
345 for i in self.interfaces:
346 remaining_packet = self.get_next_packet_info_for_interface2(
347 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
348 self.assertTrue(remaining_packet is None,
349 "Interface %s: Packet expected from interface %s "
350 "didn't arrive" % (dst_if.name, i.name))
352 def create_classify_table(self, key, mask, data_offset=0,
353 next_table_index=None):
354 """Create Classify Table
356 :param str key: key for classify table (ex, ACL name).
357 :param str mask: mask value for interested traffic.
358 :param int data_offset:
359 :param str next_table_index
361 mask_match, mask_match_len = self._resolve_mask_match(mask)
362 r = self.vapi.classify_add_del_table(
365 mask_len=mask_match_len,
366 match_n_vectors=(len(mask) - 1) // 32 + 1,
369 current_data_offset=data_offset,
370 next_table_index=next_table_index)
371 self.assertIsNotNone(r, 'No response msg for add_del_table')
372 self.acl_tbl_idx[key] = r.new_table_index
374 def create_classify_session(self, table_index, match, pbr_option=0,
376 """Create Classify Session
378 :param int table_index: table index to identify classify table.
379 :param str match: matched value for interested traffic.
380 :param int pbr_option: enable/disable PBR feature.
381 :param int vrfid: VRF id.
382 :param int is_add: option to configure classify session.
383 - create(1) or delete(0)
385 mask_match, mask_match_len = self._resolve_mask_match(match)
386 r = self.vapi.classify_add_del_session(
388 table_index=table_index,
390 match_len=mask_match_len,
394 self.assertIsNotNone(r, 'No response msg for add_del_session')
396 def input_acl_set_interface(self, intf, table_index, is_add=1):
397 """Configure Input ACL interface
399 :param VppInterface intf: Interface to apply Input ACL feature.
400 :param int table_index: table index to identify classify table.
401 :param int is_add: option to configure classify session.
402 - enable(1) or disable(0)
405 if self.af == AF_INET:
406 r = self.vapi.input_acl_set_interface(
409 ip4_table_index=table_index)
410 elif self.af == AF_INET6:
411 r = self.vapi.input_acl_set_interface(
414 ip6_table_index=table_index)
416 r = self.vapi.input_acl_set_interface(
419 l2_table_index=table_index)
420 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
422 def output_acl_set_interface(self, intf, table_index, is_add=1):
423 """Configure Output ACL interface
425 :param VppInterface intf: Interface to apply Output ACL feature.
426 :param int table_index: table index to identify classify table.
427 :param int is_add: option to configure classify session.
428 - enable(1) or disable(0)
431 if self.af == AF_INET:
432 r = self.vapi.output_acl_set_interface(
435 ip4_table_index=table_index)
436 elif self.af == AF_INET6:
437 r = self.vapi.output_acl_set_interface(
440 ip6_table_index=table_index)
442 r = self.vapi.output_acl_set_interface(
445 l2_table_index=table_index)
446 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
448 def config_pbr_fib_entry(self, intf, is_add=1):
449 """Configure fib entry to route traffic toward PBR VRF table
451 :param VppInterface intf: destination interface to be routed for PBR.
455 self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
456 dst_address_length=addr_len,
457 next_hop_address=intf.remote_ip4,
458 table_id=self.pbr_vrfid, is_add=is_add)
460 def verify_vrf(self, vrf_id):
462 Check if the FIB table / VRF ID is configured.
464 :param int vrf_id: The FIB table / VRF ID to be verified.
465 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
467 ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
468 vrf_count = len(ip_fib_dump)
470 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
473 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)