8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
30 def tearDownClass(cls):
31 super(TestClassifier, cls).tearDownClass()
35 Perform test setup before test case.
38 - create 3 pg interfaces
39 - untagged pg0/pg1/pg2 interface
40 pg0 -------> pg1 (IP ACL)
44 - put it into UP state
46 - resolve neighbor address using NDP
48 :ivar list interfaces: pg interfaces.
49 :ivar list pg_if_packet_sizes: packet sizes in test.
50 :ivar dict acl_tbl_idx: ACL table index.
51 :ivar int pbr_vrfid: VRF id for PBR test.
53 self.reset_packet_infos()
54 super(TestClassifier, self).setUp()
56 # create 3 pg interfaces
57 self.create_pg_interfaces(range(3))
59 # packet sizes to test
60 self.pg_if_packet_sizes = [64, 9018]
62 self.interfaces = list(self.pg_interfaces)
67 # setup all interfaces
68 for intf in self.interfaces:
74 """Run standard test teardown and acl related log."""
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
88 super(TestClassifier, self).tearDown()
def show_commands_at_teardown(self):
    """Log ACL, classifier and FIB state through the test logger.

    Issues four CLI commands in a fixed order and writes each reply to
    ``self.logger`` at info level: the two ACL commands go through
    ``self.vapi.ppcli`` and the remaining two through ``self.vapi.cli``.
    """
    log = self.logger.info

    # ACL state is fetched with ppcli.
    for command in ("show inacl type ip6", "show outacl type ip6"):
        log(self.vapi.ppcli(command))

    # Classifier tables and the FIB are fetched with cli.
    for command in ("show classify table verbose", "show ip fib"):
        log(self.vapi.cli(command))
