8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
31 Perform test setup before test case.
34 - create 3 pg interfaces
35 - untagged pg0/pg1/pg2 interface
36 pg0 -------> pg1 (IP ACL)
40 - put it into UP state
42 - resolve neighbor address using NDP
44 :ivar list interfaces: pg interfaces.
45 :ivar list pg_if_packet_sizes: packet sizes in test.
46 :ivar dict acl_tbl_idx: ACL table index.
47 :ivar int pbr_vrfid: VRF id for PBR test.
49 self.reset_packet_infos()
50 super(TestClassifier, self).setUp()
52 # create 3 pg interfaces (pg0, pg1, pg2)
53 self.create_pg_interfaces(range(3))
55 # packet sizes to test
56 self.pg_if_packet_sizes = [64, 9018]
58 self.interfaces = list(self.pg_interfaces)
63 # setup all interfaces
64 for intf in self.interfaces:
70 """Run standard test teardown and acl related log."""
72 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74 self.logger.info(self.vapi.cli("show classify table verbose"))
75 self.logger.info(self.vapi.cli("show ip fib"))
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
88 super(TestClassifier, self).tearDown()
90 def create_stream(self, src_if, dst_if, packet_sizes,
91 proto_l=UDP(sport=1234, dport=5678)):
92 """Create input packet stream for defined interfaces.
94 :param VppInterface src_if: Source Interface for packet stream.
95 :param VppInterface dst_if: Destination Interface for packet stream.
96 :param list packet_sizes: packet size to test.
97 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
101 for size in packet_sizes:
102 info = self.create_packet_info(src_if, dst_if)
103 payload = self.info_to_payload(info)
104 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
109 self.extend_packet(p, size)
113 def verify_capture(self, dst_if, capture, proto_l=UDP):
114 """Verify captured input packet stream for defined interface.
116 :param VppInterface dst_if: Interface to verify captured packet stream.
117 :param list capture: Captured packet stream.
118 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
120 self.logger.info("Verifying capture on interface %s" % dst_if.name)
122 for i in self.interfaces:
123 last_info[i.sw_if_index] = None
124 dst_sw_if_index = dst_if.sw_if_index
125 for packet in capture:
127 ip6_received = packet[IPv6]
128 proto_received = packet[proto_l]
129 payload_info = self.payload_to_info(packet[Raw])
130 packet_index = payload_info.index
131 self.assertEqual(payload_info.dst, dst_sw_if_index)
133 "Got packet on port %s: src=%u (id=%u)" %
134 (dst_if.name, payload_info.src, packet_index))
135 next_info = self.get_next_packet_info_for_interface2(
136 payload_info.src, dst_sw_if_index,
137 last_info[payload_info.src])
138 last_info[payload_info.src] = next_info
139 self.assertTrue(next_info is not None)
140 self.assertEqual(packet_index, next_info.index)
141 saved_packet = next_info.data
142 ip_saved = saved_packet[IPv6]
143 proto_saved = saved_packet[proto_l]
144 # Check standard fields
145 self.assertEqual(ip6_received.src, ip_saved.src)
146 self.assertEqual(ip6_received.dst, ip_saved.dst)
147 self.assertEqual(proto_received.sport, proto_saved.sport)
148 self.assertEqual(proto_received.dport, proto_saved.dport)
150 self.logger.error(ppp("Unexpected or invalid packet:", packet))
152 for i in self.interfaces:
153 remaining_packet = self.get_next_packet_info_for_interface2(
154 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155 self.assertTrue(remaining_packet is None,
156 "Interface %s: Packet expected from interface %s "
157 "didn't arrive" % (dst_if.name, i.name))
def build_ip6_mask(nh='', src_ip='', dst_ip='',
                   src_port='', dst_port=''):
    """Build IPv6 ACL mask data in hex-string format.

    Each field is left-padded with zeros to a fixed width (14, 34, 32, 4
    and 4 hex digits respectively) and the fields are concatenated in
    parameter order.  Trailing zero digits are then dropped, and the
    result is padded back to a whole number of bytes so that it can be
    passed to ``binascii.unhexlify``.

    :param str nh: next header mask as hex digits <0-ff>
    :param str src_ip: source address mask, up to 32 hex digits
    :param str dst_ip: destination address mask, up to 32 hex digits
    :param str src_port: source port mask as hex digits <0-ffff>
    :param str dst_port: destination port mask as hex digits <0-ffff>
    :return: mask as a hex string with an even number of digits; the
        empty string when every field is empty
    :rtype: str
    """
    mask = '{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
        nh, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip() removes individual hex digits, not whole bytes.  A field
    # whose last digit is 0 (e.g. 'fff0') would otherwise leave an odd
    # number of digits, which binascii.unhexlify() refuses to decode.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_ip6_match(nh=0, src_ip='', dst_ip='',
                    src_port=0, dst_port=0):
    """Build IPv6 ACL match data in hex-string format.

    The numeric fields are rendered as hex, the addresses are converted
    to their 32-digit hex form, and everything is laid out with the same
    fixed field widths used for the mask (14, 34, 32, 4 and 4 hex
    digits).  Trailing zero digits are dropped and the result is padded
    back to a whole number of bytes so that it can be passed to
    ``binascii.unhexlify``.

    :param int nh: next header number
    :param str src_ip: source ip6 address in textual form, e.g.
        "xxxx:xxxx::xxxx"; an empty string leaves the field zeroed
    :param str dst_ip: destination ip6 address in textual form, e.g.
        "xxxx:xxxx::xxxx"; an empty string leaves the field zeroed
    :param int src_port: source port number
    :param int dst_port: destination port number
    :return: match data as a hex string with an even number of digits;
        the empty string when every field is zero/empty
    :rtype: str
    :raises socket.error: if a non-empty address is not valid IPv6 text
    """
    def _addr_to_hex(addr):
        # An empty address means "field not used"; inet_pton() would
        # raise on it, so it is passed through for zero padding instead.
        if not addr:
            return ''
        packed = socket.inet_pton(socket.AF_INET6, addr)
        # hexlify() returns bytes on Python 3; without the decode the
        # '{!s}' conversion below would embed the "b'...'" repr.
        return binascii.hexlify(packed).decode('ascii')

    match = '{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
        hex(nh)[2:], _addr_to_hex(src_ip), _addr_to_hex(dst_ip),
        hex(src_port)[2:], hex(dst_port)[2:]).rstrip('0')
    # rstrip() removes individual hex digits, not whole bytes.  A value
    # such as port 80 (0x0050) would otherwise leave an odd number of
    # digits, which binascii.unhexlify() refuses to decode.
    if len(match) % 2:
        match += '0'
    return match
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data in hex-string format.

    The fields are left-padded with zeros to 12, 12 and 4 hex digits and
    concatenated in parameter order.  Trailing zero digits are dropped
    and the result is padded back to a whole number of bytes so that it
    can be passed to ``binascii.unhexlify``.

    :param str dst_mac: destination MAC address mask <0-ffffffffffff>
    :param str src_mac: source MAC address mask <0-ffffffffffff>
    :param str ether_type: ethernet type mask <0-ffff>
    :return: mask as a hex string with an even number of digits; the
        empty string when every field is empty
    :rtype: str
    """
    mask = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip() removes individual hex digits, not whole bytes; re-add the
    # one digit needed to keep the string decodable by unhexlify().
    if len(mask) % 2:
        mask += '0'
    return mask
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data in hex-string format.

    Colons are removed from the MAC addresses, then the fields are
    left-padded with zeros to 12, 12 and 4 hex digits and concatenated
    in parameter order.  Trailing zero digits are dropped and the result
    is padded back to a whole number of bytes so that it can be passed
    to ``binascii.unhexlify``.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type as hex digits <0-ffff>
    :return: match data as a hex string with an even number of digits;
        the empty string when every field is empty
    :rtype: str
    """
    # str.replace() is a no-op on the empty default, so no guard is needed.
    dst_hex = dst_mac.replace(':', '')
    src_hex = src_mac.replace(':', '')

    match = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_hex, src_hex, ether_type).rstrip('0')
    # rstrip() removes individual hex digits, not whole bytes.  A MAC
    # ending in e.g. ':10' would otherwise leave an odd number of digits,
    # which binascii.unhexlify() refuses to decode.
    if len(match) % 2:
        match += '0'
    return match
