8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
31 Perform test setup before test case.
34 - create 3 pg interfaces
35 - untagged pg0/pg1/pg2 interface
36 pg0 -------> pg1 (IP ACL)
40 - put it into UP state
42 - resolve neighbor address using NDP
44 :ivar list interfaces: pg interfaces.
45 :ivar list pg_if_packet_sizes: packet sizes in test.
46 :ivar dict acl_tbl_idx: ACL table index.
47 :ivar int pbr_vrfid: VRF id for PBR test.
49 self.reset_packet_infos()
50 super(TestClassifier, self).setUp()
52 # create 3 pg interfaces
53 self.create_pg_interfaces(range(3))
55 # packet sizes to test
56 self.pg_if_packet_sizes = [64, 9018]
58 self.interfaces = list(self.pg_interfaces)
63 # setup all interfaces
64 for intf in self.interfaces:
70 """Run standard test teardown and acl related log."""
72 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74 self.logger.info(self.vapi.cli("show classify table verbose"))
75 self.logger.info(self.vapi.cli("show ip fib"))
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
88 super(TestClassifier, self).tearDown()
90 def create_stream(self, src_if, dst_if, packet_sizes,
91 proto_l=UDP(sport=1234, dport=5678)):
92 """Create input packet stream for defined interfaces.
94 :param VppInterface src_if: Source Interface for packet stream.
95 :param VppInterface dst_if: Destination Interface for packet stream.
96 :param list packet_sizes: packet size to test.
97 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
101 for size in packet_sizes:
102 info = self.create_packet_info(src_if, dst_if)
103 payload = self.info_to_payload(info)
104 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
109 self.extend_packet(p, size)
113 def verify_capture(self, dst_if, capture, proto_l=UDP):
114 """Verify captured input packet stream for defined interface.
116 :param VppInterface dst_if: Interface to verify captured packet stream.
117 :param list capture: Captured packet stream.
118 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
120 self.logger.info("Verifying capture on interface %s" % dst_if.name)
122 for i in self.interfaces:
123 last_info[i.sw_if_index] = None
124 dst_sw_if_index = dst_if.sw_if_index
125 for packet in capture:
127 ip6_received = packet[IPv6]
128 proto_received = packet[proto_l]
129 payload_info = self.payload_to_info(str(packet[Raw]))
130 packet_index = payload_info.index
131 self.assertEqual(payload_info.dst, dst_sw_if_index)
133 "Got packet on port %s: src=%u (id=%u)" %
134 (dst_if.name, payload_info.src, packet_index))
135 next_info = self.get_next_packet_info_for_interface2(
136 payload_info.src, dst_sw_if_index,
137 last_info[payload_info.src])
138 last_info[payload_info.src] = next_info
139 self.assertTrue(next_info is not None)
140 self.assertEqual(packet_index, next_info.index)
141 saved_packet = next_info.data
142 ip_saved = saved_packet[IPv6]
143 proto_saved = saved_packet[proto_l]
144 # Check standard fields
145 self.assertEqual(ip6_received.src, ip_saved.src)
146 self.assertEqual(ip6_received.dst, ip_saved.dst)
147 self.assertEqual(proto_received.sport, proto_saved.sport)
148 self.assertEqual(proto_received.dport, proto_saved.dport)
150 self.logger.error(ppp("Unexpected or invalid packet:", packet))
152 for i in self.interfaces:
153 remaining_packet = self.get_next_packet_info_for_interface2(
154 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155 self.assertTrue(remaining_packet is None,
156 "Interface %s: Packet expected from interface %s "
157 "didn't arrive" % (dst_if.name, i.name))
@staticmethod
def build_ip6_mask(nh='', src_ip='', dst_ip='',
                   src_port='', dst_port=''):
    """Build IPv6 ACL mask data with hexstring format.

    Every argument is a hex string that is right-aligned (zero-padded on
    the left) into a fixed-width field.  The fields are concatenated in the
    order nh, src_ip, dst_ip, src_port, dst_port using widths of 14, 34,
    32, 4 and 4 hex digits, and trailing all-zero bytes are then dropped.

    :param str nh: next header number <0-ff>
    :param str src_ip: source ip address, up to 32 hex digits
    :param str dst_ip: destination ip address, up to 32 hex digits
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: mask as a hex string with an even number of digits
        ('' when no field is set).
    :rtype: str
    """
    hex_mask = '{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
        nh, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip('0') removes hex digits, not bytes.  A mask whose last
    # significant byte ends in a zero nibble (e.g. 'f0') would otherwise be
    # left with an odd number of digits, which byte-oriented consumers such
    # as binascii.unhexlify() reject.  Put the stripped nibble back.
    if len(hex_mask) % 2:
        hex_mask += '0'
    return hex_mask
@staticmethod
def build_ip6_match(nh=0, src_ip='', dst_ip='',
                    src_port=0, dst_port=0):
    """Build IPv6 ACL match data with hexstring format.

    The values are converted to hex and laid out in the same fixed-width
    fields (14, 34, 32, 4 and 4 hex digits) as the IPv6 mask builder, then
    trailing all-zero bytes are dropped.

    :param int nh: next header number with valid option "x"
    :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
        ('' leaves the field zeroed)
    :param str dst_ip: destination ip6 address with format of
        "xxx:xxxx::xxxx" ('' leaves the field zeroed)
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: match value as a hex string with an even number of digits
        ('' when every field is zero/empty).
    :rtype: str
    :raises socket.error: if an address is not a valid IPv6 literal.
    """
    # Local import keeps this helper self-contained.
    import binascii

    def _ip6_to_hex(addr):
        # An empty address means "do not match on this field"; inet_pton()
        # would reject '' so it must be skipped explicitly.
        if not addr:
            return ''
        packed = socket.inet_pton(socket.AF_INET6, addr)
        # hexlify() works on both Python 2 and 3, unlike str.encode('hex').
        return binascii.hexlify(packed).decode('ascii')

    hex_match = '{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
        format(nh, 'x'), _ip6_to_hex(src_ip), _ip6_to_hex(dst_ip),
        format(src_port, 'x'), format(dst_port, 'x')).rstrip('0')
    # rstrip('0') removes hex digits, not bytes: a value such as nh=16
    # ('10') would lose its low nibble and leave an odd-length string that
    # binascii.unhexlify() cannot decode.  Restore the stripped nibble.
    if len(hex_match) % 2:
        hex_match += '0'
    return hex_match
