8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifier, cls)
    parent.tearDownClass()
35 Perform test setup before test case.
38 - create 3 pg interfaces
39 - untagged pg0/pg1/pg2 interface
40 pg0 -------> pg1 (IP ACL)
44 - put it into UP state
46 - resolve neighbor address using NDP
48 :ivar list interfaces: pg interfaces.
49 :ivar list pg_if_packet_sizes: packet sizes in test.
50 :ivar dict acl_tbl_idx: ACL table index.
51 :ivar int pbr_vrfid: VRF id for PBR test.
53 self.reset_packet_infos()
54 super(TestClassifier, self).setUp()
56 # create 3 pg interfaces
57 self.create_pg_interfaces(range(3))
59 # packet sizes to test
60 self.pg_if_packet_sizes = [64, 9018]
62 self.interfaces = list(self.pg_interfaces)
67 # setup all interfaces
68 for intf in self.interfaces:
74 """Run standard test teardown and acl related log."""
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
88 super(TestClassifier, self).tearDown()
def show_commands_at_teardown(self):
    """Log the output of four VPP CLI show commands at info level.

    The two ACL dumps go through ``vapi.ppcli``; the classify table and
    FIB dumps go through ``vapi.cli``.
    """
    pretty = self.vapi.ppcli
    plain = self.vapi.cli
    dumps = (
        (pretty, "show inacl type ip6"),
        (pretty, "show outacl type ip6"),
        (plain, "show classify table verbose"),
        (plain, "show ip fib"),
    )
    # Run and log one command at a time so the log order matches the
    # order in which the commands were issued.
    for run, command in dumps:
        self.logger.info(run(command))
