7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
15 class TestClassifier(VppTestCase):
16 """ Classifier Test Case """
21 Perform standard class setup (defined by class method setUpClass in
22 class VppTestCase) before running the test case, set test case related
23 variables and configure VPP.
25 super(TestClassifier, cls).setUpClass()
26 cls.acl_active_table = ''
29 def tearDownClass(cls):
30 super(TestClassifier, cls).tearDownClass()
34 Perform test setup before test case.
37 - create 4 pg interfaces
38 - untagged pg0/pg1/pg2 interface
39 pg0 -------> pg1 (IP ACL)
45 - put it into UP state
47 - resolve neighbor address using ARP
49 :ivar list interfaces: pg interfaces.
50 :ivar list pg_if_packet_sizes: packet sizes in test.
51 :ivar dict acl_tbl_idx: ACL table index.
52 :ivar int pbr_vrfid: VRF id for PBR test.
54 self.reset_packet_infos()
55 super(TestClassifier, self).setUp()
57 # create 4 pg interfaces
58 self.create_pg_interfaces(range(4))
60 # packet sizes to test
61 self.pg_if_packet_sizes = [64, 9018]
63 self.interfaces = list(self.pg_interfaces)
69 # setup all interfaces
70 for intf in self.interfaces:
76 """Run standard test teardown and acl related log."""
78 if self.acl_active_table == 'ip_out':
79 self.output_acl_set_interface(
80 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
81 self.acl_active_table = ''
82 elif self.acl_active_table != '':
83 self.input_acl_set_interface(
84 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85 self.acl_active_table = ''
86 for intf in self.interfaces:
90 super(TestClassifier, self).tearDown()
def show_commands_at_teardown(self):
    """Write classifier-related VPP state to the test log.

    Runs four CLI "show" commands through ``self.vapi`` and logs each
    command's output at info level: the IPv4 input ACL, the IPv4 output
    ACL, the verbose classify table dump and the IP FIB.

    NOTE(review): presumably invoked by the test framework during
    teardown (going by the name); the caller is not visible here.
    """
    log = self.logger.info
    vapi = self.vapi

    # ACL views use ppcli(), the table/FIB dumps use cli(), as before.
    log(vapi.ppcli("show inacl type ip4"))
    log(vapi.ppcli("show outacl type ip4"))
    log(vapi.cli("show classify table verbose"))
    log(vapi.cli("show ip fib"))
98 def config_pbr_fib_entry(self, intf, is_add=1):
99 """Configure fib entry to route traffic toward PBR VRF table
101 :param VppInterface intf: destination interface to be routed for PBR.
105 self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
106 dst_address_length=addr_len,
107 next_hop_address=intf.remote_ip4n,
108 table_id=self.pbr_vrfid, is_add=is_add)
110 def create_stream(self, src_if, dst_if, packet_sizes,
111 proto_l=UDP(sport=1234, dport=5678)):
112 """Create input packet stream for defined interfaces.
114 :param VppInterface src_if: Source Interface for packet stream.
115 :param VppInterface dst_if: Destination Interface for packet stream.
116 :param list packet_sizes: packet size to test.
117 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
121 for size in packet_sizes:
122 info = self.create_packet_info(src_if, dst_if)
123 payload = self.info_to_payload(info)
124 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
125 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
129 self.extend_packet(p, size)
133 def verify_capture(self, dst_if, capture, proto_l=UDP):
134 """Verify captured input packet stream for defined interface.
136 :param VppInterface dst_if: Interface to verify captured packet stream.
137 :param list capture: Captured packet stream.
138 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
140 self.logger.info("Verifying capture on interface %s" % dst_if.name)
142 for i in self.interfaces:
143 last_info[i.sw_if_index] = None
144 dst_sw_if_index = dst_if.sw_if_index
145 for packet in capture:
147 ip_received = packet[IP]
148 proto_received = packet[proto_l]
149 payload_info = self.payload_to_info(packet[Raw])
150 packet_index = payload_info.index
151 self.assertEqual(payload_info.dst, dst_sw_if_index)
153 "Got packet on port %s: src=%u (id=%u)" %
154 (dst_if.name, payload_info.src, packet_index))
155 next_info = self.get_next_packet_info_for_interface2(
156 payload_info.src, dst_sw_if_index,
157 last_info[payload_info.src])
158 last_info[payload_info.src] = next_info
159 self.assertTrue(next_info is not None)
160 self.assertEqual(packet_index, next_info.index)
161 saved_packet = next_info.data
162 ip_saved = saved_packet[IP]
163 proto_saved = saved_packet[proto_l]
164 # Check standard fields
165 self.assertEqual(ip_received.src, ip_saved.src)
166 self.assertEqual(ip_received.dst, ip_saved.dst)
167 self.assertEqual(proto_received.sport, proto_saved.sport)
168 self.assertEqual(proto_received.dport, proto_saved.dport)
170 self.logger.error(ppp("Unexpected or invalid packet:", packet))
172 for i in self.interfaces:
173 remaining_packet = self.get_next_packet_info_for_interface2(
174 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
175 self.assertTrue(remaining_packet is None,
176 "Interface %s: Packet expected from interface %s "
177 "didn't arrive" % (dst_if.name, i.name))
179 def verify_vrf(self, vrf_id):
181 Check if the FIB table / VRF ID is configured.
183 :param int vrf_id: The FIB table / VRF ID to be verified.
184 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
186 ip_fib_dump = self.vapi.ip_fib_dump()
188 for ip_fib_details in ip_fib_dump:
189 if ip_fib_details[2] == vrf_id:
192 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
195 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format.

    Each argument is right-aligned and zero-filled into a fixed-width slot
    of hex digits (20, 12, 8, 4 and 4 digits, in that order); the slots are
    concatenated and trailing zeros are dropped.

    NOTE(review): call sites invoke this as ``self.build_ip_mask(...)`` with
    keyword arguments and no instance parameter is declared, so it has to be
    bound as a static method; the decorator line is not visible here.

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: mask as a hex string with an even number of digits
        (empty when every field is empty).
    """
    mask = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip('0') works on digits, not bytes: a value ending in e.g. 'f0'
    # would lose its low nibble and leave an odd-length string that
    # binascii.unhexlify() refuses. Put that nibble back.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_ip_match(proto=0, src_ip='', dst_ip='',
                   src_port=0, dst_port=0):
    """Build IP ACL match data with hexstring format.

    Integer fields are rendered as hex, dotted-quad addresses are converted
    to their 8-digit hex form, and every field is right-aligned and
    zero-filled into a fixed-width slot (20, 12, 8, 4 and 4 hex digits, in
    that order). The slots are concatenated and trailing zeros are dropped.

    NOTE(review): call sites invoke this as ``self.build_ip_match(...)``
    with keyword arguments and no instance parameter is declared, so it has
    to be bound as a static method; the decorator line is not visible here.

    :param int proto: protocol number with valid option "x"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: match data as a hex string with an even number of digits
        (empty when every field is zero/empty).
    :raises OSError: if a non-empty address is not a valid IPv4 string
        (raised by ``socket.inet_aton``).
    """
    # Only convert addresses that were supplied: inet_aton('') raises.
    # hexlify() returns bytes on Python 3, so decode before formatting,
    # otherwise '{!s}' would embed the "b'...'" repr in the match string.
    if src_ip:
        src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
    if dst_ip:
        dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')

    match = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
        hex(dst_port)[2:]).rstrip('0')
    # rstrip('0') can remove half a byte (port 0x1630, address x.x.x.16);
    # restore the nibble so binascii.unhexlify() accepts the result.
    if len(match) % 2:
        match += '0'
    return match
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    The fields are right-aligned and zero-filled into slots of 12, 12 and 4
    hex digits (destination MAC first), concatenated, and trailing zeros are
    dropped.

    NOTE(review): call sites invoke this as ``self.build_mac_mask(...)``
    with keyword arguments and no instance parameter is declared, so it has
    to be bound as a static method; the decorator line is not visible here.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: mask as a hex string with an even number of digits
        (empty when every field is empty).
    """
    mask = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # Keep whole bytes: a value ending in e.g. 'f0' must not be cut down to
    # an odd number of digits, which binascii.unhexlify() would reject.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    Colons are removed from the MAC addresses, then the fields are
    right-aligned and zero-filled into slots of 12, 12 and 4 hex digits
    (destination MAC first), concatenated, and trailing zeros are dropped.

    NOTE(review): call sites invoke this as ``self.build_mac_match(...)``
    with keyword arguments and no instance parameter is declared, so it has
    to be bound as a static method; the decorator line is not visible here.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: match data as a hex string with an even number of digits
        (empty when every field is empty).
    """
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    match = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # A MAC whose last octet ends in 0 (e.g. ...:10) would otherwise be
    # stripped to an odd number of digits and fail in binascii.unhexlify().
    if len(match) % 2:
        match += '0'
    return match