@staticmethod
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    Each argument is a hex string that is right-aligned (zero-padded on
    the left) into a fixed-width field; the fields are concatenated in the
    order dst_mac, src_mac, ether_type using widths of 12, 12 and 4 hex
    digits, and trailing all-zero bytes are then dropped.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: mask as a hex string with an even number of digits
        ('' when no field is set).
    :rtype: str
    """
    hex_mask = '{:0>12}{:0>12}{:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip('0') removes hex digits, not bytes.  Re-append a nibble when
    # the last significant byte ended in '0', so the result always describes
    # whole bytes and can be decoded by binascii.unhexlify().
    if len(hex_mask) % 2:
        hex_mask += '0'
    return hex_mask
@staticmethod
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    The colon separators are removed from both MAC addresses, then the
    values are laid out in fixed-width fields (12, 12 and 4 hex digits, in
    the order dst_mac, src_mac, ether_type) and trailing all-zero bytes are
    dropped.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: match value as a hex string with an even number of digits
        ('' when no field is set).
    :rtype: str
    """
    # str.replace() is a no-op on '', so no emptiness guard is needed.
    dst_hex = dst_mac.replace(':', '')
    src_hex = src_mac.replace(':', '')

    hex_match = '{:0>12}{:0>12}{:0>4}'.format(
        dst_hex, src_hex, ether_type).rstrip('0')
    # rstrip('0') removes hex digits, not bytes: a MAC ending in e.g. ':10'
    # would lose its low nibble and leave an odd-length string that
    # binascii.unhexlify() cannot decode.  Restore the stripped nibble.
    if len(hex_match) % 2:
        hex_match += '0'
    return hex_match