96 def create_stream(self, src_if, dst_if, packet_sizes,
97 proto_l=UDP(sport=1234, dport=5678)):
98 """Create input packet stream for defined interfaces.
100 :param VppInterface src_if: Source Interface for packet stream.
101 :param VppInterface dst_if: Destination Interface for packet stream.
102 :param list packet_sizes: packet size to test.
103 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
107 for size in packet_sizes:
108 info = self.create_packet_info(src_if, dst_if)
109 payload = self.info_to_payload(info)
110 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
115 self.extend_packet(p, size)
119 def verify_capture(self, dst_if, capture, proto_l=UDP):
120 """Verify captured input packet stream for defined interface.
122 :param VppInterface dst_if: Interface to verify captured packet stream.
123 :param list capture: Captured packet stream.
124 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
126 self.logger.info("Verifying capture on interface %s" % dst_if.name)
128 for i in self.interfaces:
129 last_info[i.sw_if_index] = None
130 dst_sw_if_index = dst_if.sw_if_index
131 for packet in capture:
133 ip6_received = packet[IPv6]
134 proto_received = packet[proto_l]
135 payload_info = self.payload_to_info(packet[Raw])
136 packet_index = payload_info.index
137 self.assertEqual(payload_info.dst, dst_sw_if_index)
139 "Got packet on port %s: src=%u (id=%u)" %
140 (dst_if.name, payload_info.src, packet_index))
141 next_info = self.get_next_packet_info_for_interface2(
142 payload_info.src, dst_sw_if_index,
143 last_info[payload_info.src])
144 last_info[payload_info.src] = next_info
145 self.assertTrue(next_info is not None)
146 self.assertEqual(packet_index, next_info.index)
147 saved_packet = next_info.data
148 ip_saved = saved_packet[IPv6]
149 proto_saved = saved_packet[proto_l]
150 # Check standard fields
151 self.assertEqual(ip6_received.src, ip_saved.src)
152 self.assertEqual(ip6_received.dst, ip_saved.dst)
153 self.assertEqual(proto_received.sport, proto_saved.sport)
154 self.assertEqual(proto_received.dport, proto_saved.dport)
156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
158 for i in self.interfaces:
159 remaining_packet = self.get_next_packet_info_for_interface2(
160 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161 self.assertTrue(remaining_packet is None,
162 "Interface %s: Packet expected from interface %s "
163 "didn't arrive" % (dst_if.name, i.name))
def build_ip6_mask(nh='', src_ip='', dst_ip='',
                   src_port='', dst_port=''):
    """Build IPv6 ACL mask data with hexstring format.

    Each field is right-aligned into a fixed-width, zero-padded column
    (14, 34, 32, 4 and 4 hex digits respectively), the columns are
    concatenated and trailing zero bytes are dropped.

    :param str nh: next header mask <0-ff>
    :param str src_ip: source ip6 address mask, up to 32 hex digits
    :param str dst_ip: destination ip6 address mask, up to 32 hex digits
    :param str src_port: source port mask <0-ffff>
    :param str dst_port: destination port mask <0-ffff>
    :return: hex string with an even number of digits (empty when every
        field is zero)
    """
    mask = '{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
        nh, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip() removes single hex digits, not bytes: a mask whose last
    # non-zero byte has a zero low nibble (e.g. 'f0') would be left with an
    # odd number of digits, which binascii.unhexlify() refuses to decode.
    # Put the stripped nibble back so the result is always whole bytes.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_ip6_match(nh=0, src_ip='', dst_ip='',
                    src_port=0, dst_port=0):
    """Build IPv6 ACL match data with hexstring format.

    Addresses are converted to their 32-digit hex form, numbers to plain
    hex, and everything is laid out in the same fixed-width columns as the
    mask (14, 34, 32, 4 and 4 hex digits). Trailing zero bytes are dropped.

    :param int nh: next header number "x"
    :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
    :param str dst_ip: destination ip6 address with format of
        "xxx:xxxx::xxxx"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :return: hex string with an even number of digits (empty when every
        field is zero)
    """
    def _ip6_to_hex(address):
        # hexlify() returns bytes on Python 3 and str on Python 2; decoding
        # gives plain text on both, so no interpreter-version switch is
        # needed.
        packed = socket.inet_pton(socket.AF_INET6, address)
        return binascii.hexlify(packed).decode('ascii')

    # An empty address means "not matched on"; leave it as '' so the column
    # is filled with zeros by the format padding.
    if src_ip:
        src_ip = _ip6_to_hex(src_ip)
    if dst_ip:
        dst_ip = _ip6_to_hex(dst_ip)

    match = '{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
        '{:x}'.format(nh), src_ip, dst_ip,
        '{:x}'.format(src_port), '{:x}'.format(dst_port)).rstrip('0')
    # rstrip() removes single hex digits, so values such as nh=16 ('10') or
    # port 80 ('0050') would leave an odd number of digits, which
    # binascii.unhexlify() refuses to decode. Restore the stripped nibble.
    if len(match) % 2:
        match += '0'
    return match
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    The fields are right-aligned into zero-padded columns of 12, 12 and 4
    hex digits, concatenated, and trailing zero bytes are dropped.

    :param str dst_mac: destination MAC address mask <0-ffffffffffff>
    :param str src_mac: source MAC address mask <0-ffffffffffff>
    :param str ether_type: ethernet type mask <0-ffff>
    :return: hex string with an even number of digits (empty when every
        field is zero)
    """
    mask = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip() removes single hex digits; if the last non-zero byte ends in
    # a zero nibble the result would have an odd length and could not be
    # decoded by binascii.unhexlify(). Restore the stripped nibble.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    Colons are removed from the MAC addresses, the fields are right-aligned
    into zero-padded columns of 12, 12 and 4 hex digits, concatenated, and
    trailing zero bytes are dropped.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :return: hex string with an even number of digits (empty when every
        field is zero)
    """
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    match = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip() removes single hex digits; a MAC or ethertype whose last
    # non-zero byte ends in a zero nibble (e.g. '...:10') would leave an odd
    # number of digits, which binascii.unhexlify() cannot decode. Restore
    # the stripped nibble.
    if len(match) % 2:
        match += '0'
    return match