96 def create_stream(self, src_if, dst_if, packet_sizes,
97 proto_l=UDP(sport=1234, dport=5678)):
98 """Create input packet stream for defined interfaces.
100 :param VppInterface src_if: Source Interface for packet stream.
101 :param VppInterface dst_if: Destination Interface for packet stream.
102 :param list packet_sizes: packet size to test.
103 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
107 for size in packet_sizes:
108 info = self.create_packet_info(src_if, dst_if)
109 payload = self.info_to_payload(info)
110 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
115 self.extend_packet(p, size)
119 def verify_capture(self, dst_if, capture, proto_l=UDP):
120 """Verify captured input packet stream for defined interface.
122 :param VppInterface dst_if: Interface to verify captured packet stream.
123 :param list capture: Captured packet stream.
124 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
126 self.logger.info("Verifying capture on interface %s" % dst_if.name)
128 for i in self.interfaces:
129 last_info[i.sw_if_index] = None
130 dst_sw_if_index = dst_if.sw_if_index
131 for packet in capture:
133 ip6_received = packet[IPv6]
134 proto_received = packet[proto_l]
135 payload_info = self.payload_to_info(packet[Raw])
136 packet_index = payload_info.index
137 self.assertEqual(payload_info.dst, dst_sw_if_index)
139 "Got packet on port %s: src=%u (id=%u)" %
140 (dst_if.name, payload_info.src, packet_index))
141 next_info = self.get_next_packet_info_for_interface2(
142 payload_info.src, dst_sw_if_index,
143 last_info[payload_info.src])
144 last_info[payload_info.src] = next_info
145 self.assertTrue(next_info is not None)
146 self.assertEqual(packet_index, next_info.index)
147 saved_packet = next_info.data
148 ip_saved = saved_packet[IPv6]
149 proto_saved = saved_packet[proto_l]
150 # Check standard fields
151 self.assertEqual(ip6_received.src, ip_saved.src)
152 self.assertEqual(ip6_received.dst, ip_saved.dst)
153 self.assertEqual(proto_received.sport, proto_saved.sport)
154 self.assertEqual(proto_received.dport, proto_saved.dport)
156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
158 for i in self.interfaces:
159 remaining_packet = self.get_next_packet_info_for_interface2(
160 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161 self.assertTrue(remaining_packet is None,
162 "Interface %s: Packet expected from interface %s "
163 "didn't arrive" % (dst_if.name, i.name))
166 def build_ip6_mask(nh='', src_ip='', dst_ip='',
167 src_port='', dst_port=''):
168 """Build IPv6 ACL mask data with hexstring format.
170 :param str nh: next header number <0-ff>
171 :param str src_ip: source ip6 address <0-ffffffffffffffffffffffffffffffff>
172 :param str dst_ip: destination ip6 address <0-ffffffffffffffffffffffffffffffff>
173 :param str src_port: source port number <0-ffff>
174 :param str dst_port: destination port number <0-ffff>
177 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
178 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
181 def build_ip6_match(nh=0, src_ip='', dst_ip='',
182 src_port=0, dst_port=0):
183 """Build IPv6 ACL match data with hexstring format.
185 :param int nh: next header number with valid option "x"
186 :param str src_ip: source ip6 address with format of "xxxx:xxxx::xxxx"
187 :param str dst_ip: destination ip6 address with format of
189 :param int src_port: source port number "x"
190 :param int dst_port: destination port number "x"
193 src_ip = binascii.hexlify(socket.inet_pton(
194 socket.AF_INET6, src_ip))
196 dst_ip = binascii.hexlify(socket.inet_pton(
197 socket.AF_INET6, dst_ip))
199 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
200 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
201 hex(dst_port)[2:])).rstrip('0')
204 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
205 """Build MAC ACL mask data with hexstring format.
207 :param str dst_mac: destination MAC address <0-ffffffffffff>
208 :param str src_mac: source MAC address <0-ffffffffffff>
209 :param str ether_type: ethernet type <0-ffff>
212 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
213 dst_mac, src_mac, ether_type)).rstrip('0')
216 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
217 """Build MAC ACL match data with hexstring format.
219 :param str dst_mac: destination MAC address <x:x:x:x:x:x>
220 :param str src_mac: source MAC address <x:x:x:x:x:x>
221 :param str ether_type: ethernet type <0-ffff>
224 dst_mac = dst_mac.replace(':', '')
226 src_mac = src_mac.replace(':', '')
228 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
229 dst_mac, src_mac, ether_type)).rstrip('0')
231 def create_classify_table(self, key, mask, data_offset=0):
232 """Create Classify Table
234 :param str key: key for classify table (ex, ACL name).
235 :param str mask: mask value for interested traffic.
236 :param int data_offset:
238 r = self.vapi.classify_add_del_table(
240 mask=binascii.unhexlify(mask),
241 match_n_vectors=(len(mask) - 1) // 32 + 1,
244 current_data_offset=data_offset)
245 self.assertIsNotNone(r, 'No response msg for add_del_table')
246 self.acl_tbl_idx[key] = r.new_table_index
248 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
249 """Create Classify Session
251 :param int table_index: table index to identify classify table.
252 :param str match: matched value for interested traffic.
253 :param int vrfid: VRF id.
254 :param int is_add: option to configure classify session.
255 - create(1) or delete(0)
257 r = self.vapi.classify_add_del_session(
260 binascii.unhexlify(match),
263 self.assertIsNotNone(r, 'No response msg for add_del_session')
265 def input_acl_set_interface(self, intf, table_index, is_add=1):
266 """Configure Input ACL interface
268 :param VppInterface intf: Interface to apply Input ACL feature.
269 :param int table_index: table index to identify classify table.
270 :param int is_add: option to configure classify session.
271 - enable(1) or disable(0)
273 r = self.vapi.input_acl_set_interface(
276 ip6_table_index=table_index)
277 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
279 def output_acl_set_interface(self, intf, table_index, is_add=1):
280 """Configure Output ACL interface
282 :param VppInterface intf: Interface to apply Output ACL feature.
283 :param int table_index: table index to identify classify table.
284 :param int is_add: option to configure classify session.
285 - enable(1) or disable(0)
287 r = self.vapi.output_acl_set_interface(
290 ip6_table_index=table_index)
291 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
294 class TestClassifierIP6(TestClassifier):
295 """ Classifier IP6 Test Case """
299 super(TestClassifierIP6, cls).setUpClass()
302 def tearDownClass(cls):
303 super(TestClassifierIP6, cls).tearDownClass()
305 def test_iacl_src_ip(self):
306 """ Source IP6 iACL test
308 Test scenario for basic IP ACL with source IP
309 - Create IPv6 stream for pg0 -> pg1 interface.
310 - Create iACL with source IP address.
311 - Send and verify received packets on pg1 interface.
314 # Basic iACL testing with source IP
315 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
316 self.pg0.add_stream(pkts)
319 self.create_classify_table(
321 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
322 self.create_classify_session(
323 self.acl_tbl_idx.get(key),
324 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
325 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
326 self.acl_active_table = key
328 self.pg_enable_capture(self.pg_interfaces)
331 pkts = self.pg1.get_capture(len(pkts))
332 self.verify_capture(self.pg1, pkts)
333 self.pg0.assert_nothing_captured(remark="packets forwarded")
334 self.pg2.assert_nothing_captured(remark="packets forwarded")
336 def test_iacl_dst_ip(self):
337 """ Destination IP6 iACL test
339 Test scenario for basic IP ACL with destination IP
340 - Create IPv6 stream for pg0 -> pg1 interface.
341 - Create iACL with destination IP address.
342 - Send and verify received packets on pg1 interface.
345 # Basic iACL testing with destination IP
346 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
347 self.pg0.add_stream(pkts)
350 self.create_classify_table(
352 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
353 self.create_classify_session(
354 self.acl_tbl_idx.get(key),
355 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
356 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
357 self.acl_active_table = key
359 self.pg_enable_capture(self.pg_interfaces)
362 pkts = self.pg1.get_capture(len(pkts))
363 self.verify_capture(self.pg1, pkts)
364 self.pg0.assert_nothing_captured(remark="packets forwarded")
365 self.pg2.assert_nothing_captured(remark="packets forwarded")
367 def test_iacl_src_dst_ip(self):
368 """ Source and destination IP6 iACL test
370 Test scenario for basic IP ACL with source and destination IP
371 - Create IPv6 stream for pg0 -> pg1 interface.
372 - Create iACL with source and destination IP addresses.
373 - Send and verify received packets on pg1 interface.
376 # Basic iACL testing with source and destination IP
377 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
378 self.pg0.add_stream(pkts)
381 self.create_classify_table(
383 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
384 dst_ip='ffffffffffffffffffffffffffffffff'))
385 self.create_classify_session(
386 self.acl_tbl_idx.get(key),
387 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
388 dst_ip=self.pg1.remote_ip6))
389 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
390 self.acl_active_table = key
392 self.pg_enable_capture(self.pg_interfaces)
395 pkts = self.pg1.get_capture(len(pkts))
396 self.verify_capture(self.pg1, pkts)
397 self.pg0.assert_nothing_captured(remark="packets forwarded")
398 self.pg2.assert_nothing_captured(remark="packets forwarded")
401 # Tests split to different test case classes because of issue reported in
403 class TestClassifierIP6UDP(TestClassifier):
404 """ Classifier IP6 UDP proto Test Case """
408 super(TestClassifierIP6UDP, cls).setUpClass()
411 def tearDownClass(cls):
412 super(TestClassifierIP6UDP, cls).tearDownClass()
414 def test_iacl_proto_udp(self):
415 """ IP6 UDP protocol iACL test
417 Test scenario for basic protocol ACL with UDP protocol
418 - Create IPv6 stream for pg0 -> pg1 interface.
419 - Create iACL with UDP IP protocol.
420 - Send and verify received packets on pg1 interface.
423 # Basic iACL testing with UDP protocol
424 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
425 self.pg0.add_stream(pkts)
428 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
429 self.create_classify_session(
430 self.acl_tbl_idx.get(key),
431 self.build_ip6_match(nh=socket.IPPROTO_UDP))
432 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
433 self.acl_active_table = key
435 self.pg_enable_capture(self.pg_interfaces)
438 pkts = self.pg1.get_capture(len(pkts))
439 self.verify_capture(self.pg1, pkts)
440 self.pg0.assert_nothing_captured(remark="packets forwarded")
441 self.pg2.assert_nothing_captured(remark="packets forwarded")
443 def test_iacl_proto_udp_sport(self):
444 """ IP6 UDP source port iACL test
446 Test scenario for basic protocol ACL with UDP and sport
447 - Create IPv6 stream for pg0 -> pg1 interface.
448 - Create iACL with UDP IP protocol and defined sport.
449 - Send and verify received packets on pg1 interface.
452 # Basic iACL testing with UDP and sport
454 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
455 UDP(sport=sport, dport=5678))
456 self.pg0.add_stream(pkts)
459 self.create_classify_table(
460 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
461 self.create_classify_session(
462 self.acl_tbl_idx.get(key),
463 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
464 self.input_acl_set_interface(
465 self.pg0, self.acl_tbl_idx.get(key))
466 self.acl_active_table = key
468 self.pg_enable_capture(self.pg_interfaces)
471 pkts = self.pg1.get_capture(len(pkts))
472 self.verify_capture(self.pg1, pkts)
473 self.pg0.assert_nothing_captured(remark="packets forwarded")
474 self.pg2.assert_nothing_captured(remark="packets forwarded")
476 def test_iacl_proto_udp_dport(self):
477 """ IP6 UDP destination port iACL test
479 Test scenario for basic protocol ACL with UDP and dport
480 - Create IPv6 stream for pg0 -> pg1 interface.
481 - Create iACL with UDP IP protocol and defined dport.
482 - Send and verify received packets on pg1 interface.
485 # Basic iACL testing with UDP and dport
487 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
488 UDP(sport=1234, dport=dport))
489 self.pg0.add_stream(pkts)
492 self.create_classify_table(
493 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
494 self.create_classify_session(
495 self.acl_tbl_idx.get(key),
496 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
497 self.input_acl_set_interface(
498 self.pg0, self.acl_tbl_idx.get(key))
499 self.acl_active_table = key
501 self.pg_enable_capture(self.pg_interfaces)
504 pkts = self.pg1.get_capture(len(pkts))
505 self.verify_capture(self.pg1, pkts)
506 self.pg0.assert_nothing_captured(remark="packets forwarded")
507 self.pg2.assert_nothing_captured(remark="packets forwarded")
509 def test_iacl_proto_udp_sport_dport(self):
510 """ IP6 UDP source and destination ports iACL test
512 Test scenario for basic protocol ACL with UDP and sport and dport
513 - Create IPv6 stream for pg0 -> pg1 interface.
514 - Create iACL with UDP IP protocol and defined sport and dport.
515 - Send and verify received packets on pg1 interface.
518 # Basic iACL testing with UDP and sport and dport
521 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
522 UDP(sport=sport, dport=dport))
523 self.pg0.add_stream(pkts)
526 self.create_classify_table(
528 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
529 self.create_classify_session(
530 self.acl_tbl_idx.get(key),
531 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
533 self.input_acl_set_interface(
534 self.pg0, self.acl_tbl_idx.get(key))
535 self.acl_active_table = key
537 self.pg_enable_capture(self.pg_interfaces)
540 pkts = self.pg1.get_capture(len(pkts))
541 self.verify_capture(self.pg1, pkts)
542 self.pg0.assert_nothing_captured(remark="packets forwarded")
543 self.pg2.assert_nothing_captured(remark="packets forwarded")
546 class TestClassifierIP6TCP(TestClassifier):
547 """ Classifier IP6 TCP proto Test Case """
551 super(TestClassifierIP6TCP, cls).setUpClass()
554 def tearDownClass(cls):
555 super(TestClassifierIP6TCP, cls).tearDownClass()
557 def test_iacl_proto_tcp(self):
558 """ IP6 TCP protocol iACL test
560 Test scenario for basic protocol ACL with TCP protocol
561 - Create IPv6 stream for pg0 -> pg1 interface.
562 - Create iACL with TCP IP protocol.
563 - Send and verify received packets on pg1 interface.
566 # Basic iACL testing with TCP protocol
567 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568 TCP(sport=1234, dport=5678))
569 self.pg0.add_stream(pkts)
572 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
573 self.create_classify_session(
574 self.acl_tbl_idx.get(key),
575 self.build_ip6_match(nh=socket.IPPROTO_TCP))
576 self.input_acl_set_interface(
577 self.pg0, self.acl_tbl_idx.get(key))
578 self.acl_active_table = key
580 self.pg_enable_capture(self.pg_interfaces)
583 pkts = self.pg1.get_capture(len(pkts))
584 self.verify_capture(self.pg1, pkts, TCP)
585 self.pg0.assert_nothing_captured(remark="packets forwarded")
586 self.pg2.assert_nothing_captured(remark="packets forwarded")
588 def test_iacl_proto_tcp_sport(self):
589 """ IP6 TCP source port iACL test
591 Test scenario for basic protocol ACL with TCP and sport
592 - Create IPv6 stream for pg0 -> pg1 interface.
593 - Create iACL with TCP IP protocol and defined sport.
594 - Send and verify received packets on pg1 interface.
597 # Basic iACL testing with TCP and sport
599 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
600 TCP(sport=sport, dport=5678))
601 self.pg0.add_stream(pkts)
604 self.create_classify_table(
605 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
606 self.create_classify_session(
607 self.acl_tbl_idx.get(key),
608 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
609 self.input_acl_set_interface(
610 self.pg0, self.acl_tbl_idx.get(key))
611 self.acl_active_table = key
613 self.pg_enable_capture(self.pg_interfaces)
616 pkts = self.pg1.get_capture(len(pkts))
617 self.verify_capture(self.pg1, pkts, TCP)
618 self.pg0.assert_nothing_captured(remark="packets forwarded")
619 self.pg2.assert_nothing_captured(remark="packets forwarded")
621 def test_iacl_proto_tcp_dport(self):
622 """ IP6 TCP destination port iACL test
624 Test scenario for basic protocol ACL with TCP and dport
625 - Create IPv6 stream for pg0 -> pg1 interface.
626 - Create iACL with TCP IP protocol and defined dport.
627 - Send and verify received packets on pg1 interface.
630 # Basic iACL testing with TCP and dport
632 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
633 TCP(sport=1234, dport=dport))
634 self.pg0.add_stream(pkts)
637 self.create_classify_table(
638 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
639 self.create_classify_session(
640 self.acl_tbl_idx.get(key),
641 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
642 self.input_acl_set_interface(
643 self.pg0, self.acl_tbl_idx.get(key))
644 self.acl_active_table = key
646 self.pg_enable_capture(self.pg_interfaces)
649 pkts = self.pg1.get_capture(len(pkts))
650 self.verify_capture(self.pg1, pkts, TCP)
651 self.pg0.assert_nothing_captured(remark="packets forwarded")
652 self.pg2.assert_nothing_captured(remark="packets forwarded")
654 def test_iacl_proto_tcp_sport_dport(self):
655 """ IP6 TCP source and destination ports iACL test
657 Test scenario for basic protocol ACL with TCP and sport and dport
658 - Create IPv6 stream for pg0 -> pg1 interface.
659 - Create iACL with TCP IP protocol and defined sport and dport.
660 - Send and verify received packets on pg1 interface.
663 # Basic iACL testing with TCP and sport and dport
666 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
667 TCP(sport=sport, dport=dport))
668 self.pg0.add_stream(pkts)
671 self.create_classify_table(
673 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
674 self.create_classify_session(
675 self.acl_tbl_idx.get(key),
676 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
678 self.input_acl_set_interface(
679 self.pg0, self.acl_tbl_idx.get(key))
680 self.acl_active_table = key
682 self.pg_enable_capture(self.pg_interfaces)
685 pkts = self.pg1.get_capture(len(pkts))
686 self.verify_capture(self.pg1, pkts, TCP)
687 self.pg0.assert_nothing_captured(remark="packets forwarded")
688 self.pg2.assert_nothing_captured(remark="packets forwarded")
691 class TestClassifierIP6Out(TestClassifier):
692 """ Classifier output IP6 Test Case """
696 super(TestClassifierIP6Out, cls).setUpClass()
699 def tearDownClass(cls):
700 super(TestClassifierIP6Out, cls).tearDownClass()
702 def test_acl_ip_out(self):
703 """ Output IP6 ACL test
705 Test scenario for basic IP ACL with source IP
706 - Create IPv6 stream for pg1 -> pg0 interface.
707 - Create ACL with source IP address.
708 - Send and verify received packets on pg0 interface.
711 # Basic oACL testing with source IP
712 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
713 self.pg1.add_stream(pkts)
716 self.create_classify_table(
718 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
720 self.create_classify_session(
721 self.acl_tbl_idx.get(key),
722 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
723 self.output_acl_set_interface(
724 self.pg0, self.acl_tbl_idx.get(key))
725 self.acl_active_table = key
727 self.pg_enable_capture(self.pg_interfaces)
730 pkts = self.pg0.get_capture(len(pkts))
731 self.verify_capture(self.pg0, pkts)
732 self.pg1.assert_nothing_captured(remark="packets forwarded")
733 self.pg2.assert_nothing_captured(remark="packets forwarded")
736 class TestClassifierIP6MAC(TestClassifier):
737 """ Classifier IP6 MAC Test Case """
741 super(TestClassifierIP6MAC, cls).setUpClass()
744 def tearDownClass(cls):
745 super(TestClassifierIP6MAC, cls).tearDownClass()
747 def test_acl_mac(self):
748 """ IP6 MAC iACL test
750 Test scenario for basic MAC ACL with source MAC
751 - Create IPv6 stream for pg0 -> pg2 interface.
752 - Create ACL with source MAC address.
753 - Send and verify received packets on pg2 interface.
756 # Basic iACL testing with source MAC
757 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
758 self.pg0.add_stream(pkts)
761 self.create_classify_table(
762 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
763 self.create_classify_session(
764 self.acl_tbl_idx.get(key),
765 self.build_mac_match(src_mac=self.pg0.remote_mac))
766 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
767 self.acl_active_table = key
769 self.pg_enable_capture(self.pg_interfaces)
772 pkts = self.pg2.get_capture(len(pkts))
773 self.verify_capture(self.pg2, pkts)
774 self.pg0.assert_nothing_captured(remark="packets forwarded")
775 self.pg1.assert_nothing_captured(remark="packets forwarded")
778 if __name__ == '__main__':
779 unittest.main(testRunner=VppTestRunner)