223 def create_classify_table(self, key, mask, data_offset=0):
224 """Create Classify Table
226 :param str key: key for classify table (ex, ACL name).
227 :param str mask: mask value for interested traffic.
228 :param int data_offset:
230 r = self.vapi.classify_add_del_table(
232 mask=binascii.unhexlify(mask),
233 match_n_vectors=(len(mask) - 1) // 32 + 1,
236 current_data_offset=data_offset)
237 self.assertIsNotNone(r, msg='No response msg for add_del_table')
238 self.acl_tbl_idx[key] = r.new_table_index
240 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
241 """Create Classify Session
243 :param int table_index: table index to identify classify table.
244 :param str match: matched value for interested traffic.
245 :param int vrfid: VRF id.
246 :param int is_add: option to configure classify session.
247 - create(1) or delete(0)
249 r = self.vapi.classify_add_del_session(
252 binascii.unhexlify(match),
255 self.assertIsNotNone(r, msg='No response msg for add_del_session')
257 def input_acl_set_interface(self, intf, table_index, is_add=1):
258 """Configure Input ACL interface
260 :param VppInterface intf: Interface to apply Input ACL feature.
261 :param int table_index: table index to identify classify table.
262 :param int is_add: option to configure Input ACL on the interface.
263 - enable(1) or disable(0)
265 r = self.vapi.input_acl_set_interface(
268 ip6_table_index=table_index)
269 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
271 def output_acl_set_interface(self, intf, table_index, is_add=1):
272 """Configure Output ACL interface
274 :param VppInterface intf: Interface to apply Output ACL feature.
275 :param int table_index: table index to identify classify table.
276 :param int is_add: option to configure Output ACL on the interface.
277 - enable(1) or disable(0)
279 r = self.vapi.output_acl_set_interface(
282 ip6_table_index=table_index)
283 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
286 class TestClassifierIP6(TestClassifier):
287 """ Classifier IP6 Test Case """
289 def test_iacl_src_ip(self):
290 """ Source IP6 iACL test
292 Test scenario for basic IP ACL with source IP
293 - Create IPv6 stream for pg0 -> pg1 interface.
294 - Create iACL with source IP address.
295 - Send and verify received packets on pg1 interface.
298 # Basic iACL testing with source IP
299 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
300 self.pg0.add_stream(pkts)
303 self.create_classify_table(
305 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
306 self.create_classify_session(
307 self.acl_tbl_idx.get(key),
308 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
309 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
310 self.acl_active_table = key
312 self.pg_enable_capture(self.pg_interfaces)
315 pkts = self.pg1.get_capture(len(pkts))
316 self.verify_capture(self.pg1, pkts)
317 self.pg0.assert_nothing_captured(remark="packets forwarded")
318 self.pg2.assert_nothing_captured(remark="packets forwarded")
320 def test_iacl_dst_ip(self):
321 """ Destination IP6 iACL test
323 Test scenario for basic IP ACL with destination IP
324 - Create IPv6 stream for pg0 -> pg1 interface.
325 - Create iACL with destination IP address.
326 - Send and verify received packets on pg1 interface.
329 # Basic iACL testing with destination IP
330 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
331 self.pg0.add_stream(pkts)
334 self.create_classify_table(
336 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
337 self.create_classify_session(
338 self.acl_tbl_idx.get(key),
339 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
340 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
341 self.acl_active_table = key
343 self.pg_enable_capture(self.pg_interfaces)
346 pkts = self.pg1.get_capture(len(pkts))
347 self.verify_capture(self.pg1, pkts)
348 self.pg0.assert_nothing_captured(remark="packets forwarded")
349 self.pg2.assert_nothing_captured(remark="packets forwarded")
351 def test_iacl_src_dst_ip(self):
352 """ Source and destination IP6 iACL test
354 Test scenario for basic IP ACL with source and destination IP
355 - Create IPv6 stream for pg0 -> pg1 interface.
356 - Create iACL with source and destination IP addresses.
357 - Send and verify received packets on pg1 interface.
360 # Basic iACL testing with source and destination IP
361 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
362 self.pg0.add_stream(pkts)
365 self.create_classify_table(
367 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
368 dst_ip='ffffffffffffffffffffffffffffffff'))
369 self.create_classify_session(
370 self.acl_tbl_idx.get(key),
371 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
372 dst_ip=self.pg1.remote_ip6))
373 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
374 self.acl_active_table = key
376 self.pg_enable_capture(self.pg_interfaces)
379 pkts = self.pg1.get_capture(len(pkts))
380 self.verify_capture(self.pg1, pkts)
381 self.pg0.assert_nothing_captured(remark="packets forwarded")
382 self.pg2.assert_nothing_captured(remark="packets forwarded")
385 # Tests split to different test case classes because of issue reported in
387 class TestClassifierIP6UDP(TestClassifier):
388 """ Classifier IP6 UDP proto Test Case """
390 def test_iacl_proto_udp(self):
391 """ IP6 UDP protocol iACL test
393 Test scenario for basic protocol ACL with UDP protocol
394 - Create IPv6 stream for pg0 -> pg1 interface.
395 - Create iACL with UDP IP protocol.
396 - Send and verify received packets on pg1 interface.
399 # Basic iACL testing with UDP protocol
400 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
401 self.pg0.add_stream(pkts)
404 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
405 self.create_classify_session(
406 self.acl_tbl_idx.get(key),
407 self.build_ip6_match(nh=socket.IPPROTO_UDP))
408 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409 self.acl_active_table = key
411 self.pg_enable_capture(self.pg_interfaces)
414 pkts = self.pg1.get_capture(len(pkts))
415 self.verify_capture(self.pg1, pkts)
416 self.pg0.assert_nothing_captured(remark="packets forwarded")
417 self.pg2.assert_nothing_captured(remark="packets forwarded")
419 def test_iacl_proto_udp_sport(self):
420 """ IP6 UDP source port iACL test
422 Test scenario for basic protocol ACL with UDP and sport
423 - Create IPv6 stream for pg0 -> pg1 interface.
424 - Create iACL with UDP IP protocol and defined sport.
425 - Send and verify received packets on pg1 interface.
428 # Basic iACL testing with UDP and sport
430 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
431 UDP(sport=sport, dport=5678))
432 self.pg0.add_stream(pkts)
435 self.create_classify_table(
436 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
437 self.create_classify_session(
438 self.acl_tbl_idx.get(key),
439 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
440 self.input_acl_set_interface(
441 self.pg0, self.acl_tbl_idx.get(key))
442 self.acl_active_table = key
444 self.pg_enable_capture(self.pg_interfaces)
447 pkts = self.pg1.get_capture(len(pkts))
448 self.verify_capture(self.pg1, pkts)
449 self.pg0.assert_nothing_captured(remark="packets forwarded")
450 self.pg2.assert_nothing_captured(remark="packets forwarded")
452 def test_iacl_proto_udp_dport(self):
453 """ IP6 UDP destination port iACL test
455 Test scenario for basic protocol ACL with UDP and dport
456 - Create IPv6 stream for pg0 -> pg1 interface.
457 - Create iACL with UDP IP protocol and defined dport.
458 - Send and verify received packets on pg1 interface.
461 # Basic iACL testing with UDP and dport
463 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
464 UDP(sport=1234, dport=dport))
465 self.pg0.add_stream(pkts)
468 self.create_classify_table(
469 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
470 self.create_classify_session(
471 self.acl_tbl_idx.get(key),
472 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
473 self.input_acl_set_interface(
474 self.pg0, self.acl_tbl_idx.get(key))
475 self.acl_active_table = key
477 self.pg_enable_capture(self.pg_interfaces)
480 pkts = self.pg1.get_capture(len(pkts))
481 self.verify_capture(self.pg1, pkts)
482 self.pg0.assert_nothing_captured(remark="packets forwarded")
483 self.pg2.assert_nothing_captured(remark="packets forwarded")
485 def test_iacl_proto_udp_sport_dport(self):
486 """ IP6 UDP source and destination ports iACL test
488 Test scenario for basic protocol ACL with UDP and sport and dport
489 - Create IPv6 stream for pg0 -> pg1 interface.
490 - Create iACL with UDP IP protocol and defined sport and dport.
491 - Send and verify received packets on pg1 interface.
494 # Basic iACL testing with UDP and sport and dport
497 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
498 UDP(sport=sport, dport=dport))
499 self.pg0.add_stream(pkts)
502 self.create_classify_table(
504 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
505 self.create_classify_session(
506 self.acl_tbl_idx.get(key),
507 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
509 self.input_acl_set_interface(
510 self.pg0, self.acl_tbl_idx.get(key))
511 self.acl_active_table = key
513 self.pg_enable_capture(self.pg_interfaces)
516 pkts = self.pg1.get_capture(len(pkts))
517 self.verify_capture(self.pg1, pkts)
518 self.pg0.assert_nothing_captured(remark="packets forwarded")
519 self.pg2.assert_nothing_captured(remark="packets forwarded")
522 class TestClassifierIP6TCP(TestClassifier):
523 """ Classifier IP6 TCP proto Test Case """
525 def test_iacl_proto_tcp(self):
526 """ IP6 TCP protocol iACL test
528 Test scenario for basic protocol ACL with TCP protocol
529 - Create IPv6 stream for pg0 -> pg1 interface.
530 - Create iACL with TCP IP protocol.
531 - Send and verify received packets on pg1 interface.
534 # Basic iACL testing with TCP protocol
535 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
536 TCP(sport=1234, dport=5678))
537 self.pg0.add_stream(pkts)
540 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
541 self.create_classify_session(
542 self.acl_tbl_idx.get(key),
543 self.build_ip6_match(nh=socket.IPPROTO_TCP))
544 self.input_acl_set_interface(
545 self.pg0, self.acl_tbl_idx.get(key))
546 self.acl_active_table = key
548 self.pg_enable_capture(self.pg_interfaces)
551 pkts = self.pg1.get_capture(len(pkts))
552 self.verify_capture(self.pg1, pkts, TCP)
553 self.pg0.assert_nothing_captured(remark="packets forwarded")
554 self.pg2.assert_nothing_captured(remark="packets forwarded")
556 def test_iacl_proto_tcp_sport(self):
557 """ IP6 TCP source port iACL test
559 Test scenario for basic protocol ACL with TCP and sport
560 - Create IPv6 stream for pg0 -> pg1 interface.
561 - Create iACL with TCP IP protocol and defined sport.
562 - Send and verify received packets on pg1 interface.
565 # Basic iACL testing with TCP and sport
567 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568 TCP(sport=sport, dport=5678))
569 self.pg0.add_stream(pkts)
572 self.create_classify_table(
573 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
574 self.create_classify_session(
575 self.acl_tbl_idx.get(key),
576 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
577 self.input_acl_set_interface(
578 self.pg0, self.acl_tbl_idx.get(key))
579 self.acl_active_table = key
581 self.pg_enable_capture(self.pg_interfaces)
584 pkts = self.pg1.get_capture(len(pkts))
585 self.verify_capture(self.pg1, pkts, TCP)
586 self.pg0.assert_nothing_captured(remark="packets forwarded")
587 self.pg2.assert_nothing_captured(remark="packets forwarded")
589 def test_iacl_proto_tcp_dport(self):
590 """ IP6 TCP destination port iACL test
592 Test scenario for basic protocol ACL with TCP and dport
593 - Create IPv6 stream for pg0 -> pg1 interface.
594 - Create iACL with TCP IP protocol and defined dport.
595 - Send and verify received packets on pg1 interface.
598 # Basic iACL testing with TCP and dport
600 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
601 TCP(sport=1234, dport=dport))
602 self.pg0.add_stream(pkts)
605 self.create_classify_table(
606 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
607 self.create_classify_session(
608 self.acl_tbl_idx.get(key),
609 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
610 self.input_acl_set_interface(
611 self.pg0, self.acl_tbl_idx.get(key))
612 self.acl_active_table = key
614 self.pg_enable_capture(self.pg_interfaces)
617 pkts = self.pg1.get_capture(len(pkts))
618 self.verify_capture(self.pg1, pkts, TCP)
619 self.pg0.assert_nothing_captured(remark="packets forwarded")
620 self.pg2.assert_nothing_captured(remark="packets forwarded")
622 def test_iacl_proto_tcp_sport_dport(self):
623 """ IP6 TCP source and destination ports iACL test
625 Test scenario for basic protocol ACL with TCP and sport and dport
626 - Create IPv6 stream for pg0 -> pg1 interface.
627 - Create iACL with TCP IP protocol and defined sport and dport.
628 - Send and verify received packets on pg1 interface.
631 # Basic iACL testing with TCP and sport and dport
634 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
635 TCP(sport=sport, dport=dport))
636 self.pg0.add_stream(pkts)
639 self.create_classify_table(
641 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
642 self.create_classify_session(
643 self.acl_tbl_idx.get(key),
644 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
646 self.input_acl_set_interface(
647 self.pg0, self.acl_tbl_idx.get(key))
648 self.acl_active_table = key
650 self.pg_enable_capture(self.pg_interfaces)
653 pkts = self.pg1.get_capture(len(pkts))
654 self.verify_capture(self.pg1, pkts, TCP)
655 self.pg0.assert_nothing_captured(remark="packets forwarded")
656 self.pg2.assert_nothing_captured(remark="packets forwarded")
659 class TestClassifierIP6Out(TestClassifier):
660 """ Classifier output IP6 Test Case """
662 def test_acl_ip_out(self):
663 """ Output IP6 ACL test
665 Test scenario for basic IP ACL with source IP
666 - Create IPv6 stream for pg1 -> pg0 interface.
667 - Create ACL with source IP address.
668 - Send and verify received packets on pg0 interface.
671 # Basic oACL testing with source IP
672 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
673 self.pg1.add_stream(pkts)
676 self.create_classify_table(
678 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
680 self.create_classify_session(
681 self.acl_tbl_idx.get(key),
682 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
683 self.output_acl_set_interface(
684 self.pg0, self.acl_tbl_idx.get(key))
685 self.acl_active_table = key
687 self.pg_enable_capture(self.pg_interfaces)
690 pkts = self.pg0.get_capture(len(pkts))
691 self.verify_capture(self.pg0, pkts)
692 self.pg1.assert_nothing_captured(remark="packets forwarded")
693 self.pg2.assert_nothing_captured(remark="packets forwarded")
696 class TestClassifierIP6MAC(TestClassifier):
697 """ Classifier IP6 MAC Test Case """
699 def test_acl_mac(self):
700 """ IP6 MAC iACL test
702 Test scenario for basic MAC ACL with source MAC
703 - Create IPv6 stream for pg0 -> pg2 interface.
704 - Create ACL with source MAC address.
705 - Send and verify received packets on pg2 interface.
708 # Basic iACL testing with source MAC
709 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
710 self.pg0.add_stream(pkts)
713 self.create_classify_table(
714 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
715 self.create_classify_session(
716 self.acl_tbl_idx.get(key),
717 self.build_mac_match(src_mac=self.pg0.remote_mac))
718 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
719 self.acl_active_table = key
721 self.pg_enable_capture(self.pg_interfaces)
724 pkts = self.pg2.get_capture(len(pkts))
725 self.verify_capture(self.pg2, pkts)
726 self.pg0.assert_nothing_captured(remark="packets forwarded")
727 self.pg1.assert_nothing_captured(remark="packets forwarded")
730 if __name__ == '__main__':
731 unittest.main(testRunner=VppTestRunner)