7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
15 class TestClassifier(VppTestCase):
16 """ Classifier Test Case """
21 Perform standard class setup (defined by class method setUpClass in
22 class VppTestCase) before running the test case, set test case related
23 variables and configure VPP.
25 super(TestClassifier, cls).setUpClass()
26 cls.acl_active_table = ''
29 def tearDownClass(cls):
30 super(TestClassifier, cls).tearDownClass()
34 Perform test setup before test case.
37 - create 4 pg interfaces
38 - untagged pg0/pg1/pg2 interface
39 pg0 -------> pg1 (IP ACL)
45 - put it into UP state
47 - resolve neighbor address using ARP
49 :ivar list interfaces: pg interfaces.
50 :ivar list pg_if_packet_sizes: packet sizes in test.
51 :ivar dict acl_tbl_idx: ACL table index.
52 :ivar int pbr_vrfid: VRF id for PBR test.
54 self.reset_packet_infos()
55 super(TestClassifier, self).setUp()
57 # create 4 pg interfaces
58 self.create_pg_interfaces(range(4))
60 # packet sizes to test
61 self.pg_if_packet_sizes = [64, 9018]
63 self.interfaces = list(self.pg_interfaces)
69 # setup all interfaces
70 for intf in self.interfaces:
76 """Run standard test teardown and acl related log."""
78 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
79 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
80 self.logger.info(self.vapi.cli("show classify table verbose"))
81 self.logger.info(self.vapi.cli("show ip fib"))
82 if self.acl_active_table == 'ip_out':
83 self.output_acl_set_interface(
84 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85 self.acl_active_table = ''
86 elif self.acl_active_table != '':
87 self.input_acl_set_interface(
88 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
89 self.acl_active_table = ''
90 for intf in self.interfaces:
94 super(TestClassifier, self).tearDown()
96 def config_pbr_fib_entry(self, intf, is_add=1):
97 """Configure fib entry to route traffic toward PBR VRF table
99 :param VppInterface intf: destination interface to be routed for PBR.
103 self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
104 dst_address_length=addr_len,
105 next_hop_address=intf.remote_ip4n,
106 table_id=self.pbr_vrfid, is_add=is_add)
108 def create_stream(self, src_if, dst_if, packet_sizes,
109 proto_l=UDP(sport=1234, dport=5678)):
110 """Create input packet stream for defined interfaces.
112 :param VppInterface src_if: Source Interface for packet stream.
113 :param VppInterface dst_if: Destination Interface for packet stream.
114 :param list packet_sizes: packet size to test.
115 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
119 for size in packet_sizes:
120 info = self.create_packet_info(src_if, dst_if)
121 payload = self.info_to_payload(info)
122 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
123 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
127 self.extend_packet(p, size)
131 def verify_capture(self, dst_if, capture, proto_l=UDP):
132 """Verify captured input packet stream for defined interface.
134 :param VppInterface dst_if: Interface to verify captured packet stream.
135 :param list capture: Captured packet stream.
136 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
138 self.logger.info("Verifying capture on interface %s" % dst_if.name)
140 for i in self.interfaces:
141 last_info[i.sw_if_index] = None
142 dst_sw_if_index = dst_if.sw_if_index
143 for packet in capture:
145 ip_received = packet[IP]
146 proto_received = packet[proto_l]
147 payload_info = self.payload_to_info(packet[Raw])
148 packet_index = payload_info.index
149 self.assertEqual(payload_info.dst, dst_sw_if_index)
151 "Got packet on port %s: src=%u (id=%u)" %
152 (dst_if.name, payload_info.src, packet_index))
153 next_info = self.get_next_packet_info_for_interface2(
154 payload_info.src, dst_sw_if_index,
155 last_info[payload_info.src])
156 last_info[payload_info.src] = next_info
157 self.assertTrue(next_info is not None)
158 self.assertEqual(packet_index, next_info.index)
159 saved_packet = next_info.data
160 ip_saved = saved_packet[IP]
161 proto_saved = saved_packet[proto_l]
162 # Check standard fields
163 self.assertEqual(ip_received.src, ip_saved.src)
164 self.assertEqual(ip_received.dst, ip_saved.dst)
165 self.assertEqual(proto_received.sport, proto_saved.sport)
166 self.assertEqual(proto_received.dport, proto_saved.dport)
168 self.logger.error(ppp("Unexpected or invalid packet:", packet))
170 for i in self.interfaces:
171 remaining_packet = self.get_next_packet_info_for_interface2(
172 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
173 self.assertTrue(remaining_packet is None,
174 "Interface %s: Packet expected from interface %s "
175 "didn't arrive" % (dst_if.name, i.name))
177 def verify_vrf(self, vrf_id):
179 Check if the FIB table / VRF ID is configured.
181 :param int vrf_id: The FIB table / VRF ID to be verified.
182 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
184 ip_fib_dump = self.vapi.ip_fib_dump()
186 for ip_fib_details in ip_fib_dump:
187 if ip_fib_details[2] == vrf_id:
190 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
193 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format.

    Every argument is a hex string. Each one is right-aligned and
    zero-padded on the left into a fixed-width field (20, 12, 8, 4 and 4
    hex digits respectively) and the fields are concatenated in that order.
    Trailing all-zero bytes are then removed.

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: mask as a hex string made of whole bytes (an even number of
        hex digits); empty string when no field is set.
    :rtype: str
    """
    # NOTE(review): callers use self.build_ip_mask(...), so this is presumably
    # decorated with @staticmethod on the line above -- confirm.
    padded = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port)
    trimmed = padded.rstrip('0')
    # A plain rstrip('0') may also eat the low nibble of the last
    # significant byte ('f0' -> 'f'), producing an odd-length string that
    # binascii.unhexlify() refuses. Put that single digit back so only
    # whole zero bytes are dropped.
    if len(trimmed) % 2:
        trimmed = padded[:len(trimmed) + 1]
    return trimmed
def build_ip_match(proto=0, src_ip='', dst_ip='',
                   src_port=0, dst_port=0):
    """Build IP ACL match data with hexstring format.

    Numeric arguments are converted to hex and dotted-quad addresses to
    their 8-digit hex form. Each value is then right-aligned and
    zero-padded on the left into a fixed-width field (20, 12, 8, 4 and 4
    hex digits respectively), the fields are concatenated in that order and
    trailing all-zero bytes are removed.

    :param int proto: protocol number with valid option "x"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: match value as a hex string made of whole bytes (an even
        number of hex digits); empty string when every field is zero/empty.
    :rtype: str
    """
    # NOTE(review): callers use self.build_ip_match(...), so this is
    # presumably decorated with @staticmethod on the line above -- confirm.

    # socket.inet_aton() rejects the empty default, so only addresses that
    # were actually supplied are converted. hexlify() yields bytes on
    # Python 3; decode so that '{!s}' formats the digits and not "b'...'".
    if src_ip:
        src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
    if dst_ip:
        dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')

    padded = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
        hex(dst_port)[2:])
    trimmed = padded.rstrip('0')
    # A plain rstrip('0') may also eat the low nibble of the last
    # significant byte (port 80 -> '0050' -> '005'), producing an
    # odd-length string that binascii.unhexlify() refuses. Put that single
    # digit back so only whole zero bytes are dropped.
    if len(trimmed) % 2:
        trimmed = padded[:len(trimmed) + 1]
    return trimmed
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    Every argument is a hex string. Each one is right-aligned and
    zero-padded on the left into a fixed-width field (12, 12 and 4 hex
    digits respectively) and the fields are concatenated in that order.
    Trailing all-zero bytes are then removed.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: mask as a hex string made of whole bytes (an even number of
        hex digits); empty string when no field is set.
    :rtype: str
    """
    # NOTE(review): callers use self.build_mac_mask(...), so this is
    # presumably decorated with @staticmethod on the line above -- confirm.
    padded = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type)
    trimmed = padded.rstrip('0')
    # A plain rstrip('0') may also eat the low nibble of the last
    # significant byte ('fff0' -> 'fff'), producing an odd-length string
    # that binascii.unhexlify() refuses. Put that single digit back so only
    # whole zero bytes are dropped.
    if len(trimmed) % 2:
        trimmed = padded[:len(trimmed) + 1]
    return trimmed
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    The colon separators are removed from both MAC addresses. Each value is
    then right-aligned and zero-padded on the left into a fixed-width field
    (12, 12 and 4 hex digits respectively), the fields are concatenated in
    that order and trailing all-zero bytes are removed.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: match value as a hex string made of whole bytes (an even
        number of hex digits); empty string when no field is set.
    :rtype: str
    """
    # NOTE(review): callers use self.build_mac_match(...), so this is
    # presumably decorated with @staticmethod on the line above -- confirm.
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    padded = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type)
    trimmed = padded.rstrip('0')
    # A plain rstrip('0') may also eat the low nibble of the last
    # significant byte (a MAC ending in ':10' -> '...1'), producing an
    # odd-length string that binascii.unhexlify() refuses. Put that single
    # digit back so only whole zero bytes are dropped.
    if len(trimmed) % 2:
        trimmed = padded[:len(trimmed) + 1]
    return trimmed
