7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_ip import INVALID_INDEX
17 class TestClassifier(VppTestCase):
18 """ Classifier Test Case """
23 Perform standard class setup (defined by class method setUpClass in
24 class VppTestCase) before running the test case, set test case related
25 variables and configure VPP.
27 super(TestClassifier, cls).setUpClass()
28 cls.acl_active_table = ''
31 def tearDownClass(cls):
32 super(TestClassifier, cls).tearDownClass()
36 Perform test setup before test case.
39 - create 4 pg interfaces
40 - untagged pg0/pg1/pg2 interface
41 pg0 -------> pg1 (IP ACL)
47 - put it into UP state
49 - resolve neighbor address using ARP
51 :ivar list interfaces: pg interfaces.
52 :ivar list pg_if_packet_sizes: packet sizes in test.
53 :ivar dict acl_tbl_idx: ACL table index.
54 :ivar int pbr_vrfid: VRF id for PBR test.
56 self.reset_packet_infos()
57 super(TestClassifier, self).setUp()
59 # create 4 pg interfaces
60 self.create_pg_interfaces(range(4))
62 # packet sizes to test
63 self.pg_if_packet_sizes = [64, 9018]
65 self.interfaces = list(self.pg_interfaces)
71 # setup all interfaces
72 for intf in self.interfaces:
78 """Run standard test teardown and acl related log."""
80 if self.acl_active_table == 'ip_out':
81 self.output_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 elif self.acl_active_table != '':
85 self.input_acl_set_interface(
86 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
87 self.acl_active_table = ''
88 for intf in self.interfaces:
92 super(TestClassifier, self).tearDown()
94 def show_commands_at_teardown(self):
95 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
96 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
97 self.logger.info(self.vapi.cli("show classify table verbose"))
98 self.logger.info(self.vapi.cli("show ip fib"))
100 def create_stream(self, src_if, dst_if, packet_sizes,
101 proto_l=UDP(sport=1234, dport=5678)):
102 """Create input packet stream for defined interfaces.
104 :param VppInterface src_if: Source Interface for packet stream.
105 :param VppInterface dst_if: Destination Interface for packet stream.
106 :param list packet_sizes: packet size to test.
107 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
111 for size in packet_sizes:
112 info = self.create_packet_info(src_if, dst_if)
113 payload = self.info_to_payload(info)
114 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
115 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
119 self.extend_packet(p, size)
123 def verify_capture(self, dst_if, capture, proto_l=UDP):
124 """Verify captured input packet stream for defined interface.
126 :param VppInterface dst_if: Interface to verify captured packet stream.
127 :param list capture: Captured packet stream.
128 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
130 self.logger.info("Verifying capture on interface %s" % dst_if.name)
132 for i in self.interfaces:
133 last_info[i.sw_if_index] = None
134 dst_sw_if_index = dst_if.sw_if_index
135 for packet in capture:
137 ip_received = packet[IP]
138 proto_received = packet[proto_l]
139 payload_info = self.payload_to_info(packet[Raw])
140 packet_index = payload_info.index
141 self.assertEqual(payload_info.dst, dst_sw_if_index)
143 "Got packet on port %s: src=%u (id=%u)" %
144 (dst_if.name, payload_info.src, packet_index))
145 next_info = self.get_next_packet_info_for_interface2(
146 payload_info.src, dst_sw_if_index,
147 last_info[payload_info.src])
148 last_info[payload_info.src] = next_info
149 self.assertTrue(next_info is not None)
150 self.assertEqual(packet_index, next_info.index)
151 saved_packet = next_info.data
152 ip_saved = saved_packet[IP]
153 proto_saved = saved_packet[proto_l]
154 # Check standard fields
155 self.assertEqual(ip_received.src, ip_saved.src)
156 self.assertEqual(ip_received.dst, ip_saved.dst)
157 self.assertEqual(proto_received.sport, proto_saved.sport)
158 self.assertEqual(proto_received.dport, proto_saved.dport)
160 self.logger.error(ppp("Unexpected or invalid packet:", packet))
162 for i in self.interfaces:
163 remaining_packet = self.get_next_packet_info_for_interface2(
164 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
165 self.assertTrue(remaining_packet is None,
166 "Interface %s: Packet expected from interface %s "
167 "didn't arrive" % (dst_if.name, i.name))
169 def verify_vrf(self, vrf_id):
171 Check if the FIB table / VRF ID is configured.
173 :param int vrf_id: The FIB table / VRF ID to be verified.
174 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
176 ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
177 vrf_count = len(ip_fib_dump)
180 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
183 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
187 def build_ip_mask(proto='', src_ip='', dst_ip='',
188 src_port='', dst_port=''):
189 """Build IP ACL mask data with hexstring format.
191 :param str proto: protocol number <0-ff>
192 :param str src_ip: source ip address <0-ffffffff>
193 :param str dst_ip: destination ip address <0-ffffffff>
194 :param str src_port: source port number <0-ffff>
195 :param str dst_port: destination port number <0-ffff>
198 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
199 proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
202 def build_ip_match(proto=0, src_ip='', dst_ip='',
203 src_port=0, dst_port=0):
204 """Build IP ACL match data with hexstring format.
206 :param int proto: protocol number with valid option "x"
207 :param str src_ip: source ip address with format of "x.x.x.x"
208 :param str dst_ip: destination ip address with format of "x.x.x.x"
209 :param int src_port: source port number "x"
210 :param int dst_port: destination port number "x"
213 src_ip = binascii.hexlify(socket.inet_aton(src_ip))
215 dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
217 return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
218 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
219 hex(dst_port)[2:])).rstrip('0')
222 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
223 """Build MAC ACL mask data with hexstring format.
225 :param str dst_mac: source MAC address <0-ffffffffffff>
226 :param str src_mac: destination MAC address <0-ffffffffffff>
227 :param str ether_type: ethernet type <0-ffff>
230 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
231 dst_mac, src_mac, ether_type)).rstrip('0')
234 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
235 """Build MAC ACL match data with hexstring format.
237 :param str dst_mac: source MAC address <x:x:x:x:x:x>
238 :param str src_mac: destination MAC address <x:x:x:x:x:x>
239 :param str ether_type: ethernet type <0-ffff>
242 dst_mac = dst_mac.replace(':', '')
244 src_mac = src_mac.replace(':', '')
246 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
247 dst_mac, src_mac, ether_type)).rstrip('0')
249 def create_classify_table(self, key, mask, data_offset=0):
250 """Create Classify Table
252 :param str key: key for classify table (ex, ACL name).
253 :param str mask: mask value for interested traffic.
254 :param int data_offset:
256 r = self.vapi.classify_add_del_table(
258 mask=binascii.unhexlify(mask),
259 match_n_vectors=(len(mask) - 1) // 32 + 1,
262 current_data_offset=data_offset)
263 self.assertIsNotNone(r, 'No response msg for add_del_table')
264 self.acl_tbl_idx[key] = r.new_table_index
266 def create_classify_session(self, table_index, match, pbr_option=0,
268 """Create Classify Session
270 :param int table_index: table index to identify classify table.
271 :param str match: matched value for interested traffic.
272 :param int pbr_option: enable/disable PBR feature.
273 :param int vrfid: VRF id.
274 :param int is_add: option to configure classify session.
275 - create(1) or delete(0)
277 r = self.vapi.classify_add_del_session(
280 binascii.unhexlify(match),
284 self.assertIsNotNone(r, 'No response msg for add_del_session')
286 def input_acl_set_interface(self, intf, table_index, is_add=1):
287 """Configure Input ACL interface
289 :param VppInterface intf: Interface to apply Input ACL feature.
290 :param int table_index: table index to identify classify table.
291 :param int is_add: option to configure classify session.
292 - enable(1) or disable(0)
294 r = self.vapi.input_acl_set_interface(
297 ip4_table_index=table_index)
298 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
300 def output_acl_set_interface(self, intf, table_index, is_add=1):
301 """Configure Output ACL interface
303 :param VppInterface intf: Interface to apply Output ACL feature.
304 :param int table_index: table index to identify classify table.
305 :param int is_add: option to configure classify session.
306 - enable(1) or disable(0)
308 r = self.vapi.output_acl_set_interface(
311 ip4_table_index=table_index)
312 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
315 # Tests split to different test case classes because of issue reported in
317 class TestClassifierIP(TestClassifier):
318 """ Classifier IP Test Case """
322 super(TestClassifierIP, cls).setUpClass()
325 def tearDownClass(cls):
326 super(TestClassifierIP, cls).tearDownClass()
328 def test_iacl_src_ip(self):
329 """ Source IP iACL test
331 Test scenario for basic IP ACL with source IP
332 - Create IPv4 stream for pg0 -> pg1 interface.
333 - Create iACL with source IP address.
334 - Send and verify received packets on pg1 interface.
337 # Basic iACL testing with source IP
338 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
339 self.pg0.add_stream(pkts)
342 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
343 self.create_classify_session(
344 self.acl_tbl_idx.get(key),
345 self.build_ip_match(src_ip=self.pg0.remote_ip4))
346 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
347 self.acl_active_table = key
349 self.pg_enable_capture(self.pg_interfaces)
352 pkts = self.pg1.get_capture(len(pkts))
353 self.verify_capture(self.pg1, pkts)
354 self.pg0.assert_nothing_captured(remark="packets forwarded")
355 self.pg2.assert_nothing_captured(remark="packets forwarded")
356 self.pg3.assert_nothing_captured(remark="packets forwarded")
358 def test_iacl_dst_ip(self):
359 """ Destination IP iACL test
361 Test scenario for basic IP ACL with destination IP
362 - Create IPv4 stream for pg0 -> pg1 interface.
363 - Create iACL with destination IP address.
364 - Send and verify received packets on pg1 interface.
367 # Basic iACL testing with destination IP
368 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
369 self.pg0.add_stream(pkts)
372 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
373 self.create_classify_session(
374 self.acl_tbl_idx.get(key),
375 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
376 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
377 self.acl_active_table = key
379 self.pg_enable_capture(self.pg_interfaces)
382 pkts = self.pg1.get_capture(len(pkts))
383 self.verify_capture(self.pg1, pkts)
384 self.pg0.assert_nothing_captured(remark="packets forwarded")
385 self.pg2.assert_nothing_captured(remark="packets forwarded")
386 self.pg3.assert_nothing_captured(remark="packets forwarded")
388 def test_iacl_src_dst_ip(self):
389 """ Source and destination IP iACL test
391 Test scenario for basic IP ACL with source and destination IP
392 - Create IPv4 stream for pg0 -> pg1 interface.
393 - Create iACL with source and destination IP addresses.
394 - Send and verify received packets on pg1 interface.
397 # Basic iACL testing with source and destination IP
398 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
399 self.pg0.add_stream(pkts)
402 self.create_classify_table(
403 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
404 self.create_classify_session(
405 self.acl_tbl_idx.get(key),
406 self.build_ip_match(src_ip=self.pg0.remote_ip4,
407 dst_ip=self.pg1.remote_ip4))
408 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409 self.acl_active_table = key
411 self.pg_enable_capture(self.pg_interfaces)
414 pkts = self.pg1.get_capture(len(pkts))
415 self.verify_capture(self.pg1, pkts)
416 self.pg0.assert_nothing_captured(remark="packets forwarded")
417 self.pg2.assert_nothing_captured(remark="packets forwarded")
418 self.pg3.assert_nothing_captured(remark="packets forwarded")
421 class TestClassifierUDP(TestClassifier):
422 """ Classifier UDP proto Test Case """
426 super(TestClassifierUDP, cls).setUpClass()
429 def tearDownClass(cls):
430 super(TestClassifierUDP, cls).tearDownClass()
432 def test_iacl_proto_udp(self):
433 """ UDP protocol iACL test
435 Test scenario for basic protocol ACL with UDP protocol
436 - Create IPv4 stream for pg0 -> pg1 interface.
437 - Create iACL with UDP IP protocol.
438 - Send and verify received packets on pg1 interface.
441 # Basic iACL testing with UDP protocol
442 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
443 self.pg0.add_stream(pkts)
446 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
447 self.create_classify_session(
448 self.acl_tbl_idx.get(key),
449 self.build_ip_match(proto=socket.IPPROTO_UDP))
450 self.input_acl_set_interface(
451 self.pg0, self.acl_tbl_idx.get(key))
452 self.acl_active_table = key
454 self.pg_enable_capture(self.pg_interfaces)
457 pkts = self.pg1.get_capture(len(pkts))
458 self.verify_capture(self.pg1, pkts)
459 self.pg0.assert_nothing_captured(remark="packets forwarded")
460 self.pg2.assert_nothing_captured(remark="packets forwarded")
461 self.pg3.assert_nothing_captured(remark="packets forwarded")
463 def test_iacl_proto_udp_sport(self):
464 """ UDP source port iACL test
466 Test scenario for basic protocol ACL with UDP and sport
467 - Create IPv4 stream for pg0 -> pg1 interface.
468 - Create iACL with UDP IP protocol and defined sport.
469 - Send and verify received packets on pg1 interface.
472 # Basic iACL testing with UDP and sport
474 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
475 UDP(sport=sport, dport=5678))
476 self.pg0.add_stream(pkts)
478 key = 'proto_udp_sport'
479 self.create_classify_table(
480 key, self.build_ip_mask(proto='ff', src_port='ffff'))
481 self.create_classify_session(
482 self.acl_tbl_idx.get(key),
483 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
484 self.input_acl_set_interface(
485 self.pg0, self.acl_tbl_idx.get(key))
486 self.acl_active_table = key
488 self.pg_enable_capture(self.pg_interfaces)
491 pkts = self.pg1.get_capture(len(pkts))
492 self.verify_capture(self.pg1, pkts)
493 self.pg0.assert_nothing_captured(remark="packets forwarded")
494 self.pg2.assert_nothing_captured(remark="packets forwarded")
495 self.pg3.assert_nothing_captured(remark="packets forwarded")
497 def test_iacl_proto_udp_dport(self):
498 """ UDP destination port iACL test
500 Test scenario for basic protocol ACL with UDP and dport
501 - Create IPv4 stream for pg0 -> pg1 interface.
502 - Create iACL with UDP IP protocol and defined dport.
503 - Send and verify received packets on pg1 interface.
506 # Basic iACL testing with UDP and dport
508 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
509 UDP(sport=1234, dport=dport))
510 self.pg0.add_stream(pkts)
512 key = 'proto_udp_dport'
513 self.create_classify_table(
514 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
515 self.create_classify_session(
516 self.acl_tbl_idx.get(key),
517 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
518 self.input_acl_set_interface(
519 self.pg0, self.acl_tbl_idx.get(key))
520 self.acl_active_table = key
522 self.pg_enable_capture(self.pg_interfaces)
525 pkts = self.pg1.get_capture(len(pkts))
526 self.verify_capture(self.pg1, pkts)
527 self.pg0.assert_nothing_captured(remark="packets forwarded")
528 self.pg2.assert_nothing_captured(remark="packets forwarded")
529 self.pg3.assert_nothing_captured(remark="packets forwarded")
531 def test_iacl_proto_udp_sport_dport(self):
532 """ UDP source and destination ports iACL test
534 Test scenario for basic protocol ACL with UDP and sport and dport
535 - Create IPv4 stream for pg0 -> pg1 interface.
536 - Create iACL with UDP IP protocol and defined sport and dport.
537 - Send and verify received packets on pg1 interface.
540 # Basic iACL testing with UDP and sport and dport
543 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
544 UDP(sport=sport, dport=dport))
545 self.pg0.add_stream(pkts)
547 key = 'proto_udp_ports'
548 self.create_classify_table(
550 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
551 self.create_classify_session(
552 self.acl_tbl_idx.get(key),
553 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
555 self.input_acl_set_interface(
556 self.pg0, self.acl_tbl_idx.get(key))
557 self.acl_active_table = key
559 self.pg_enable_capture(self.pg_interfaces)
562 pkts = self.pg1.get_capture(len(pkts))
563 self.verify_capture(self.pg1, pkts)
564 self.pg0.assert_nothing_captured(remark="packets forwarded")
565 self.pg2.assert_nothing_captured(remark="packets forwarded")
566 self.pg3.assert_nothing_captured(remark="packets forwarded")
569 class TestClassifierTCP(TestClassifier):
570 """ Classifier TCP proto Test Case """
574 super(TestClassifierTCP, cls).setUpClass()
577 def tearDownClass(cls):
578 super(TestClassifierTCP, cls).tearDownClass()
580 def test_iacl_proto_tcp(self):
581 """ TCP protocol iACL test
583 Test scenario for basic protocol ACL with TCP protocol
584 - Create IPv4 stream for pg0 -> pg1 interface.
585 - Create iACL with TCP IP protocol.
586 - Send and verify received packets on pg1 interface.
589 # Basic iACL testing with TCP protocol
590 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
591 TCP(sport=1234, dport=5678))
592 self.pg0.add_stream(pkts)
595 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
596 self.create_classify_session(
597 self.acl_tbl_idx.get(key),
598 self.build_ip_match(proto=socket.IPPROTO_TCP))
599 self.input_acl_set_interface(
600 self.pg0, self.acl_tbl_idx.get(key))
601 self.acl_active_table = key
603 self.pg_enable_capture(self.pg_interfaces)
606 pkts = self.pg1.get_capture(len(pkts))
607 self.verify_capture(self.pg1, pkts, TCP)
608 self.pg0.assert_nothing_captured(remark="packets forwarded")
609 self.pg2.assert_nothing_captured(remark="packets forwarded")
610 self.pg3.assert_nothing_captured(remark="packets forwarded")
612 def test_iacl_proto_tcp_sport(self):
613 """ TCP source port iACL test
615 Test scenario for basic protocol ACL with TCP and sport
616 - Create IPv4 stream for pg0 -> pg1 interface.
617 - Create iACL with TCP IP protocol and defined sport.
618 - Send and verify received packets on pg1 interface.
621 # Basic iACL testing with TCP and sport
623 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
624 TCP(sport=sport, dport=5678))
625 self.pg0.add_stream(pkts)
627 key = 'proto_tcp_sport'
628 self.create_classify_table(
629 key, self.build_ip_mask(proto='ff', src_port='ffff'))
630 self.create_classify_session(
631 self.acl_tbl_idx.get(key),
632 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
633 self.input_acl_set_interface(
634 self.pg0, self.acl_tbl_idx.get(key))
635 self.acl_active_table = key
637 self.pg_enable_capture(self.pg_interfaces)
640 pkts = self.pg1.get_capture(len(pkts))
641 self.verify_capture(self.pg1, pkts, TCP)
642 self.pg0.assert_nothing_captured(remark="packets forwarded")
643 self.pg2.assert_nothing_captured(remark="packets forwarded")
644 self.pg3.assert_nothing_captured(remark="packets forwarded")
646 def test_iacl_proto_tcp_dport(self):
647 """ TCP destination port iACL test
649 Test scenario for basic protocol ACL with TCP and dport
650 - Create IPv4 stream for pg0 -> pg1 interface.
651 - Create iACL with TCP IP protocol and defined dport.
652 - Send and verify received packets on pg1 interface.
655 # Basic iACL testing with TCP and dport
657 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
658 TCP(sport=1234, dport=dport))
659 self.pg0.add_stream(pkts)
661 key = 'proto_tcp_sport'
662 self.create_classify_table(
663 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
664 self.create_classify_session(
665 self.acl_tbl_idx.get(key),
666 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
667 self.input_acl_set_interface(
668 self.pg0, self.acl_tbl_idx.get(key))
669 self.acl_active_table = key
671 self.pg_enable_capture(self.pg_interfaces)
674 pkts = self.pg1.get_capture(len(pkts))
675 self.verify_capture(self.pg1, pkts, TCP)
676 self.pg0.assert_nothing_captured(remark="packets forwarded")
677 self.pg2.assert_nothing_captured(remark="packets forwarded")
678 self.pg3.assert_nothing_captured(remark="packets forwarded")
680 def test_iacl_proto_tcp_sport_dport(self):
681 """ TCP source and destination ports iACL test
683 Test scenario for basic protocol ACL with TCP and sport and dport
684 - Create IPv4 stream for pg0 -> pg1 interface.
685 - Create iACL with TCP IP protocol and defined sport and dport.
686 - Send and verify received packets on pg1 interface.
689 # Basic iACL testing with TCP and sport and dport
692 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
693 TCP(sport=sport, dport=dport))
694 self.pg0.add_stream(pkts)
696 key = 'proto_tcp_ports'
697 self.create_classify_table(
699 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
700 self.create_classify_session(
701 self.acl_tbl_idx.get(key),
702 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
704 self.input_acl_set_interface(
705 self.pg0, self.acl_tbl_idx.get(key))
706 self.acl_active_table = key
708 self.pg_enable_capture(self.pg_interfaces)
711 pkts = self.pg1.get_capture(len(pkts))
712 self.verify_capture(self.pg1, pkts, TCP)
713 self.pg0.assert_nothing_captured(remark="packets forwarded")
714 self.pg2.assert_nothing_captured(remark="packets forwarded")
715 self.pg3.assert_nothing_captured(remark="packets forwarded")
718 class TestClassifierIPOut(TestClassifier):
719 """ Classifier output IP Test Case """
723 super(TestClassifierIPOut, cls).setUpClass()
726 def tearDownClass(cls):
727 super(TestClassifierIPOut, cls).tearDownClass()
729 def test_acl_ip_out(self):
730 """ Output IP ACL test
732 Test scenario for basic IP ACL with source IP
733 - Create IPv4 stream for pg1 -> pg0 interface.
734 - Create ACL with source IP address.
735 - Send and verify received packets on pg0 interface.
738 # Basic oACL testing with source IP
739 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
740 self.pg1.add_stream(pkts)
743 self.create_classify_table(
744 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
745 self.create_classify_session(
746 self.acl_tbl_idx.get(key),
747 self.build_ip_match(src_ip=self.pg1.remote_ip4))
748 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
749 self.acl_active_table = key
751 self.pg_enable_capture(self.pg_interfaces)
754 pkts = self.pg0.get_capture(len(pkts))
755 self.verify_capture(self.pg0, pkts)
756 self.pg1.assert_nothing_captured(remark="packets forwarded")
757 self.pg2.assert_nothing_captured(remark="packets forwarded")
758 self.pg3.assert_nothing_captured(remark="packets forwarded")
761 class TestClassifierMAC(TestClassifier):
762 """ Classifier MAC Test Case """
766 super(TestClassifierMAC, cls).setUpClass()
769 def tearDownClass(cls):
770 super(TestClassifierMAC, cls).tearDownClass()
772 def test_acl_mac(self):
775 Test scenario for basic MAC ACL with source MAC
776 - Create IPv4 stream for pg0 -> pg2 interface.
777 - Create ACL with source MAC address.
778 - Send and verify received packets on pg2 interface.
781 # Basic iACL testing with source MAC
782 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
783 self.pg0.add_stream(pkts)
786 self.create_classify_table(
787 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
788 self.create_classify_session(
789 self.acl_tbl_idx.get(key),
790 self.build_mac_match(src_mac=self.pg0.remote_mac))
791 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
792 self.acl_active_table = key
794 self.pg_enable_capture(self.pg_interfaces)
797 pkts = self.pg2.get_capture(len(pkts))
798 self.verify_capture(self.pg2, pkts)
799 self.pg0.assert_nothing_captured(remark="packets forwarded")
800 self.pg1.assert_nothing_captured(remark="packets forwarded")
801 self.pg3.assert_nothing_captured(remark="packets forwarded")
804 class TestClassifierPBR(TestClassifier):
805 """ Classifier PBR Test Case """
809 super(TestClassifierPBR, cls).setUpClass()
812 def tearDownClass(cls):
813 super(TestClassifierPBR, cls).tearDownClass()
815 def test_acl_pbr(self):
818 Test scenario for PBR with source IP
819 - Create IPv4 stream for pg0 -> pg3 interface.
820 - Configure PBR fib entry for packet forwarding.
821 - Send and verify received packets on pg3 interface.
824 # PBR testing with source IP
825 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
826 self.pg0.add_stream(pkts)
829 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
831 # this will create the VRF/table in which we will insert the route
832 self.create_classify_session(
833 self.acl_tbl_idx.get(key),
834 self.build_ip_match(src_ip=self.pg0.remote_ip4),
835 pbr_option, self.pbr_vrfid)
836 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
837 r = VppIpRoute(self, self.pg3.local_ip4, 24,
838 [VppRoutePath(self.pg3.remote_ip4,
840 table_id=self.pbr_vrfid)
843 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
845 self.pg_enable_capture(self.pg_interfaces)
848 pkts = self.pg3.get_capture(len(pkts))
849 self.verify_capture(self.pg3, pkts)
850 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
851 self.pg0.assert_nothing_captured(remark="packets forwarded")
852 self.pg1.assert_nothing_captured(remark="packets forwarded")
853 self.pg2.assert_nothing_captured(remark="packets forwarded")
855 # remove the classify session and the route
856 r.remove_vpp_config()
857 self.create_classify_session(
858 self.acl_tbl_idx.get(key),
859 self.build_ip_match(src_ip=self.pg0.remote_ip4),
860 pbr_option, self.pbr_vrfid, is_add=0)
862 # and the table should be gone.
863 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
865 if __name__ == '__main__':
866 unittest.main(testRunner=VppTestRunner)