8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
30 def tearDownClass(cls):
31 super(TestClassifier, cls).tearDownClass()
35 Perform test setup before test case.
38 - create 4 pg interfaces
39 - untagged pg0/pg1/pg2 interface
40 pg0 -------> pg1 (IP ACL)
44 - put it into UP state
46 - resolve neighbor address using NDP
48 :ivar list interfaces: pg interfaces.
49 :ivar list pg_if_packet_sizes: packet sizes in test.
50 :ivar dict acl_tbl_idx: ACL table index.
51 :ivar int pbr_vrfid: VRF id for PBR test.
53 self.reset_packet_infos()
54 super(TestClassifier, self).setUp()
56 # create 4 pg interfaces
57 self.create_pg_interfaces(range(3))
59 # packet sizes to test
60 self.pg_if_packet_sizes = [64, 9018]
62 self.interfaces = list(self.pg_interfaces)
67 # setup all interfaces
68 for intf in self.interfaces:
74 """Run standard test teardown and acl related log."""
76 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
77 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
78 self.logger.info(self.vapi.cli("show classify table verbose"))
79 self.logger.info(self.vapi.cli("show ip fib"))
80 if self.acl_active_table == 'ip6_out':
81 self.output_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 elif self.acl_active_table != '':
85 self.input_acl_set_interface(
86 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
87 self.acl_active_table = ''
88 for intf in self.interfaces:
92 super(TestClassifier, self).tearDown()
94 def create_stream(self, src_if, dst_if, packet_sizes,
95 proto_l=UDP(sport=1234, dport=5678)):
96 """Create input packet stream for defined interfaces.
98 :param VppInterface src_if: Source Interface for packet stream.
99 :param VppInterface dst_if: Destination Interface for packet stream.
100 :param list packet_sizes: packet size to test.
101 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
105 for size in packet_sizes:
106 info = self.create_packet_info(src_if, dst_if)
107 payload = self.info_to_payload(info)
108 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
109 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
113 self.extend_packet(p, size)
117 def verify_capture(self, dst_if, capture, proto_l=UDP):
118 """Verify captured input packet stream for defined interface.
120 :param VppInterface dst_if: Interface to verify captured packet stream.
121 :param list capture: Captured packet stream.
122 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
124 self.logger.info("Verifying capture on interface %s" % dst_if.name)
126 for i in self.interfaces:
127 last_info[i.sw_if_index] = None
128 dst_sw_if_index = dst_if.sw_if_index
129 for packet in capture:
131 ip6_received = packet[IPv6]
132 proto_received = packet[proto_l]
133 payload_info = self.payload_to_info(packet[Raw])
134 packet_index = payload_info.index
135 self.assertEqual(payload_info.dst, dst_sw_if_index)
137 "Got packet on port %s: src=%u (id=%u)" %
138 (dst_if.name, payload_info.src, packet_index))
139 next_info = self.get_next_packet_info_for_interface2(
140 payload_info.src, dst_sw_if_index,
141 last_info[payload_info.src])
142 last_info[payload_info.src] = next_info
143 self.assertTrue(next_info is not None)
144 self.assertEqual(packet_index, next_info.index)
145 saved_packet = next_info.data
146 ip_saved = saved_packet[IPv6]
147 proto_saved = saved_packet[proto_l]
148 # Check standard fields
149 self.assertEqual(ip6_received.src, ip_saved.src)
150 self.assertEqual(ip6_received.dst, ip_saved.dst)
151 self.assertEqual(proto_received.sport, proto_saved.sport)
152 self.assertEqual(proto_received.dport, proto_saved.dport)
154 self.logger.error(ppp("Unexpected or invalid packet:", packet))
156 for i in self.interfaces:
157 remaining_packet = self.get_next_packet_info_for_interface2(
158 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
159 self.assertTrue(remaining_packet is None,
160 "Interface %s: Packet expected from interface %s "
161 "didn't arrive" % (dst_if.name, i.name))
164 def build_ip6_mask(nh='', src_ip='', dst_ip='',
165 src_port='', dst_port=''):
166 """Build IPv6 ACL mask data with hexstring format.
168 :param str nh: next header number <0-ff>
169 :param str src_ip: source ip address <0-ffffffff>
170 :param str dst_ip: destination ip address <0-ffffffff>
171 :param str src_port: source port number <0-ffff>
172 :param str dst_port: destination port number <0-ffff>
175 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
176 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
179 def build_ip6_match(nh=0, src_ip='', dst_ip='',
180 src_port=0, dst_port=0):
181 """Build IPv6 ACL match data with hexstring format.
183 :param int nh: next header number with valid option "x"
184 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
185 :param str dst_ip: destination ip6 address with format of
187 :param int src_port: source port number "x"
188 :param int dst_port: destination port number "x"
191 src_ip = binascii.hexlify(socket.inet_pton(
192 socket.AF_INET6, src_ip))
194 dst_ip = binascii.hexlify(socket.inet_pton(
195 socket.AF_INET6, dst_ip))
197 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
198 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
199 hex(dst_port)[2:])).rstrip('0')
202 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
203 """Build MAC ACL mask data with hexstring format.
205 :param str dst_mac: source MAC address <0-ffffffffffff>
206 :param str src_mac: destination MAC address <0-ffffffffffff>
207 :param str ether_type: ethernet type <0-ffff>
210 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
211 dst_mac, src_mac, ether_type)).rstrip('0')
214 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
215 """Build MAC ACL match data with hexstring format.
217 :param str dst_mac: source MAC address <x:x:x:x:x:x>
218 :param str src_mac: destination MAC address <x:x:x:x:x:x>
219 :param str ether_type: ethernet type <0-ffff>
222 dst_mac = dst_mac.replace(':', '')
224 src_mac = src_mac.replace(':', '')
226 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
227 dst_mac, src_mac, ether_type)).rstrip('0')
229 def create_classify_table(self, key, mask, data_offset=0):
230 """Create Classify Table
232 :param str key: key for classify table (ex, ACL name).
233 :param str mask: mask value for interested traffic.
234 :param int data_offset:
236 r = self.vapi.classify_add_del_table(
238 mask=binascii.unhexlify(mask),
239 match_n_vectors=(len(mask) - 1) // 32 + 1,
242 current_data_offset=data_offset)
243 self.assertIsNotNone(r, 'No response msg for add_del_table')
244 self.acl_tbl_idx[key] = r.new_table_index
246 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
247 """Create Classify Session
249 :param int table_index: table index to identify classify table.
250 :param str match: matched value for interested traffic.
251 :param int vrfid: VRF id.
252 :param int is_add: option to configure classify session.
253 - create(1) or delete(0)
255 r = self.vapi.classify_add_del_session(
258 binascii.unhexlify(match),
261 self.assertIsNotNone(r, 'No response msg for add_del_session')
263 def input_acl_set_interface(self, intf, table_index, is_add=1):
264 """Configure Input ACL interface
266 :param VppInterface intf: Interface to apply Input ACL feature.
267 :param int table_index: table index to identify classify table.
268 :param int is_add: option to configure classify session.
269 - enable(1) or disable(0)
271 r = self.vapi.input_acl_set_interface(
274 ip6_table_index=table_index)
275 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
277 def output_acl_set_interface(self, intf, table_index, is_add=1):
278 """Configure Output ACL interface
280 :param VppInterface intf: Interface to apply Output ACL feature.
281 :param int table_index: table index to identify classify table.
282 :param int is_add: option to configure classify session.
283 - enable(1) or disable(0)
285 r = self.vapi.output_acl_set_interface(
288 ip6_table_index=table_index)
289 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
292 class TestClassifierIP6(TestClassifier):
293 """ Classifier IP6 Test Case """
297 super(TestClassifierIP6, cls).setUpClass()
300 def tearDownClass(cls):
301 super(TestClassifierIP6, cls).tearDownClass()
303 def test_iacl_src_ip(self):
304 """ Source IP6 iACL test
306 Test scenario for basic IP ACL with source IP
307 - Create IPv6 stream for pg0 -> pg1 interface.
308 - Create iACL with source IP address.
309 - Send and verify received packets on pg1 interface.
312 # Basic iACL testing with source IP
313 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
314 self.pg0.add_stream(pkts)
317 self.create_classify_table(
319 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
320 self.create_classify_session(
321 self.acl_tbl_idx.get(key),
322 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
323 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
324 self.acl_active_table = key
326 self.pg_enable_capture(self.pg_interfaces)
329 pkts = self.pg1.get_capture(len(pkts))
330 self.verify_capture(self.pg1, pkts)
331 self.pg0.assert_nothing_captured(remark="packets forwarded")
332 self.pg2.assert_nothing_captured(remark="packets forwarded")
334 def test_iacl_dst_ip(self):
335 """ Destination IP6 iACL test
337 Test scenario for basic IP ACL with destination IP
338 - Create IPv6 stream for pg0 -> pg1 interface.
339 - Create iACL with destination IP address.
340 - Send and verify received packets on pg1 interface.
343 # Basic iACL testing with destination IP
344 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
345 self.pg0.add_stream(pkts)
348 self.create_classify_table(
350 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
351 self.create_classify_session(
352 self.acl_tbl_idx.get(key),
353 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
354 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
355 self.acl_active_table = key
357 self.pg_enable_capture(self.pg_interfaces)
360 pkts = self.pg1.get_capture(len(pkts))
361 self.verify_capture(self.pg1, pkts)
362 self.pg0.assert_nothing_captured(remark="packets forwarded")
363 self.pg2.assert_nothing_captured(remark="packets forwarded")
365 def test_iacl_src_dst_ip(self):
366 """ Source and destination IP6 iACL test
368 Test scenario for basic IP ACL with source and destination IP
369 - Create IPv4 stream for pg0 -> pg1 interface.
370 - Create iACL with source and destination IP addresses.
371 - Send and verify received packets on pg1 interface.
374 # Basic iACL testing with source and destination IP
375 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
376 self.pg0.add_stream(pkts)
379 self.create_classify_table(
381 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
382 dst_ip='ffffffffffffffffffffffffffffffff'))
383 self.create_classify_session(
384 self.acl_tbl_idx.get(key),
385 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
386 dst_ip=self.pg1.remote_ip6))
387 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
388 self.acl_active_table = key
390 self.pg_enable_capture(self.pg_interfaces)
393 pkts = self.pg1.get_capture(len(pkts))
394 self.verify_capture(self.pg1, pkts)
395 self.pg0.assert_nothing_captured(remark="packets forwarded")
396 self.pg2.assert_nothing_captured(remark="packets forwarded")
399 # Tests split to different test case classes because of issue reported in
401 class TestClassifierIP6UDP(TestClassifier):
402 """ Classifier IP6 UDP proto Test Case """
406 super(TestClassifierIP6UDP, cls).setUpClass()
409 def tearDownClass(cls):
410 super(TestClassifierIP6UDP, cls).tearDownClass()
412 def test_iacl_proto_udp(self):
413 """ IP6 UDP protocol iACL test
415 Test scenario for basic protocol ACL with UDP protocol
416 - Create IPv6 stream for pg0 -> pg1 interface.
417 - Create iACL with UDP IP protocol.
418 - Send and verify received packets on pg1 interface.
421 # Basic iACL testing with UDP protocol
422 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
423 self.pg0.add_stream(pkts)
426 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
427 self.create_classify_session(
428 self.acl_tbl_idx.get(key),
429 self.build_ip6_match(nh=socket.IPPROTO_UDP))
430 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
431 self.acl_active_table = key
433 self.pg_enable_capture(self.pg_interfaces)
436 pkts = self.pg1.get_capture(len(pkts))
437 self.verify_capture(self.pg1, pkts)
438 self.pg0.assert_nothing_captured(remark="packets forwarded")
439 self.pg2.assert_nothing_captured(remark="packets forwarded")
441 def test_iacl_proto_udp_sport(self):
442 """ IP6 UDP source port iACL test
444 Test scenario for basic protocol ACL with UDP and sport
445 - Create IPv6 stream for pg0 -> pg1 interface.
446 - Create iACL with UDP IP protocol and defined sport.
447 - Send and verify received packets on pg1 interface.
450 # Basic iACL testing with UDP and sport
452 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
453 UDP(sport=sport, dport=5678))
454 self.pg0.add_stream(pkts)
457 self.create_classify_table(
458 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
459 self.create_classify_session(
460 self.acl_tbl_idx.get(key),
461 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
462 self.input_acl_set_interface(
463 self.pg0, self.acl_tbl_idx.get(key))
464 self.acl_active_table = key
466 self.pg_enable_capture(self.pg_interfaces)
469 pkts = self.pg1.get_capture(len(pkts))
470 self.verify_capture(self.pg1, pkts)
471 self.pg0.assert_nothing_captured(remark="packets forwarded")
472 self.pg2.assert_nothing_captured(remark="packets forwarded")
474 def test_iacl_proto_udp_dport(self):
475 """ IP6 UDP destination port iACL test
477 Test scenario for basic protocol ACL with UDP and dport
478 - Create IPv6 stream for pg0 -> pg1 interface.
479 - Create iACL with UDP IP protocol and defined dport.
480 - Send and verify received packets on pg1 interface.
483 # Basic iACL testing with UDP and dport
485 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
486 UDP(sport=1234, dport=dport))
487 self.pg0.add_stream(pkts)
490 self.create_classify_table(
491 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
492 self.create_classify_session(
493 self.acl_tbl_idx.get(key),
494 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
495 self.input_acl_set_interface(
496 self.pg0, self.acl_tbl_idx.get(key))
497 self.acl_active_table = key
499 self.pg_enable_capture(self.pg_interfaces)
502 pkts = self.pg1.get_capture(len(pkts))
503 self.verify_capture(self.pg1, pkts)
504 self.pg0.assert_nothing_captured(remark="packets forwarded")
505 self.pg2.assert_nothing_captured(remark="packets forwarded")
507 def test_iacl_proto_udp_sport_dport(self):
508 """ IP6 UDP source and destination ports iACL test
510 Test scenario for basic protocol ACL with UDP and sport and dport
511 - Create IPv6 stream for pg0 -> pg1 interface.
512 - Create iACL with UDP IP protocol and defined sport and dport.
513 - Send and verify received packets on pg1 interface.
516 # Basic iACL testing with UDP and sport and dport
519 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
520 UDP(sport=sport, dport=dport))
521 self.pg0.add_stream(pkts)
524 self.create_classify_table(
526 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
527 self.create_classify_session(
528 self.acl_tbl_idx.get(key),
529 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
531 self.input_acl_set_interface(
532 self.pg0, self.acl_tbl_idx.get(key))
533 self.acl_active_table = key
535 self.pg_enable_capture(self.pg_interfaces)
538 pkts = self.pg1.get_capture(len(pkts))
539 self.verify_capture(self.pg1, pkts)
540 self.pg0.assert_nothing_captured(remark="packets forwarded")
541 self.pg2.assert_nothing_captured(remark="packets forwarded")
544 class TestClassifierIP6TCP(TestClassifier):
545 """ Classifier IP6 TCP proto Test Case """
549 super(TestClassifierIP6TCP, cls).setUpClass()
552 def tearDownClass(cls):
553 super(TestClassifierIP6TCP, cls).tearDownClass()
555 def test_iacl_proto_tcp(self):
556 """ IP6 TCP protocol iACL test
558 Test scenario for basic protocol ACL with TCP protocol
559 - Create IPv6 stream for pg0 -> pg1 interface.
560 - Create iACL with TCP IP protocol.
561 - Send and verify received packets on pg1 interface.
564 # Basic iACL testing with TCP protocol
565 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
566 TCP(sport=1234, dport=5678))
567 self.pg0.add_stream(pkts)
570 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
571 self.create_classify_session(
572 self.acl_tbl_idx.get(key),
573 self.build_ip6_match(nh=socket.IPPROTO_TCP))
574 self.input_acl_set_interface(
575 self.pg0, self.acl_tbl_idx.get(key))
576 self.acl_active_table = key
578 self.pg_enable_capture(self.pg_interfaces)
581 pkts = self.pg1.get_capture(len(pkts))
582 self.verify_capture(self.pg1, pkts, TCP)
583 self.pg0.assert_nothing_captured(remark="packets forwarded")
584 self.pg2.assert_nothing_captured(remark="packets forwarded")
586 def test_iacl_proto_tcp_sport(self):
587 """ IP6 TCP source port iACL test
589 Test scenario for basic protocol ACL with TCP and sport
590 - Create IPv6 stream for pg0 -> pg1 interface.
591 - Create iACL with TCP IP protocol and defined sport.
592 - Send and verify received packets on pg1 interface.
595 # Basic iACL testing with TCP and sport
597 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
598 TCP(sport=sport, dport=5678))
599 self.pg0.add_stream(pkts)
602 self.create_classify_table(
603 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
604 self.create_classify_session(
605 self.acl_tbl_idx.get(key),
606 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
607 self.input_acl_set_interface(
608 self.pg0, self.acl_tbl_idx.get(key))
609 self.acl_active_table = key
611 self.pg_enable_capture(self.pg_interfaces)
614 pkts = self.pg1.get_capture(len(pkts))
615 self.verify_capture(self.pg1, pkts, TCP)
616 self.pg0.assert_nothing_captured(remark="packets forwarded")
617 self.pg2.assert_nothing_captured(remark="packets forwarded")
619 def test_iacl_proto_tcp_dport(self):
620 """ IP6 TCP destination port iACL test
622 Test scenario for basic protocol ACL with TCP and dport
623 - Create IPv6 stream for pg0 -> pg1 interface.
624 - Create iACL with TCP IP protocol and defined dport.
625 - Send and verify received packets on pg1 interface.
628 # Basic iACL testing with TCP and dport
630 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
631 TCP(sport=1234, dport=dport))
632 self.pg0.add_stream(pkts)
635 self.create_classify_table(
636 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
637 self.create_classify_session(
638 self.acl_tbl_idx.get(key),
639 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
640 self.input_acl_set_interface(
641 self.pg0, self.acl_tbl_idx.get(key))
642 self.acl_active_table = key
644 self.pg_enable_capture(self.pg_interfaces)
647 pkts = self.pg1.get_capture(len(pkts))
648 self.verify_capture(self.pg1, pkts, TCP)
649 self.pg0.assert_nothing_captured(remark="packets forwarded")
650 self.pg2.assert_nothing_captured(remark="packets forwarded")
652 def test_iacl_proto_tcp_sport_dport(self):
653 """ IP6 TCP source and destination ports iACL test
655 Test scenario for basic protocol ACL with TCP and sport and dport
656 - Create IPv6 stream for pg0 -> pg1 interface.
657 - Create iACL with TCP IP protocol and defined sport and dport.
658 - Send and verify received packets on pg1 interface.
661 # Basic iACL testing with TCP and sport and dport
664 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
665 TCP(sport=sport, dport=dport))
666 self.pg0.add_stream(pkts)
669 self.create_classify_table(
671 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
672 self.create_classify_session(
673 self.acl_tbl_idx.get(key),
674 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
676 self.input_acl_set_interface(
677 self.pg0, self.acl_tbl_idx.get(key))
678 self.acl_active_table = key
680 self.pg_enable_capture(self.pg_interfaces)
683 pkts = self.pg1.get_capture(len(pkts))
684 self.verify_capture(self.pg1, pkts, TCP)
685 self.pg0.assert_nothing_captured(remark="packets forwarded")
686 self.pg2.assert_nothing_captured(remark="packets forwarded")
689 class TestClassifierIP6Out(TestClassifier):
690 """ Classifier output IP6 Test Case """
694 super(TestClassifierIP6Out, cls).setUpClass()
697 def tearDownClass(cls):
698 super(TestClassifierIP6Out, cls).tearDownClass()
700 def test_acl_ip_out(self):
701 """ Output IP6 ACL test
703 Test scenario for basic IP ACL with source IP
704 - Create IPv6 stream for pg1 -> pg0 interface.
705 - Create ACL with source IP address.
706 - Send and verify received packets on pg0 interface.
709 # Basic oACL testing with source IP
710 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
711 self.pg1.add_stream(pkts)
714 self.create_classify_table(
716 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
718 self.create_classify_session(
719 self.acl_tbl_idx.get(key),
720 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
721 self.output_acl_set_interface(
722 self.pg0, self.acl_tbl_idx.get(key))
723 self.acl_active_table = key
725 self.pg_enable_capture(self.pg_interfaces)
728 pkts = self.pg0.get_capture(len(pkts))
729 self.verify_capture(self.pg0, pkts)
730 self.pg1.assert_nothing_captured(remark="packets forwarded")
731 self.pg2.assert_nothing_captured(remark="packets forwarded")
734 class TestClassifierIP6MAC(TestClassifier):
735 """ Classifier IP6 MAC Test Case """
739 super(TestClassifierIP6MAC, cls).setUpClass()
742 def tearDownClass(cls):
743 super(TestClassifierIP6MAC, cls).tearDownClass()
745 def test_acl_mac(self):
746 """ IP6 MAC iACL test
748 Test scenario for basic MAC ACL with source MAC
749 - Create IPv6 stream for pg0 -> pg2 interface.
750 - Create ACL with source MAC address.
751 - Send and verify received packets on pg2 interface.
754 # Basic iACL testing with source MAC
755 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
756 self.pg0.add_stream(pkts)
759 self.create_classify_table(
760 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
761 self.create_classify_session(
762 self.acl_tbl_idx.get(key),
763 self.build_mac_match(src_mac=self.pg0.remote_mac))
764 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
765 self.acl_active_table = key
767 self.pg_enable_capture(self.pg_interfaces)
770 pkts = self.pg2.get_capture(len(pkts))
771 self.verify_capture(self.pg2, pkts)
772 self.pg0.assert_nothing_captured(remark="packets forwarded")
773 self.pg1.assert_nothing_captured(remark="packets forwarded")
776 if __name__ == '__main__':
777 unittest.main(testRunner=VppTestRunner)