259 def create_classify_table(self, key, mask, data_offset=0):
260 """Create Classify Table
262 :param str key: key for classify table (ex, ACL name).
263 :param str mask: mask value for interested traffic.
264 :param int data_offset:
266 r = self.vapi.classify_add_del_table(
268 mask=binascii.unhexlify(mask),
269 match_n_vectors=(len(mask) - 1) // 32 + 1,
272 current_data_offset=data_offset)
273 self.assertIsNotNone(r, 'No response msg for add_del_table')
274 self.acl_tbl_idx[key] = r.new_table_index
276 def create_classify_session(self, table_index, match, pbr_option=0,
278 """Create Classify Session
280 :param int table_index: table index to identify classify table.
281 :param str match: matched value for interested traffic.
282 :param int pbr_option: enable/disable PBR feature.
283 :param int vrfid: VRF id.
284 :param int is_add: option to configure classify session.
285 - create(1) or delete(0)
287 r = self.vapi.classify_add_del_session(
290 binascii.unhexlify(match),
294 self.assertIsNotNone(r, 'No response msg for add_del_session')
296 def input_acl_set_interface(self, intf, table_index, is_add=1):
297 """Configure Input ACL interface
299 :param VppInterface intf: Interface to apply Input ACL feature.
300 :param int table_index: table index to identify classify table.
301 :param int is_add: option to configure classify session.
302 - enable(1) or disable(0)
304 r = self.vapi.input_acl_set_interface(
307 ip4_table_index=table_index)
308 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
310 def output_acl_set_interface(self, intf, table_index, is_add=1):
311 """Configure Output ACL interface
313 :param VppInterface intf: Interface to apply Output ACL feature.
314 :param int table_index: table index to identify classify table.
315 :param int is_add: option to configure classify session.
316 - enable(1) or disable(0)
318 r = self.vapi.output_acl_set_interface(
321 ip4_table_index=table_index)
322 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
325 # Tests split to different test case classes because of issue reported in
327 class TestClassifierIP(TestClassifier):
328 """ Classifier IP Test Case """
332 super(TestClassifierIP, cls).setUpClass()
335 def tearDownClass(cls):
336 super(TestClassifierIP, cls).tearDownClass()
338 def test_iacl_src_ip(self):
339 """ Source IP iACL test
341 Test scenario for basic IP ACL with source IP
342 - Create IPv4 stream for pg0 -> pg1 interface.
343 - Create iACL with source IP address.
344 - Send and verify received packets on pg1 interface.
347 # Basic iACL testing with source IP
348 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
349 self.pg0.add_stream(pkts)
352 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
353 self.create_classify_session(
354 self.acl_tbl_idx.get(key),
355 self.build_ip_match(src_ip=self.pg0.remote_ip4))
356 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
357 self.acl_active_table = key
359 self.pg_enable_capture(self.pg_interfaces)
362 pkts = self.pg1.get_capture(len(pkts))
363 self.verify_capture(self.pg1, pkts)
364 self.pg0.assert_nothing_captured(remark="packets forwarded")
365 self.pg2.assert_nothing_captured(remark="packets forwarded")
366 self.pg3.assert_nothing_captured(remark="packets forwarded")
368 def test_iacl_dst_ip(self):
369 """ Destination IP iACL test
371 Test scenario for basic IP ACL with destination IP
372 - Create IPv4 stream for pg0 -> pg1 interface.
373 - Create iACL with destination IP address.
374 - Send and verify received packets on pg1 interface.
377 # Basic iACL testing with destination IP
378 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
379 self.pg0.add_stream(pkts)
382 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
383 self.create_classify_session(
384 self.acl_tbl_idx.get(key),
385 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
386 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
387 self.acl_active_table = key
389 self.pg_enable_capture(self.pg_interfaces)
392 pkts = self.pg1.get_capture(len(pkts))
393 self.verify_capture(self.pg1, pkts)
394 self.pg0.assert_nothing_captured(remark="packets forwarded")
395 self.pg2.assert_nothing_captured(remark="packets forwarded")
396 self.pg3.assert_nothing_captured(remark="packets forwarded")
398 def test_iacl_src_dst_ip(self):
399 """ Source and destination IP iACL test
401 Test scenario for basic IP ACL with source and destination IP
402 - Create IPv4 stream for pg0 -> pg1 interface.
403 - Create iACL with source and destination IP addresses.
404 - Send and verify received packets on pg1 interface.
407 # Basic iACL testing with source and destination IP
408 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
409 self.pg0.add_stream(pkts)
412 self.create_classify_table(
413 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
414 self.create_classify_session(
415 self.acl_tbl_idx.get(key),
416 self.build_ip_match(src_ip=self.pg0.remote_ip4,
417 dst_ip=self.pg1.remote_ip4))
418 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
419 self.acl_active_table = key
421 self.pg_enable_capture(self.pg_interfaces)
424 pkts = self.pg1.get_capture(len(pkts))
425 self.verify_capture(self.pg1, pkts)
426 self.pg0.assert_nothing_captured(remark="packets forwarded")
427 self.pg2.assert_nothing_captured(remark="packets forwarded")
428 self.pg3.assert_nothing_captured(remark="packets forwarded")
431 class TestClassifierUDP(TestClassifier):
432 """ Classifier UDP proto Test Case """
436 super(TestClassifierUDP, cls).setUpClass()
439 def tearDownClass(cls):
440 super(TestClassifierUDP, cls).tearDownClass()
442 def test_iacl_proto_udp(self):
443 """ UDP protocol iACL test
445 Test scenario for basic protocol ACL with UDP protocol
446 - Create IPv4 stream for pg0 -> pg1 interface.
447 - Create iACL with UDP IP protocol.
448 - Send and verify received packets on pg1 interface.
451 # Basic iACL testing with UDP protocol
452 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
453 self.pg0.add_stream(pkts)
456 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
457 self.create_classify_session(
458 self.acl_tbl_idx.get(key),
459 self.build_ip_match(proto=socket.IPPROTO_UDP))
460 self.input_acl_set_interface(
461 self.pg0, self.acl_tbl_idx.get(key))
462 self.acl_active_table = key
464 self.pg_enable_capture(self.pg_interfaces)
467 pkts = self.pg1.get_capture(len(pkts))
468 self.verify_capture(self.pg1, pkts)
469 self.pg0.assert_nothing_captured(remark="packets forwarded")
470 self.pg2.assert_nothing_captured(remark="packets forwarded")
471 self.pg3.assert_nothing_captured(remark="packets forwarded")
473 def test_iacl_proto_udp_sport(self):
474 """ UDP source port iACL test
476 Test scenario for basic protocol ACL with UDP and sport
477 - Create IPv4 stream for pg0 -> pg1 interface.
478 - Create iACL with UDP IP protocol and defined sport.
479 - Send and verify received packets on pg1 interface.
482 # Basic iACL testing with UDP and sport
484 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
485 UDP(sport=sport, dport=5678))
486 self.pg0.add_stream(pkts)
488 key = 'proto_udp_sport'
489 self.create_classify_table(
490 key, self.build_ip_mask(proto='ff', src_port='ffff'))
491 self.create_classify_session(
492 self.acl_tbl_idx.get(key),
493 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
494 self.input_acl_set_interface(
495 self.pg0, self.acl_tbl_idx.get(key))
496 self.acl_active_table = key
498 self.pg_enable_capture(self.pg_interfaces)
501 pkts = self.pg1.get_capture(len(pkts))
502 self.verify_capture(self.pg1, pkts)
503 self.pg0.assert_nothing_captured(remark="packets forwarded")
504 self.pg2.assert_nothing_captured(remark="packets forwarded")
505 self.pg3.assert_nothing_captured(remark="packets forwarded")
507 def test_iacl_proto_udp_dport(self):
508 """ UDP destination port iACL test
510 Test scenario for basic protocol ACL with UDP and dport
511 - Create IPv4 stream for pg0 -> pg1 interface.
512 - Create iACL with UDP IP protocol and defined dport.
513 - Send and verify received packets on pg1 interface.
516 # Basic iACL testing with UDP and dport
518 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
519 UDP(sport=1234, dport=dport))
520 self.pg0.add_stream(pkts)
522 key = 'proto_udp_dport'
523 self.create_classify_table(
524 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
525 self.create_classify_session(
526 self.acl_tbl_idx.get(key),
527 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
528 self.input_acl_set_interface(
529 self.pg0, self.acl_tbl_idx.get(key))
530 self.acl_active_table = key
532 self.pg_enable_capture(self.pg_interfaces)
535 pkts = self.pg1.get_capture(len(pkts))
536 self.verify_capture(self.pg1, pkts)
537 self.pg0.assert_nothing_captured(remark="packets forwarded")
538 self.pg2.assert_nothing_captured(remark="packets forwarded")
539 self.pg3.assert_nothing_captured(remark="packets forwarded")
541 def test_iacl_proto_udp_sport_dport(self):
542 """ UDP source and destination ports iACL test
544 Test scenario for basic protocol ACL with UDP and sport and dport
545 - Create IPv4 stream for pg0 -> pg1 interface.
546 - Create iACL with UDP IP protocol and defined sport and dport.
547 - Send and verify received packets on pg1 interface.
550 # Basic iACL testing with UDP and sport and dport
553 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
554 UDP(sport=sport, dport=dport))
555 self.pg0.add_stream(pkts)
557 key = 'proto_udp_ports'
558 self.create_classify_table(
560 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
561 self.create_classify_session(
562 self.acl_tbl_idx.get(key),
563 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
565 self.input_acl_set_interface(
566 self.pg0, self.acl_tbl_idx.get(key))
567 self.acl_active_table = key
569 self.pg_enable_capture(self.pg_interfaces)
572 pkts = self.pg1.get_capture(len(pkts))
573 self.verify_capture(self.pg1, pkts)
574 self.pg0.assert_nothing_captured(remark="packets forwarded")
575 self.pg2.assert_nothing_captured(remark="packets forwarded")
576 self.pg3.assert_nothing_captured(remark="packets forwarded")
579 class TestClassifierTCP(TestClassifier):
580 """ Classifier TCP proto Test Case """
584 super(TestClassifierTCP, cls).setUpClass()
587 def tearDownClass(cls):
588 super(TestClassifierTCP, cls).tearDownClass()
590 def test_iacl_proto_tcp(self):
591 """ TCP protocol iACL test
593 Test scenario for basic protocol ACL with TCP protocol
594 - Create IPv4 stream for pg0 -> pg1 interface.
595 - Create iACL with TCP IP protocol.
596 - Send and verify received packets on pg1 interface.
599 # Basic iACL testing with TCP protocol
600 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
601 TCP(sport=1234, dport=5678))
602 self.pg0.add_stream(pkts)
605 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
606 self.create_classify_session(
607 self.acl_tbl_idx.get(key),
608 self.build_ip_match(proto=socket.IPPROTO_TCP))
609 self.input_acl_set_interface(
610 self.pg0, self.acl_tbl_idx.get(key))
611 self.acl_active_table = key
613 self.pg_enable_capture(self.pg_interfaces)
616 pkts = self.pg1.get_capture(len(pkts))
617 self.verify_capture(self.pg1, pkts, TCP)
618 self.pg0.assert_nothing_captured(remark="packets forwarded")
619 self.pg2.assert_nothing_captured(remark="packets forwarded")
620 self.pg3.assert_nothing_captured(remark="packets forwarded")
622 def test_iacl_proto_tcp_sport(self):
623 """ TCP source port iACL test
625 Test scenario for basic protocol ACL with TCP and sport
626 - Create IPv4 stream for pg0 -> pg1 interface.
627 - Create iACL with TCP IP protocol and defined sport.
628 - Send and verify received packets on pg1 interface.
631 # Basic iACL testing with TCP and sport
633 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
634 TCP(sport=sport, dport=5678))
635 self.pg0.add_stream(pkts)
637 key = 'proto_tcp_sport'
638 self.create_classify_table(
639 key, self.build_ip_mask(proto='ff', src_port='ffff'))
640 self.create_classify_session(
641 self.acl_tbl_idx.get(key),
642 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
643 self.input_acl_set_interface(
644 self.pg0, self.acl_tbl_idx.get(key))
645 self.acl_active_table = key
647 self.pg_enable_capture(self.pg_interfaces)
650 pkts = self.pg1.get_capture(len(pkts))
651 self.verify_capture(self.pg1, pkts, TCP)
652 self.pg0.assert_nothing_captured(remark="packets forwarded")
653 self.pg2.assert_nothing_captured(remark="packets forwarded")
654 self.pg3.assert_nothing_captured(remark="packets forwarded")
656 def test_iacl_proto_tcp_dport(self):
657 """ TCP destination port iACL test
659 Test scenario for basic protocol ACL with TCP and dport
660 - Create IPv4 stream for pg0 -> pg1 interface.
661 - Create iACL with TCP IP protocol and defined dport.
662 - Send and verify received packets on pg1 interface.
665 # Basic iACL testing with TCP and dport
667 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
668 TCP(sport=1234, dport=dport))
669 self.pg0.add_stream(pkts)
671 key = 'proto_tcp_sport'
672 self.create_classify_table(
673 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
674 self.create_classify_session(
675 self.acl_tbl_idx.get(key),
676 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
677 self.input_acl_set_interface(
678 self.pg0, self.acl_tbl_idx.get(key))
679 self.acl_active_table = key
681 self.pg_enable_capture(self.pg_interfaces)
684 pkts = self.pg1.get_capture(len(pkts))
685 self.verify_capture(self.pg1, pkts, TCP)
686 self.pg0.assert_nothing_captured(remark="packets forwarded")
687 self.pg2.assert_nothing_captured(remark="packets forwarded")
688 self.pg3.assert_nothing_captured(remark="packets forwarded")
690 def test_iacl_proto_tcp_sport_dport(self):
691 """ TCP source and destination ports iACL test
693 Test scenario for basic protocol ACL with TCP and sport and dport
694 - Create IPv4 stream for pg0 -> pg1 interface.
695 - Create iACL with TCP IP protocol and defined sport and dport.
696 - Send and verify received packets on pg1 interface.
699 # Basic iACL testing with TCP and sport and dport
702 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
703 TCP(sport=sport, dport=dport))
704 self.pg0.add_stream(pkts)
706 key = 'proto_tcp_ports'
707 self.create_classify_table(
709 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
710 self.create_classify_session(
711 self.acl_tbl_idx.get(key),
712 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
714 self.input_acl_set_interface(
715 self.pg0, self.acl_tbl_idx.get(key))
716 self.acl_active_table = key
718 self.pg_enable_capture(self.pg_interfaces)
721 pkts = self.pg1.get_capture(len(pkts))
722 self.verify_capture(self.pg1, pkts, TCP)
723 self.pg0.assert_nothing_captured(remark="packets forwarded")
724 self.pg2.assert_nothing_captured(remark="packets forwarded")
725 self.pg3.assert_nothing_captured(remark="packets forwarded")
728 class TestClassifierIPOut(TestClassifier):
729 """ Classifier output IP Test Case """
733 super(TestClassifierIPOut, cls).setUpClass()
736 def tearDownClass(cls):
737 super(TestClassifierIPOut, cls).tearDownClass()
739 def test_acl_ip_out(self):
740 """ Output IP ACL test
742 Test scenario for basic IP ACL with source IP
743 - Create IPv4 stream for pg1 -> pg0 interface.
744 - Create ACL with source IP address.
745 - Send and verify received packets on pg0 interface.
748 # Basic oACL testing with source IP
749 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
750 self.pg1.add_stream(pkts)
753 self.create_classify_table(
754 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
755 self.create_classify_session(
756 self.acl_tbl_idx.get(key),
757 self.build_ip_match(src_ip=self.pg1.remote_ip4))
758 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
759 self.acl_active_table = key
761 self.pg_enable_capture(self.pg_interfaces)
764 pkts = self.pg0.get_capture(len(pkts))
765 self.verify_capture(self.pg0, pkts)
766 self.pg1.assert_nothing_captured(remark="packets forwarded")
767 self.pg2.assert_nothing_captured(remark="packets forwarded")
768 self.pg3.assert_nothing_captured(remark="packets forwarded")
771 class TestClassifierMAC(TestClassifier):
772 """ Classifier MAC Test Case """
776 super(TestClassifierMAC, cls).setUpClass()
779 def tearDownClass(cls):
780 super(TestClassifierMAC, cls).tearDownClass()
782 def test_acl_mac(self):
785 Test scenario for basic MAC ACL with source MAC
786 - Create IPv4 stream for pg0 -> pg2 interface.
787 - Create ACL with source MAC address.
788 - Send and verify received packets on pg2 interface.
791 # Basic iACL testing with source MAC
792 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
793 self.pg0.add_stream(pkts)
796 self.create_classify_table(
797 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
798 self.create_classify_session(
799 self.acl_tbl_idx.get(key),
800 self.build_mac_match(src_mac=self.pg0.remote_mac))
801 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
802 self.acl_active_table = key
804 self.pg_enable_capture(self.pg_interfaces)
807 pkts = self.pg2.get_capture(len(pkts))
808 self.verify_capture(self.pg2, pkts)
809 self.pg0.assert_nothing_captured(remark="packets forwarded")
810 self.pg1.assert_nothing_captured(remark="packets forwarded")
811 self.pg3.assert_nothing_captured(remark="packets forwarded")
814 class TestClassifierPBR(TestClassifier):
815 """ Classifier PBR Test Case """
819 super(TestClassifierPBR, cls).setUpClass()
822 def tearDownClass(cls):
823 super(TestClassifierPBR, cls).tearDownClass()
825 def test_acl_pbr(self):
828 Test scenario for PBR with source IP
829 - Create IPv4 stream for pg0 -> pg3 interface.
830 - Configure PBR fib entry for packet forwarding.
831 - Send and verify received packets on pg3 interface.
834 # PBR testing with source IP
835 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
836 self.pg0.add_stream(pkts)
839 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
841 # this will create the VRF/table in which we will insert the route
842 self.create_classify_session(
843 self.acl_tbl_idx.get(key),
844 self.build_ip_match(src_ip=self.pg0.remote_ip4),
845 pbr_option, self.pbr_vrfid)
846 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
847 self.config_pbr_fib_entry(self.pg3)
848 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
850 self.pg_enable_capture(self.pg_interfaces)
853 pkts = self.pg3.get_capture(len(pkts))
854 self.verify_capture(self.pg3, pkts)
855 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
856 self.pg0.assert_nothing_captured(remark="packets forwarded")
857 self.pg1.assert_nothing_captured(remark="packets forwarded")
858 self.pg2.assert_nothing_captured(remark="packets forwarded")
860 # remove the classify session and the route
861 self.config_pbr_fib_entry(self.pg3, is_add=0)
862 self.create_classify_session(
863 self.acl_tbl_idx.get(key),
864 self.build_ip_match(src_ip=self.pg0.remote_ip4),
865 pbr_option, self.pbr_vrfid, is_add=0)
867 # and the table should be gone.
868 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
870 if __name__ == '__main__':
871 unittest.main(testRunner=VppTestRunner)