261 def create_classify_table(self, key, mask, data_offset=0):
262 """Create Classify Table
264 :param str key: key for classify table (ex, ACL name).
265 :param str mask: mask value for interested traffic.
266 :param int data_offset:
268 r = self.vapi.classify_add_del_table(
270 mask=binascii.unhexlify(mask),
271 match_n_vectors=(len(mask) - 1) // 32 + 1,
274 current_data_offset=data_offset)
275 self.assertIsNotNone(r, 'No response msg for add_del_table')
276 self.acl_tbl_idx[key] = r.new_table_index
278 def create_classify_session(self, table_index, match, pbr_option=0,
280 """Create Classify Session
282 :param int table_index: table index to identify classify table.
283 :param str match: matched value for interested traffic.
284 :param int pbr_option: enable/disable PBR feature.
285 :param int vrfid: VRF id.
286 :param int is_add: option to configure classify session.
287 - create(1) or delete(0)
289 r = self.vapi.classify_add_del_session(
292 binascii.unhexlify(match),
296 self.assertIsNotNone(r, 'No response msg for add_del_session')
298 def input_acl_set_interface(self, intf, table_index, is_add=1):
299 """Configure Input ACL interface
301 :param VppInterface intf: Interface to apply Input ACL feature.
302 :param int table_index: table index to identify classify table.
303 :param int is_add: option to configure classify session.
304 - enable(1) or disable(0)
306 r = self.vapi.input_acl_set_interface(
309 ip4_table_index=table_index)
310 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
312 def output_acl_set_interface(self, intf, table_index, is_add=1):
313 """Configure Output ACL interface
315 :param VppInterface intf: Interface to apply Output ACL feature.
316 :param int table_index: table index to identify classify table.
317 :param int is_add: option to configure classify session.
318 - enable(1) or disable(0)
320 r = self.vapi.output_acl_set_interface(
323 ip4_table_index=table_index)
324 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
327 # Tests split to different test case classes because of issue reported in
329 class TestClassifierIP(TestClassifier):
330 """ Classifier IP Test Case """
334 super(TestClassifierIP, cls).setUpClass()
337 def tearDownClass(cls):
338 super(TestClassifierIP, cls).tearDownClass()
340 def test_iacl_src_ip(self):
341 """ Source IP iACL test
343 Test scenario for basic IP ACL with source IP
344 - Create IPv4 stream for pg0 -> pg1 interface.
345 - Create iACL with source IP address.
346 - Send and verify received packets on pg1 interface.
349 # Basic iACL testing with source IP
350 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
351 self.pg0.add_stream(pkts)
354 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
355 self.create_classify_session(
356 self.acl_tbl_idx.get(key),
357 self.build_ip_match(src_ip=self.pg0.remote_ip4))
358 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
359 self.acl_active_table = key
361 self.pg_enable_capture(self.pg_interfaces)
364 pkts = self.pg1.get_capture(len(pkts))
365 self.verify_capture(self.pg1, pkts)
366 self.pg0.assert_nothing_captured(remark="packets forwarded")
367 self.pg2.assert_nothing_captured(remark="packets forwarded")
368 self.pg3.assert_nothing_captured(remark="packets forwarded")
370 def test_iacl_dst_ip(self):
371 """ Destination IP iACL test
373 Test scenario for basic IP ACL with destination IP
374 - Create IPv4 stream for pg0 -> pg1 interface.
375 - Create iACL with destination IP address.
376 - Send and verify received packets on pg1 interface.
379 # Basic iACL testing with destination IP
380 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
381 self.pg0.add_stream(pkts)
384 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
385 self.create_classify_session(
386 self.acl_tbl_idx.get(key),
387 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
388 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
389 self.acl_active_table = key
391 self.pg_enable_capture(self.pg_interfaces)
394 pkts = self.pg1.get_capture(len(pkts))
395 self.verify_capture(self.pg1, pkts)
396 self.pg0.assert_nothing_captured(remark="packets forwarded")
397 self.pg2.assert_nothing_captured(remark="packets forwarded")
398 self.pg3.assert_nothing_captured(remark="packets forwarded")
400 def test_iacl_src_dst_ip(self):
401 """ Source and destination IP iACL test
403 Test scenario for basic IP ACL with source and destination IP
404 - Create IPv4 stream for pg0 -> pg1 interface.
405 - Create iACL with source and destination IP addresses.
406 - Send and verify received packets on pg1 interface.
409 # Basic iACL testing with source and destination IP
410 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
411 self.pg0.add_stream(pkts)
414 self.create_classify_table(
415 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
416 self.create_classify_session(
417 self.acl_tbl_idx.get(key),
418 self.build_ip_match(src_ip=self.pg0.remote_ip4,
419 dst_ip=self.pg1.remote_ip4))
420 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
421 self.acl_active_table = key
423 self.pg_enable_capture(self.pg_interfaces)
426 pkts = self.pg1.get_capture(len(pkts))
427 self.verify_capture(self.pg1, pkts)
428 self.pg0.assert_nothing_captured(remark="packets forwarded")
429 self.pg2.assert_nothing_captured(remark="packets forwarded")
430 self.pg3.assert_nothing_captured(remark="packets forwarded")
433 class TestClassifierUDP(TestClassifier):
434 """ Classifier UDP proto Test Case """
438 super(TestClassifierUDP, cls).setUpClass()
441 def tearDownClass(cls):
442 super(TestClassifierUDP, cls).tearDownClass()
444 def test_iacl_proto_udp(self):
445 """ UDP protocol iACL test
447 Test scenario for basic protocol ACL with UDP protocol
448 - Create IPv4 stream for pg0 -> pg1 interface.
449 - Create iACL with UDP IP protocol.
450 - Send and verify received packets on pg1 interface.
453 # Basic iACL testing with UDP protocol
454 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
455 self.pg0.add_stream(pkts)
458 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
459 self.create_classify_session(
460 self.acl_tbl_idx.get(key),
461 self.build_ip_match(proto=socket.IPPROTO_UDP))
462 self.input_acl_set_interface(
463 self.pg0, self.acl_tbl_idx.get(key))
464 self.acl_active_table = key
466 self.pg_enable_capture(self.pg_interfaces)
469 pkts = self.pg1.get_capture(len(pkts))
470 self.verify_capture(self.pg1, pkts)
471 self.pg0.assert_nothing_captured(remark="packets forwarded")
472 self.pg2.assert_nothing_captured(remark="packets forwarded")
473 self.pg3.assert_nothing_captured(remark="packets forwarded")
475 def test_iacl_proto_udp_sport(self):
476 """ UDP source port iACL test
478 Test scenario for basic protocol ACL with UDP and sport
479 - Create IPv4 stream for pg0 -> pg1 interface.
480 - Create iACL with UDP IP protocol and defined sport.
481 - Send and verify received packets on pg1 interface.
484 # Basic iACL testing with UDP and sport
486 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
487 UDP(sport=sport, dport=5678))
488 self.pg0.add_stream(pkts)
490 key = 'proto_udp_sport'
491 self.create_classify_table(
492 key, self.build_ip_mask(proto='ff', src_port='ffff'))
493 self.create_classify_session(
494 self.acl_tbl_idx.get(key),
495 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
496 self.input_acl_set_interface(
497 self.pg0, self.acl_tbl_idx.get(key))
498 self.acl_active_table = key
500 self.pg_enable_capture(self.pg_interfaces)
503 pkts = self.pg1.get_capture(len(pkts))
504 self.verify_capture(self.pg1, pkts)
505 self.pg0.assert_nothing_captured(remark="packets forwarded")
506 self.pg2.assert_nothing_captured(remark="packets forwarded")
507 self.pg3.assert_nothing_captured(remark="packets forwarded")
509 def test_iacl_proto_udp_dport(self):
510 """ UDP destination port iACL test
512 Test scenario for basic protocol ACL with UDP and dport
513 - Create IPv4 stream for pg0 -> pg1 interface.
514 - Create iACL with UDP IP protocol and defined dport.
515 - Send and verify received packets on pg1 interface.
518 # Basic iACL testing with UDP and dport
520 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
521 UDP(sport=1234, dport=dport))
522 self.pg0.add_stream(pkts)
524 key = 'proto_udp_dport'
525 self.create_classify_table(
526 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
527 self.create_classify_session(
528 self.acl_tbl_idx.get(key),
529 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
530 self.input_acl_set_interface(
531 self.pg0, self.acl_tbl_idx.get(key))
532 self.acl_active_table = key
534 self.pg_enable_capture(self.pg_interfaces)
537 pkts = self.pg1.get_capture(len(pkts))
538 self.verify_capture(self.pg1, pkts)
539 self.pg0.assert_nothing_captured(remark="packets forwarded")
540 self.pg2.assert_nothing_captured(remark="packets forwarded")
541 self.pg3.assert_nothing_captured(remark="packets forwarded")
543 def test_iacl_proto_udp_sport_dport(self):
544 """ UDP source and destination ports iACL test
546 Test scenario for basic protocol ACL with UDP and sport and dport
547 - Create IPv4 stream for pg0 -> pg1 interface.
548 - Create iACL with UDP IP protocol and defined sport and dport.
549 - Send and verify received packets on pg1 interface.
552 # Basic iACL testing with UDP and sport and dport
555 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
556 UDP(sport=sport, dport=dport))
557 self.pg0.add_stream(pkts)
559 key = 'proto_udp_ports'
560 self.create_classify_table(
562 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
563 self.create_classify_session(
564 self.acl_tbl_idx.get(key),
565 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
567 self.input_acl_set_interface(
568 self.pg0, self.acl_tbl_idx.get(key))
569 self.acl_active_table = key
571 self.pg_enable_capture(self.pg_interfaces)
574 pkts = self.pg1.get_capture(len(pkts))
575 self.verify_capture(self.pg1, pkts)
576 self.pg0.assert_nothing_captured(remark="packets forwarded")
577 self.pg2.assert_nothing_captured(remark="packets forwarded")
578 self.pg3.assert_nothing_captured(remark="packets forwarded")
581 class TestClassifierTCP(TestClassifier):
582 """ Classifier TCP proto Test Case """
586 super(TestClassifierTCP, cls).setUpClass()
589 def tearDownClass(cls):
590 super(TestClassifierTCP, cls).tearDownClass()
592 def test_iacl_proto_tcp(self):
593 """ TCP protocol iACL test
595 Test scenario for basic protocol ACL with TCP protocol
596 - Create IPv4 stream for pg0 -> pg1 interface.
597 - Create iACL with TCP IP protocol.
598 - Send and verify received packets on pg1 interface.
601 # Basic iACL testing with TCP protocol
602 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
603 TCP(sport=1234, dport=5678))
604 self.pg0.add_stream(pkts)
607 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
608 self.create_classify_session(
609 self.acl_tbl_idx.get(key),
610 self.build_ip_match(proto=socket.IPPROTO_TCP))
611 self.input_acl_set_interface(
612 self.pg0, self.acl_tbl_idx.get(key))
613 self.acl_active_table = key
615 self.pg_enable_capture(self.pg_interfaces)
618 pkts = self.pg1.get_capture(len(pkts))
619 self.verify_capture(self.pg1, pkts, TCP)
620 self.pg0.assert_nothing_captured(remark="packets forwarded")
621 self.pg2.assert_nothing_captured(remark="packets forwarded")
622 self.pg3.assert_nothing_captured(remark="packets forwarded")
624 def test_iacl_proto_tcp_sport(self):
625 """ TCP source port iACL test
627 Test scenario for basic protocol ACL with TCP and sport
628 - Create IPv4 stream for pg0 -> pg1 interface.
629 - Create iACL with TCP IP protocol and defined sport.
630 - Send and verify received packets on pg1 interface.
633 # Basic iACL testing with TCP and sport
635 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
636 TCP(sport=sport, dport=5678))
637 self.pg0.add_stream(pkts)
639 key = 'proto_tcp_sport'
640 self.create_classify_table(
641 key, self.build_ip_mask(proto='ff', src_port='ffff'))
642 self.create_classify_session(
643 self.acl_tbl_idx.get(key),
644 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
645 self.input_acl_set_interface(
646 self.pg0, self.acl_tbl_idx.get(key))
647 self.acl_active_table = key
649 self.pg_enable_capture(self.pg_interfaces)
652 pkts = self.pg1.get_capture(len(pkts))
653 self.verify_capture(self.pg1, pkts, TCP)
654 self.pg0.assert_nothing_captured(remark="packets forwarded")
655 self.pg2.assert_nothing_captured(remark="packets forwarded")
656 self.pg3.assert_nothing_captured(remark="packets forwarded")
658 def test_iacl_proto_tcp_dport(self):
659 """ TCP destination port iACL test
661 Test scenario for basic protocol ACL with TCP and dport
662 - Create IPv4 stream for pg0 -> pg1 interface.
663 - Create iACL with TCP IP protocol and defined dport.
664 - Send and verify received packets on pg1 interface.
667 # Basic iACL testing with TCP and dport
669 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
670 TCP(sport=1234, dport=dport))
671 self.pg0.add_stream(pkts)
673 key = 'proto_tcp_sport'
674 self.create_classify_table(
675 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
676 self.create_classify_session(
677 self.acl_tbl_idx.get(key),
678 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
679 self.input_acl_set_interface(
680 self.pg0, self.acl_tbl_idx.get(key))
681 self.acl_active_table = key
683 self.pg_enable_capture(self.pg_interfaces)
686 pkts = self.pg1.get_capture(len(pkts))
687 self.verify_capture(self.pg1, pkts, TCP)
688 self.pg0.assert_nothing_captured(remark="packets forwarded")
689 self.pg2.assert_nothing_captured(remark="packets forwarded")
690 self.pg3.assert_nothing_captured(remark="packets forwarded")
692 def test_iacl_proto_tcp_sport_dport(self):
693 """ TCP source and destination ports iACL test
695 Test scenario for basic protocol ACL with TCP and sport and dport
696 - Create IPv4 stream for pg0 -> pg1 interface.
697 - Create iACL with TCP IP protocol and defined sport and dport.
698 - Send and verify received packets on pg1 interface.
701 # Basic iACL testing with TCP and sport and dport
704 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
705 TCP(sport=sport, dport=dport))
706 self.pg0.add_stream(pkts)
708 key = 'proto_tcp_ports'
709 self.create_classify_table(
711 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
712 self.create_classify_session(
713 self.acl_tbl_idx.get(key),
714 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
716 self.input_acl_set_interface(
717 self.pg0, self.acl_tbl_idx.get(key))
718 self.acl_active_table = key
720 self.pg_enable_capture(self.pg_interfaces)
723 pkts = self.pg1.get_capture(len(pkts))
724 self.verify_capture(self.pg1, pkts, TCP)
725 self.pg0.assert_nothing_captured(remark="packets forwarded")
726 self.pg2.assert_nothing_captured(remark="packets forwarded")
727 self.pg3.assert_nothing_captured(remark="packets forwarded")
730 class TestClassifierIPOut(TestClassifier):
731 """ Classifier output IP Test Case """
735 super(TestClassifierIPOut, cls).setUpClass()
738 def tearDownClass(cls):
739 super(TestClassifierIPOut, cls).tearDownClass()
741 def test_acl_ip_out(self):
742 """ Output IP ACL test
744 Test scenario for basic IP ACL with source IP
745 - Create IPv4 stream for pg1 -> pg0 interface.
746 - Create ACL with source IP address.
747 - Send and verify received packets on pg0 interface.
750 # Basic oACL testing with source IP
751 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
752 self.pg1.add_stream(pkts)
755 self.create_classify_table(
756 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
757 self.create_classify_session(
758 self.acl_tbl_idx.get(key),
759 self.build_ip_match(src_ip=self.pg1.remote_ip4))
760 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
761 self.acl_active_table = key
763 self.pg_enable_capture(self.pg_interfaces)
766 pkts = self.pg0.get_capture(len(pkts))
767 self.verify_capture(self.pg0, pkts)
768 self.pg1.assert_nothing_captured(remark="packets forwarded")
769 self.pg2.assert_nothing_captured(remark="packets forwarded")
770 self.pg3.assert_nothing_captured(remark="packets forwarded")
773 class TestClassifierMAC(TestClassifier):
774 """ Classifier MAC Test Case """
778 super(TestClassifierMAC, cls).setUpClass()
781 def tearDownClass(cls):
782 super(TestClassifierMAC, cls).tearDownClass()
784 def test_acl_mac(self):
787 Test scenario for basic MAC ACL with source MAC
788 - Create IPv4 stream for pg0 -> pg2 interface.
789 - Create ACL with source MAC address.
790 - Send and verify received packets on pg2 interface.
793 # Basic iACL testing with source MAC
794 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
795 self.pg0.add_stream(pkts)
798 self.create_classify_table(
799 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
800 self.create_classify_session(
801 self.acl_tbl_idx.get(key),
802 self.build_mac_match(src_mac=self.pg0.remote_mac))
803 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
804 self.acl_active_table = key
806 self.pg_enable_capture(self.pg_interfaces)
809 pkts = self.pg2.get_capture(len(pkts))
810 self.verify_capture(self.pg2, pkts)
811 self.pg0.assert_nothing_captured(remark="packets forwarded")
812 self.pg1.assert_nothing_captured(remark="packets forwarded")
813 self.pg3.assert_nothing_captured(remark="packets forwarded")
816 class TestClassifierPBR(TestClassifier):
817 """ Classifier PBR Test Case """
821 super(TestClassifierPBR, cls).setUpClass()
824 def tearDownClass(cls):
825 super(TestClassifierPBR, cls).tearDownClass()
827 def test_acl_pbr(self):
830 Test scenario for PBR with source IP
831 - Create IPv4 stream for pg0 -> pg3 interface.
832 - Configure PBR fib entry for packet forwarding.
833 - Send and verify received packets on pg3 interface.
836 # PBR testing with source IP
837 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
838 self.pg0.add_stream(pkts)
841 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
843 # this will create the VRF/table in which we will insert the route
844 self.create_classify_session(
845 self.acl_tbl_idx.get(key),
846 self.build_ip_match(src_ip=self.pg0.remote_ip4),
847 pbr_option, self.pbr_vrfid)
848 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
849 self.config_pbr_fib_entry(self.pg3)
850 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
852 self.pg_enable_capture(self.pg_interfaces)
855 pkts = self.pg3.get_capture(len(pkts))
856 self.verify_capture(self.pg3, pkts)
857 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
858 self.pg0.assert_nothing_captured(remark="packets forwarded")
859 self.pg1.assert_nothing_captured(remark="packets forwarded")
860 self.pg2.assert_nothing_captured(remark="packets forwarded")
862 # remove the classify session and the route
863 self.config_pbr_fib_entry(self.pg3, is_add=0)
864 self.create_classify_session(
865 self.acl_tbl_idx.get(key),
866 self.build_ip_match(src_ip=self.pg0.remote_ip4),
867 pbr_option, self.pbr_vrfid, is_add=0)
869 # and the table should be gone.
870 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
872 if __name__ == '__main__':
873 unittest.main(testRunner=VppTestRunner)