238 def create_classify_table(self, key, mask, data_offset=0):
239 """Create Classify Table
241 :param str key: key for classify table (ex, ACL name).
242 :param str mask: mask value for interested traffic.
243 :param int data_offset:
245 r = self.vapi.classify_add_del_table(
247 mask=binascii.unhexlify(mask),
248 match_n_vectors=(len(mask) - 1) // 32 + 1,
251 current_data_offset=data_offset)
252 self.assertIsNotNone(r, 'No response msg for add_del_table')
253 self.acl_tbl_idx[key] = r.new_table_index
255 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
256 """Create Classify Session
258 :param int table_index: table index to identify classify table.
259 :param str match: matched value for interested traffic.
260 :param int vrfid: VRF id.
261 :param int is_add: option to configure classify session.
262 - create(1) or delete(0)
264 r = self.vapi.classify_add_del_session(
267 binascii.unhexlify(match),
270 self.assertIsNotNone(r, 'No response msg for add_del_session')
272 def input_acl_set_interface(self, intf, table_index, is_add=1):
273 """Configure Input ACL interface
275 :param VppInterface intf: Interface to apply Input ACL feature.
276 :param int table_index: table index to identify classify table.
277 :param int is_add: option to configure classify session.
278 - enable(1) or disable(0)
280 r = self.vapi.input_acl_set_interface(
283 ip6_table_index=table_index)
284 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
286 def output_acl_set_interface(self, intf, table_index, is_add=1):
287 """Configure Output ACL interface
289 :param VppInterface intf: Interface to apply Output ACL feature.
290 :param int table_index: table index to identify classify table.
291 :param int is_add: option to configure classify session.
292 - enable(1) or disable(0)
294 r = self.vapi.output_acl_set_interface(
297 ip6_table_index=table_index)
298 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
301 class TestClassifierIP6(TestClassifier):
302 """ Classifier IP6 Test Case """
306 super(TestClassifierIP6, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifierIP6, cls)
    parent.tearDownClass()
312 def test_iacl_src_ip(self):
313 """ Source IP6 iACL test
315 Test scenario for basic IP ACL with source IP
316 - Create IPv6 stream for pg0 -> pg1 interface.
317 - Create iACL with source IP address.
318 - Send and verify received packets on pg1 interface.
321 # Basic iACL testing with source IP
322 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
323 self.pg0.add_stream(pkts)
326 self.create_classify_table(
328 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
329 self.create_classify_session(
330 self.acl_tbl_idx.get(key),
331 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
332 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
333 self.acl_active_table = key
335 self.pg_enable_capture(self.pg_interfaces)
338 pkts = self.pg1.get_capture(len(pkts))
339 self.verify_capture(self.pg1, pkts)
340 self.pg0.assert_nothing_captured(remark="packets forwarded")
341 self.pg2.assert_nothing_captured(remark="packets forwarded")
343 def test_iacl_dst_ip(self):
344 """ Destination IP6 iACL test
346 Test scenario for basic IP ACL with destination IP
347 - Create IPv6 stream for pg0 -> pg1 interface.
348 - Create iACL with destination IP address.
349 - Send and verify received packets on pg1 interface.
352 # Basic iACL testing with destination IP
353 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
354 self.pg0.add_stream(pkts)
357 self.create_classify_table(
359 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
360 self.create_classify_session(
361 self.acl_tbl_idx.get(key),
362 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
363 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
364 self.acl_active_table = key
366 self.pg_enable_capture(self.pg_interfaces)
369 pkts = self.pg1.get_capture(len(pkts))
370 self.verify_capture(self.pg1, pkts)
371 self.pg0.assert_nothing_captured(remark="packets forwarded")
372 self.pg2.assert_nothing_captured(remark="packets forwarded")
374 def test_iacl_src_dst_ip(self):
375 """ Source and destination IP6 iACL test
377 Test scenario for basic IP ACL with source and destination IP
378 - Create IPv6 stream for pg0 -> pg1 interface.
379 - Create iACL with source and destination IP addresses.
380 - Send and verify received packets on pg1 interface.
383 # Basic iACL testing with source and destination IP
384 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
385 self.pg0.add_stream(pkts)
388 self.create_classify_table(
390 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
391 dst_ip='ffffffffffffffffffffffffffffffff'))
392 self.create_classify_session(
393 self.acl_tbl_idx.get(key),
394 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
395 dst_ip=self.pg1.remote_ip6))
396 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
397 self.acl_active_table = key
399 self.pg_enable_capture(self.pg_interfaces)
402 pkts = self.pg1.get_capture(len(pkts))
403 self.verify_capture(self.pg1, pkts)
404 self.pg0.assert_nothing_captured(remark="packets forwarded")
405 self.pg2.assert_nothing_captured(remark="packets forwarded")
408 # Tests split to different test case classes because of issue reported in
410 class TestClassifierIP6UDP(TestClassifier):
411 """ Classifier IP6 UDP proto Test Case """
415 super(TestClassifierIP6UDP, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifierIP6UDP, cls)
    parent.tearDownClass()
421 def test_iacl_proto_udp(self):
422 """ IP6 UDP protocol iACL test
424 Test scenario for basic protocol ACL with UDP protocol
425 - Create IPv6 stream for pg0 -> pg1 interface.
426 - Create iACL with UDP IP protocol.
427 - Send and verify received packets on pg1 interface.
430 # Basic iACL testing with UDP protocol
431 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
432 self.pg0.add_stream(pkts)
435 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
436 self.create_classify_session(
437 self.acl_tbl_idx.get(key),
438 self.build_ip6_match(nh=socket.IPPROTO_UDP))
439 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
440 self.acl_active_table = key
442 self.pg_enable_capture(self.pg_interfaces)
445 pkts = self.pg1.get_capture(len(pkts))
446 self.verify_capture(self.pg1, pkts)
447 self.pg0.assert_nothing_captured(remark="packets forwarded")
448 self.pg2.assert_nothing_captured(remark="packets forwarded")
450 def test_iacl_proto_udp_sport(self):
451 """ IP6 UDP source port iACL test
453 Test scenario for basic protocol ACL with UDP and sport
454 - Create IPv6 stream for pg0 -> pg1 interface.
455 - Create iACL with UDP IP protocol and defined sport.
456 - Send and verify received packets on pg1 interface.
459 # Basic iACL testing with UDP and sport
461 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
462 UDP(sport=sport, dport=5678))
463 self.pg0.add_stream(pkts)
466 self.create_classify_table(
467 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
468 self.create_classify_session(
469 self.acl_tbl_idx.get(key),
470 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
471 self.input_acl_set_interface(
472 self.pg0, self.acl_tbl_idx.get(key))
473 self.acl_active_table = key
475 self.pg_enable_capture(self.pg_interfaces)
478 pkts = self.pg1.get_capture(len(pkts))
479 self.verify_capture(self.pg1, pkts)
480 self.pg0.assert_nothing_captured(remark="packets forwarded")
481 self.pg2.assert_nothing_captured(remark="packets forwarded")
483 def test_iacl_proto_udp_dport(self):
484 """ IP6 UDP destination port iACL test
486 Test scenario for basic protocol ACL with UDP and dport
487 - Create IPv6 stream for pg0 -> pg1 interface.
488 - Create iACL with UDP IP protocol and defined dport.
489 - Send and verify received packets on pg1 interface.
492 # Basic iACL testing with UDP and dport
494 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
495 UDP(sport=1234, dport=dport))
496 self.pg0.add_stream(pkts)
499 self.create_classify_table(
500 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
501 self.create_classify_session(
502 self.acl_tbl_idx.get(key),
503 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
504 self.input_acl_set_interface(
505 self.pg0, self.acl_tbl_idx.get(key))
506 self.acl_active_table = key
508 self.pg_enable_capture(self.pg_interfaces)
511 pkts = self.pg1.get_capture(len(pkts))
512 self.verify_capture(self.pg1, pkts)
513 self.pg0.assert_nothing_captured(remark="packets forwarded")
514 self.pg2.assert_nothing_captured(remark="packets forwarded")
516 def test_iacl_proto_udp_sport_dport(self):
517 """ IP6 UDP source and destination ports iACL test
519 Test scenario for basic protocol ACL with UDP and sport and dport
520 - Create IPv6 stream for pg0 -> pg1 interface.
521 - Create iACL with UDP IP protocol and defined sport and dport.
522 - Send and verify received packets on pg1 interface.
525 # Basic iACL testing with UDP and sport and dport
528 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
529 UDP(sport=sport, dport=dport))
530 self.pg0.add_stream(pkts)
533 self.create_classify_table(
535 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
536 self.create_classify_session(
537 self.acl_tbl_idx.get(key),
538 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
540 self.input_acl_set_interface(
541 self.pg0, self.acl_tbl_idx.get(key))
542 self.acl_active_table = key
544 self.pg_enable_capture(self.pg_interfaces)
547 pkts = self.pg1.get_capture(len(pkts))
548 self.verify_capture(self.pg1, pkts)
549 self.pg0.assert_nothing_captured(remark="packets forwarded")
550 self.pg2.assert_nothing_captured(remark="packets forwarded")
553 class TestClassifierIP6TCP(TestClassifier):
554 """ Classifier IP6 TCP proto Test Case """
558 super(TestClassifierIP6TCP, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifierIP6TCP, cls)
    parent.tearDownClass()
564 def test_iacl_proto_tcp(self):
565 """ IP6 TCP protocol iACL test
567 Test scenario for basic protocol ACL with TCP protocol
568 - Create IPv6 stream for pg0 -> pg1 interface.
569 - Create iACL with TCP IP protocol.
570 - Send and verify received packets on pg1 interface.
573 # Basic iACL testing with TCP protocol
574 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
575 TCP(sport=1234, dport=5678))
576 self.pg0.add_stream(pkts)
579 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
580 self.create_classify_session(
581 self.acl_tbl_idx.get(key),
582 self.build_ip6_match(nh=socket.IPPROTO_TCP))
583 self.input_acl_set_interface(
584 self.pg0, self.acl_tbl_idx.get(key))
585 self.acl_active_table = key
587 self.pg_enable_capture(self.pg_interfaces)
590 pkts = self.pg1.get_capture(len(pkts))
591 self.verify_capture(self.pg1, pkts, TCP)
592 self.pg0.assert_nothing_captured(remark="packets forwarded")
593 self.pg2.assert_nothing_captured(remark="packets forwarded")
595 def test_iacl_proto_tcp_sport(self):
596 """ IP6 TCP source port iACL test
598 Test scenario for basic protocol ACL with TCP and sport
599 - Create IPv6 stream for pg0 -> pg1 interface.
600 - Create iACL with TCP IP protocol and defined sport.
601 - Send and verify received packets on pg1 interface.
604 # Basic iACL testing with TCP and sport
606 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
607 TCP(sport=sport, dport=5678))
608 self.pg0.add_stream(pkts)
611 self.create_classify_table(
612 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
613 self.create_classify_session(
614 self.acl_tbl_idx.get(key),
615 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
616 self.input_acl_set_interface(
617 self.pg0, self.acl_tbl_idx.get(key))
618 self.acl_active_table = key
620 self.pg_enable_capture(self.pg_interfaces)
623 pkts = self.pg1.get_capture(len(pkts))
624 self.verify_capture(self.pg1, pkts, TCP)
625 self.pg0.assert_nothing_captured(remark="packets forwarded")
626 self.pg2.assert_nothing_captured(remark="packets forwarded")
628 def test_iacl_proto_tcp_dport(self):
629 """ IP6 TCP destination port iACL test
631 Test scenario for basic protocol ACL with TCP and dport
632 - Create IPv6 stream for pg0 -> pg1 interface.
633 - Create iACL with TCP IP protocol and defined dport.
634 - Send and verify received packets on pg1 interface.
637 # Basic iACL testing with TCP and dport
639 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
640 TCP(sport=1234, dport=dport))
641 self.pg0.add_stream(pkts)
644 self.create_classify_table(
645 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
646 self.create_classify_session(
647 self.acl_tbl_idx.get(key),
648 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
649 self.input_acl_set_interface(
650 self.pg0, self.acl_tbl_idx.get(key))
651 self.acl_active_table = key
653 self.pg_enable_capture(self.pg_interfaces)
656 pkts = self.pg1.get_capture(len(pkts))
657 self.verify_capture(self.pg1, pkts, TCP)
658 self.pg0.assert_nothing_captured(remark="packets forwarded")
659 self.pg2.assert_nothing_captured(remark="packets forwarded")
661 def test_iacl_proto_tcp_sport_dport(self):
662 """ IP6 TCP source and destination ports iACL test
664 Test scenario for basic protocol ACL with TCP and sport and dport
665 - Create IPv6 stream for pg0 -> pg1 interface.
666 - Create iACL with TCP IP protocol and defined sport and dport.
667 - Send and verify received packets on pg1 interface.
670 # Basic iACL testing with TCP and sport and dport
673 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
674 TCP(sport=sport, dport=dport))
675 self.pg0.add_stream(pkts)
678 self.create_classify_table(
680 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
681 self.create_classify_session(
682 self.acl_tbl_idx.get(key),
683 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
685 self.input_acl_set_interface(
686 self.pg0, self.acl_tbl_idx.get(key))
687 self.acl_active_table = key
689 self.pg_enable_capture(self.pg_interfaces)
692 pkts = self.pg1.get_capture(len(pkts))
693 self.verify_capture(self.pg1, pkts, TCP)
694 self.pg0.assert_nothing_captured(remark="packets forwarded")
695 self.pg2.assert_nothing_captured(remark="packets forwarded")
698 class TestClassifierIP6Out(TestClassifier):
699 """ Classifier output IP6 Test Case """
703 super(TestClassifierIP6Out, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifierIP6Out, cls)
    parent.tearDownClass()
709 def test_acl_ip_out(self):
710 """ Output IP6 ACL test
712 Test scenario for basic IP ACL with source IP
713 - Create IPv6 stream for pg1 -> pg0 interface.
714 - Create ACL with source IP address.
715 - Send and verify received packets on pg0 interface.
718 # Basic oACL testing with source IP
719 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
720 self.pg1.add_stream(pkts)
723 self.create_classify_table(
725 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
727 self.create_classify_session(
728 self.acl_tbl_idx.get(key),
729 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
730 self.output_acl_set_interface(
731 self.pg0, self.acl_tbl_idx.get(key))
732 self.acl_active_table = key
734 self.pg_enable_capture(self.pg_interfaces)
737 pkts = self.pg0.get_capture(len(pkts))
738 self.verify_capture(self.pg0, pkts)
739 self.pg1.assert_nothing_captured(remark="packets forwarded")
740 self.pg2.assert_nothing_captured(remark="packets forwarded")
743 class TestClassifierIP6MAC(TestClassifier):
744 """ Classifier IP6 MAC Test Case """
748 super(TestClassifierIP6MAC, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case class."""
    parent = super(TestClassifierIP6MAC, cls)
    parent.tearDownClass()
754 def test_acl_mac(self):
755 """ IP6 MAC iACL test
757 Test scenario for basic MAC ACL with source MAC
758 - Create IPv6 stream for pg0 -> pg2 interface.
759 - Create ACL with source MAC address.
760 - Send and verify received packets on pg2 interface.
763 # Basic iACL testing with source MAC
764 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
765 self.pg0.add_stream(pkts)
768 self.create_classify_table(
769 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
770 self.create_classify_session(
771 self.acl_tbl_idx.get(key),
772 self.build_mac_match(src_mac=self.pg0.remote_mac))
773 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
774 self.acl_active_table = key
776 self.pg_enable_capture(self.pg_interfaces)
779 pkts = self.pg2.get_capture(len(pkts))
780 self.verify_capture(self.pg2, pkts)
781 self.pg0.assert_nothing_captured(remark="packets forwarded")
782 self.pg1.assert_nothing_captured(remark="packets forwarded")
785 if __name__ == '__main__':
786 unittest.main(testRunner=VppTestRunner)