225 def create_classify_table(self, key, mask, data_offset=0):
226 """Create Classify Table
228 :param str key: key for classify table (ex, ACL name).
229 :param str mask: mask value for interested traffic.
230 :param int data_offset:
232 r = self.vapi.classify_add_del_table(
234 mask=binascii.unhexlify(mask),
235 match_n_vectors=(len(mask) - 1) // 32 + 1,
238 current_data_offset=data_offset)
239 self.assertIsNotNone(r, 'No response msg for add_del_table')
240 self.acl_tbl_idx[key] = r.new_table_index
242 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
243 """Create Classify Session
245 :param int table_index: table index to identify classify table.
246 :param str match: matched value for interested traffic.
247 :param int vrfid: VRF id.
248 :param int is_add: option to configure classify session.
249 - create(1) or delete(0)
251 r = self.vapi.classify_add_del_session(
254 binascii.unhexlify(match),
257 self.assertIsNotNone(r, 'No response msg for add_del_session')
259 def input_acl_set_interface(self, intf, table_index, is_add=1):
260 """Configure Input ACL interface
262 :param VppInterface intf: Interface to apply Input ACL feature.
263 :param int table_index: table index to identify classify table.
264 :param int is_add: option to configure classify session.
265 - enable(1) or disable(0)
267 r = self.vapi.input_acl_set_interface(
270 ip6_table_index=table_index)
271 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
273 def output_acl_set_interface(self, intf, table_index, is_add=1):
274 """Configure Output ACL interface
276 :param VppInterface intf: Interface to apply Output ACL feature.
277 :param int table_index: table index to identify classify table.
278 :param int is_add: option to configure classify session.
279 - enable(1) or disable(0)
281 r = self.vapi.output_acl_set_interface(
284 ip6_table_index=table_index)
285 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
288 class TestClassifierIP6(TestClassifier):
289 """ Classifier IP6 Test Case """
291 def test_iacl_src_ip(self):
292 """ Source IP6 iACL test
294 Test scenario for basic IP ACL with source IP
295 - Create IPv6 stream for pg0 -> pg1 interface.
296 - Create iACL with source IP address.
297 - Send and verify received packets on pg1 interface.
300 # Basic iACL testing with source IP
301 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
302 self.pg0.add_stream(pkts)
305 self.create_classify_table(
307 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
308 self.create_classify_session(
309 self.acl_tbl_idx.get(key),
310 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
311 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
312 self.acl_active_table = key
314 self.pg_enable_capture(self.pg_interfaces)
317 pkts = self.pg1.get_capture(len(pkts))
318 self.verify_capture(self.pg1, pkts)
319 self.pg0.assert_nothing_captured(remark="packets forwarded")
320 self.pg2.assert_nothing_captured(remark="packets forwarded")
322 def test_iacl_dst_ip(self):
323 """ Destination IP6 iACL test
325 Test scenario for basic IP ACL with destination IP
326 - Create IPv6 stream for pg0 -> pg1 interface.
327 - Create iACL with destination IP address.
328 - Send and verify received packets on pg1 interface.
331 # Basic iACL testing with destination IP
332 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
333 self.pg0.add_stream(pkts)
336 self.create_classify_table(
338 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
339 self.create_classify_session(
340 self.acl_tbl_idx.get(key),
341 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
342 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
343 self.acl_active_table = key
345 self.pg_enable_capture(self.pg_interfaces)
348 pkts = self.pg1.get_capture(len(pkts))
349 self.verify_capture(self.pg1, pkts)
350 self.pg0.assert_nothing_captured(remark="packets forwarded")
351 self.pg2.assert_nothing_captured(remark="packets forwarded")
353 def test_iacl_src_dst_ip(self):
354 """ Source and destination IP6 iACL test
356 Test scenario for basic IP ACL with source and destination IP
357 - Create IPv6 stream for pg0 -> pg1 interface.
358 - Create iACL with source and destination IP addresses.
359 - Send and verify received packets on pg1 interface.
362 # Basic iACL testing with source and destination IP
363 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
364 self.pg0.add_stream(pkts)
367 self.create_classify_table(
369 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
370 dst_ip='ffffffffffffffffffffffffffffffff'))
371 self.create_classify_session(
372 self.acl_tbl_idx.get(key),
373 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
374 dst_ip=self.pg1.remote_ip6))
375 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
376 self.acl_active_table = key
378 self.pg_enable_capture(self.pg_interfaces)
381 pkts = self.pg1.get_capture(len(pkts))
382 self.verify_capture(self.pg1, pkts)
383 self.pg0.assert_nothing_captured(remark="packets forwarded")
384 self.pg2.assert_nothing_captured(remark="packets forwarded")
387 # Tests split to different test case classes because of issue reported in
389 class TestClassifierIP6UDP(TestClassifier):
390 """ Classifier IP6 UDP proto Test Case """
392 def test_iacl_proto_udp(self):
393 """ IP6 UDP protocol iACL test
395 Test scenario for basic protocol ACL with UDP protocol
396 - Create IPv6 stream for pg0 -> pg1 interface.
397 - Create iACL with UDP IP protocol.
398 - Send and verify received packets on pg1 interface.
401 # Basic iACL testing with UDP protocol
402 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
403 self.pg0.add_stream(pkts)
406 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
407 self.create_classify_session(
408 self.acl_tbl_idx.get(key),
409 self.build_ip6_match(nh=socket.IPPROTO_UDP))
410 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
411 self.acl_active_table = key
413 self.pg_enable_capture(self.pg_interfaces)
416 pkts = self.pg1.get_capture(len(pkts))
417 self.verify_capture(self.pg1, pkts)
418 self.pg0.assert_nothing_captured(remark="packets forwarded")
419 self.pg2.assert_nothing_captured(remark="packets forwarded")
421 def test_iacl_proto_udp_sport(self):
422 """ IP6 UDP source port iACL test
424 Test scenario for basic protocol ACL with UDP and sport
425 - Create IPv6 stream for pg0 -> pg1 interface.
426 - Create iACL with UDP IP protocol and defined sport.
427 - Send and verify received packets on pg1 interface.
430 # Basic iACL testing with UDP and sport
432 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
433 UDP(sport=sport, dport=5678))
434 self.pg0.add_stream(pkts)
437 self.create_classify_table(
438 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
439 self.create_classify_session(
440 self.acl_tbl_idx.get(key),
441 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
442 self.input_acl_set_interface(
443 self.pg0, self.acl_tbl_idx.get(key))
444 self.acl_active_table = key
446 self.pg_enable_capture(self.pg_interfaces)
449 pkts = self.pg1.get_capture(len(pkts))
450 self.verify_capture(self.pg1, pkts)
451 self.pg0.assert_nothing_captured(remark="packets forwarded")
452 self.pg2.assert_nothing_captured(remark="packets forwarded")
454 def test_iacl_proto_udp_dport(self):
455 """ IP6 UDP destination port iACL test
457 Test scenario for basic protocol ACL with UDP and dport
458 - Create IPv6 stream for pg0 -> pg1 interface.
459 - Create iACL with UDP IP protocol and defined dport.
460 - Send and verify received packets on pg1 interface.
463 # Basic iACL testing with UDP and dport
465 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
466 UDP(sport=1234, dport=dport))
467 self.pg0.add_stream(pkts)
470 self.create_classify_table(
471 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
472 self.create_classify_session(
473 self.acl_tbl_idx.get(key),
474 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
475 self.input_acl_set_interface(
476 self.pg0, self.acl_tbl_idx.get(key))
477 self.acl_active_table = key
479 self.pg_enable_capture(self.pg_interfaces)
482 pkts = self.pg1.get_capture(len(pkts))
483 self.verify_capture(self.pg1, pkts)
484 self.pg0.assert_nothing_captured(remark="packets forwarded")
485 self.pg2.assert_nothing_captured(remark="packets forwarded")
487 def test_iacl_proto_udp_sport_dport(self):
488 """ IP6 UDP source and destination ports iACL test
490 Test scenario for basic protocol ACL with UDP and sport and dport
491 - Create IPv6 stream for pg0 -> pg1 interface.
492 - Create iACL with UDP IP protocol and defined sport and dport.
493 - Send and verify received packets on pg1 interface.
496 # Basic iACL testing with UDP and sport and dport
499 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
500 UDP(sport=sport, dport=dport))
501 self.pg0.add_stream(pkts)
504 self.create_classify_table(
506 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
507 self.create_classify_session(
508 self.acl_tbl_idx.get(key),
509 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
511 self.input_acl_set_interface(
512 self.pg0, self.acl_tbl_idx.get(key))
513 self.acl_active_table = key
515 self.pg_enable_capture(self.pg_interfaces)
518 pkts = self.pg1.get_capture(len(pkts))
519 self.verify_capture(self.pg1, pkts)
520 self.pg0.assert_nothing_captured(remark="packets forwarded")
521 self.pg2.assert_nothing_captured(remark="packets forwarded")
524 class TestClassifierIP6TCP(TestClassifier):
525 """ Classifier IP6 TCP proto Test Case """
527 def test_iacl_proto_tcp(self):
528 """ IP6 TCP protocol iACL test
530 Test scenario for basic protocol ACL with TCP protocol
531 - Create IPv6 stream for pg0 -> pg1 interface.
532 - Create iACL with TCP IP protocol.
533 - Send and verify received packets on pg1 interface.
536 # Basic iACL testing with TCP protocol
537 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
538 TCP(sport=1234, dport=5678))
539 self.pg0.add_stream(pkts)
542 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
543 self.create_classify_session(
544 self.acl_tbl_idx.get(key),
545 self.build_ip6_match(nh=socket.IPPROTO_TCP))
546 self.input_acl_set_interface(
547 self.pg0, self.acl_tbl_idx.get(key))
548 self.acl_active_table = key
550 self.pg_enable_capture(self.pg_interfaces)
553 pkts = self.pg1.get_capture(len(pkts))
554 self.verify_capture(self.pg1, pkts, TCP)
555 self.pg0.assert_nothing_captured(remark="packets forwarded")
556 self.pg2.assert_nothing_captured(remark="packets forwarded")
558 def test_iacl_proto_tcp_sport(self):
559 """ IP6 TCP source port iACL test
561 Test scenario for basic protocol ACL with TCP and sport
562 - Create IPv6 stream for pg0 -> pg1 interface.
563 - Create iACL with TCP IP protocol and defined sport.
564 - Send and verify received packets on pg1 interface.
567 # Basic iACL testing with TCP and sport
569 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
570 TCP(sport=sport, dport=5678))
571 self.pg0.add_stream(pkts)
574 self.create_classify_table(
575 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
576 self.create_classify_session(
577 self.acl_tbl_idx.get(key),
578 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
579 self.input_acl_set_interface(
580 self.pg0, self.acl_tbl_idx.get(key))
581 self.acl_active_table = key
583 self.pg_enable_capture(self.pg_interfaces)
586 pkts = self.pg1.get_capture(len(pkts))
587 self.verify_capture(self.pg1, pkts, TCP)
588 self.pg0.assert_nothing_captured(remark="packets forwarded")
589 self.pg2.assert_nothing_captured(remark="packets forwarded")
591 def test_iacl_proto_tcp_dport(self):
592 """ IP6 TCP destination port iACL test
594 Test scenario for basic protocol ACL with TCP and dport
595 - Create IPv6 stream for pg0 -> pg1 interface.
596 - Create iACL with TCP IP protocol and defined dport.
597 - Send and verify received packets on pg1 interface.
600 # Basic iACL testing with TCP and dport
602 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
603 TCP(sport=1234, dport=dport))
604 self.pg0.add_stream(pkts)
607 self.create_classify_table(
608 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
609 self.create_classify_session(
610 self.acl_tbl_idx.get(key),
611 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
612 self.input_acl_set_interface(
613 self.pg0, self.acl_tbl_idx.get(key))
614 self.acl_active_table = key
616 self.pg_enable_capture(self.pg_interfaces)
619 pkts = self.pg1.get_capture(len(pkts))
620 self.verify_capture(self.pg1, pkts, TCP)
621 self.pg0.assert_nothing_captured(remark="packets forwarded")
622 self.pg2.assert_nothing_captured(remark="packets forwarded")
624 def test_iacl_proto_tcp_sport_dport(self):
625 """ IP6 TCP source and destination ports iACL test
627 Test scenario for basic protocol ACL with TCP and sport and dport
628 - Create IPv6 stream for pg0 -> pg1 interface.
629 - Create iACL with TCP IP protocol and defined sport and dport.
630 - Send and verify received packets on pg1 interface.
633 # Basic iACL testing with TCP and sport and dport
636 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
637 TCP(sport=sport, dport=dport))
638 self.pg0.add_stream(pkts)
641 self.create_classify_table(
643 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
644 self.create_classify_session(
645 self.acl_tbl_idx.get(key),
646 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
648 self.input_acl_set_interface(
649 self.pg0, self.acl_tbl_idx.get(key))
650 self.acl_active_table = key
652 self.pg_enable_capture(self.pg_interfaces)
655 pkts = self.pg1.get_capture(len(pkts))
656 self.verify_capture(self.pg1, pkts, TCP)
657 self.pg0.assert_nothing_captured(remark="packets forwarded")
658 self.pg2.assert_nothing_captured(remark="packets forwarded")
661 class TestClassifierIP6Out(TestClassifier):
662 """ Classifier output IP6 Test Case """
664 def test_acl_ip_out(self):
665 """ Output IP6 ACL test
667 Test scenario for basic IP ACL with source IP
668 - Create IPv6 stream for pg1 -> pg0 interface.
669 - Create ACL with source IP address.
670 - Send and verify received packets on pg0 interface.
673 # Basic oACL testing with source IP
674 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
675 self.pg1.add_stream(pkts)
678 self.create_classify_table(
680 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
682 self.create_classify_session(
683 self.acl_tbl_idx.get(key),
684 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
685 self.output_acl_set_interface(
686 self.pg0, self.acl_tbl_idx.get(key))
687 self.acl_active_table = key
689 self.pg_enable_capture(self.pg_interfaces)
692 pkts = self.pg0.get_capture(len(pkts))
693 self.verify_capture(self.pg0, pkts)
694 self.pg1.assert_nothing_captured(remark="packets forwarded")
695 self.pg2.assert_nothing_captured(remark="packets forwarded")
698 class TestClassifierIP6MAC(TestClassifier):
699 """ Classifier IP6 MAC Test Case """
701 def test_acl_mac(self):
702 """ IP6 MAC iACL test
704 Test scenario for basic MAC ACL with source MAC
705 - Create IPv6 stream for pg0 -> pg2 interface.
706 - Create ACL with source MAC address.
707 - Send and verify received packets on pg2 interface.
710 # Basic iACL testing with source MAC
711 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
712 self.pg0.add_stream(pkts)
715 self.create_classify_table(
716 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
717 self.create_classify_session(
718 self.acl_tbl_idx.get(key),
719 self.build_mac_match(src_mac=self.pg0.remote_mac))
720 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
721 self.acl_active_table = key
723 self.pg_enable_capture(self.pg_interfaces)
726 pkts = self.pg2.get_capture(len(pkts))
727 self.verify_capture(self.pg2, pkts)
728 self.pg0.assert_nothing_captured(remark="packets forwarded")
729 self.pg1.assert_nothing_captured(remark="packets forwarded")
if __name__ == '__main__':
    # When the file is executed directly, run its test cases through
    # unittest using the VPP-specific test runner.
    unittest.main(testRunner=VppTestRunner)