7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
15 class TestClassifier(VppTestCase):
16 """ Classifier Test Case """
21 Perform standard class setup (defined by class method setUpClass in
22 class VppTestCase) before running the test case, set test case related
23 variables and configure VPP.
25 super(TestClassifier, cls).setUpClass()
26 cls.acl_active_table = ''
30 Perform test setup before test case.
33 - create 4 pg interfaces
34 - untagged pg0/pg1/pg2 interface
35 pg0 -------> pg1 (IP ACL)
41 - put it into UP state
43 - resolve neighbor address using ARP
45 :ivar list interfaces: pg interfaces.
46 :ivar list pg_if_packet_sizes: packet sizes in test.
47 :ivar dict acl_tbl_idx: ACL table index.
48 :ivar int pbr_vrfid: VRF id for PBR test.
50 self.reset_packet_infos()
51 super(TestClassifier, self).setUp()
53 # create 4 pg interfaces
54 self.create_pg_interfaces(range(4))
56 # packet sizes to test
57 self.pg_if_packet_sizes = [64, 9018]
59 self.interfaces = list(self.pg_interfaces)
65 # setup all interfaces
66 for intf in self.interfaces:
72 """Run standard test teardown and acl related log."""
74 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
75 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
76 self.logger.info(self.vapi.cli("show classify table verbose"))
77 self.logger.info(self.vapi.cli("show ip fib"))
78 if self.acl_active_table == 'ip_out':
79 self.output_acl_set_interface(
80 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
81 self.acl_active_table = ''
82 elif self.acl_active_table != '':
83 self.input_acl_set_interface(
84 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85 self.acl_active_table = ''
86 for intf in self.interfaces:
90 super(TestClassifier, self).tearDown()
92 def config_pbr_fib_entry(self, intf, is_add=1):
93 """Configure fib entry to route traffic toward PBR VRF table
95 :param VppInterface intf: destination interface to be routed for PBR.
99 self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
100 dst_address_length=addr_len,
101 next_hop_address=intf.remote_ip4n,
102 table_id=self.pbr_vrfid, is_add=is_add)
104 def create_stream(self, src_if, dst_if, packet_sizes,
105 proto_l=UDP(sport=1234, dport=5678)):
106 """Create input packet stream for defined interfaces.
108 :param VppInterface src_if: Source Interface for packet stream.
109 :param VppInterface dst_if: Destination Interface for packet stream.
110 :param list packet_sizes: packet size to test.
111 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
115 for size in packet_sizes:
116 info = self.create_packet_info(src_if, dst_if)
117 payload = self.info_to_payload(info)
118 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
119 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
123 self.extend_packet(p, size)
127 def verify_capture(self, dst_if, capture, proto_l=UDP):
128 """Verify captured input packet stream for defined interface.
130 :param VppInterface dst_if: Interface to verify captured packet stream.
131 :param list capture: Captured packet stream.
132 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
134 self.logger.info("Verifying capture on interface %s" % dst_if.name)
136 for i in self.interfaces:
137 last_info[i.sw_if_index] = None
138 dst_sw_if_index = dst_if.sw_if_index
139 for packet in capture:
141 ip_received = packet[IP]
142 proto_received = packet[proto_l]
143 payload_info = self.payload_to_info(packet[Raw])
144 packet_index = payload_info.index
145 self.assertEqual(payload_info.dst, dst_sw_if_index)
147 "Got packet on port %s: src=%u (id=%u)" %
148 (dst_if.name, payload_info.src, packet_index))
149 next_info = self.get_next_packet_info_for_interface2(
150 payload_info.src, dst_sw_if_index,
151 last_info[payload_info.src])
152 last_info[payload_info.src] = next_info
153 self.assertTrue(next_info is not None)
154 self.assertEqual(packet_index, next_info.index)
155 saved_packet = next_info.data
156 ip_saved = saved_packet[IP]
157 proto_saved = saved_packet[proto_l]
158 # Check standard fields
159 self.assertEqual(ip_received.src, ip_saved.src)
160 self.assertEqual(ip_received.dst, ip_saved.dst)
161 self.assertEqual(proto_received.sport, proto_saved.sport)
162 self.assertEqual(proto_received.dport, proto_saved.dport)
164 self.logger.error(ppp("Unexpected or invalid packet:", packet))
166 for i in self.interfaces:
167 remaining_packet = self.get_next_packet_info_for_interface2(
168 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
169 self.assertTrue(remaining_packet is None,
170 "Interface %s: Packet expected from interface %s "
171 "didn't arrive" % (dst_if.name, i.name))
173 def verify_vrf(self, vrf_id):
175 Check if the FIB table / VRF ID is configured.
177 :param int vrf_id: The FIB table / VRF ID to be verified.
178 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
180 ip_fib_dump = self.vapi.ip_fib_dump()
182 for ip_fib_details in ip_fib_dump:
183 if ip_fib_details[2] == vrf_id:
186 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
189 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format.

    Each field is right-aligned and zero-padded into a fixed-width hex
    column (20, 12, 8, 4 and 4 digits respectively); the columns are
    concatenated and trailing zero digits are trimmed.

    NOTE(review): callers in this file use ``self.build_ip_mask(...)``
    although the signature takes no ``self``; presumably a
    ``@staticmethod`` decorator sits on a line not visible here - confirm.

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: mask as a hex string with an even number of digits
        (empty string when every field is empty).
    """
    hex_mask = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # Trimming zeros digit-by-digit can cut the last byte in half
    # (e.g. 'f0' -> 'f'); binascii.unhexlify() rejects odd-length input,
    # so restore the low nibble of that byte.
    if len(hex_mask) % 2:
        hex_mask += '0'
    return hex_mask
def build_ip_match(proto=0, src_ip='', dst_ip='',
                   src_port=0, dst_port=0):
    """Build IP ACL match data with hexstring format.

    Each field is rendered as hex, right-aligned and zero-padded into a
    fixed-width column (20, 12, 8, 4 and 4 digits respectively); the
    columns are concatenated and trailing zero digits are trimmed.

    NOTE(review): callers in this file use ``self.build_ip_match(...)``
    although the signature takes no ``self``; presumably a
    ``@staticmethod`` decorator sits on a line not visible here - confirm.

    :param int proto: protocol number with valid option "x"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: match key as a hex string with an even number of digits
        (empty string when every field is zero/empty).
    :raises OSError: if a non-empty address is not a valid IPv4 string
        (raised by socket.inet_aton; socket.error on Python 2).
    """
    # Only convert addresses that were actually supplied: inet_aton('')
    # raises, and the defaults are empty strings.
    # hexlify() returns bytes on Python 3; decode so that '{!s}' renders
    # the hex digits rather than the "b'...'" repr.
    if src_ip:
        src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
    if dst_ip:
        dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')

    hex_match = '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
        '{:x}'.format(proto), src_ip, dst_ip,
        '{:x}'.format(src_port), '{:x}'.format(dst_port)).rstrip('0')
    # Trimming zeros digit-by-digit can cut the last byte in half
    # (e.g. port 80 -> '0050' -> '005'); binascii.unhexlify() rejects
    # odd-length input, so restore the low nibble of that byte.
    if len(hex_match) % 2:
        hex_match += '0'
    return hex_match
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    Each field is right-aligned and zero-padded into a fixed-width hex
    column (12, 12 and 4 digits respectively); the columns are
    concatenated and trailing zero digits are trimmed.

    NOTE(review): callers in this file use ``self.build_mac_mask(...)``
    although the signature takes no ``self``; presumably a
    ``@staticmethod`` decorator sits on a line not visible here - confirm.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: mask as a hex string with an even number of digits
        (empty string when every field is empty).
    """
    hex_mask = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # Trimming zeros digit-by-digit can cut the last byte in half
    # (e.g. 'f0' -> 'f'); binascii.unhexlify() rejects odd-length input,
    # so restore the low nibble of that byte.
    if len(hex_mask) % 2:
        hex_mask += '0'
    return hex_mask
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    Colons are removed from the MAC addresses, then each field is
    right-aligned and zero-padded into a fixed-width hex column (12, 12
    and 4 digits respectively); the columns are concatenated and trailing
    zero digits are trimmed.

    NOTE(review): callers in this file use ``self.build_mac_match(...)``
    although the signature takes no ``self``; presumably a
    ``@staticmethod`` decorator sits on a line not visible here - confirm.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: match key as a hex string with an even number of digits
        (empty string when every field is empty).
    """
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    hex_match = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # Trimming zeros digit-by-digit can cut the last byte in half
    # (e.g. a MAC ending in ':10' -> '...1'); binascii.unhexlify() rejects
    # odd-length input, so restore the low nibble of that byte.
    if len(hex_match) % 2:
        hex_match += '0'
    return hex_match
255 def create_classify_table(self, key, mask, data_offset=0):
256 """Create Classify Table
258 :param str key: key for classify table (ex, ACL name).
259 :param str mask: mask value for interested traffic.
260 :param int data_offset:
262 r = self.vapi.classify_add_del_table(
264 mask=binascii.unhexlify(mask),
265 match_n_vectors=(len(mask) - 1) // 32 + 1,
268 current_data_offset=data_offset)
269 self.assertIsNotNone(r, 'No response msg for add_del_table')
270 self.acl_tbl_idx[key] = r.new_table_index
272 def create_classify_session(self, table_index, match, pbr_option=0,
274 """Create Classify Session
276 :param int table_index: table index to identify classify table.
277 :param str match: matched value for interested traffic.
278 :param int pbr_option: enable/disable PBR feature.
279 :param int vrfid: VRF id.
280 :param int is_add: option to configure classify session.
281 - create(1) or delete(0)
283 r = self.vapi.classify_add_del_session(
286 binascii.unhexlify(match),
290 self.assertIsNotNone(r, 'No response msg for add_del_session')
292 def input_acl_set_interface(self, intf, table_index, is_add=1):
293 """Configure Input ACL interface
295 :param VppInterface intf: Interface to apply Input ACL feature.
296 :param int table_index: table index to identify classify table.
297 :param int is_add: option to configure classify session.
298 - enable(1) or disable(0)
300 r = self.vapi.input_acl_set_interface(
303 ip4_table_index=table_index)
304 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
306 def output_acl_set_interface(self, intf, table_index, is_add=1):
307 """Configure Output ACL interface
309 :param VppInterface intf: Interface to apply Output ACL feature.
310 :param int table_index: table index to identify classify table.
311 :param int is_add: option to configure classify session.
312 - enable(1) or disable(0)
314 r = self.vapi.output_acl_set_interface(
317 ip4_table_index=table_index)
318 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
321 # Tests split to different test case classes because of issue reported in
323 class TestClassifierIP(TestClassifier):
324 """ Classifier IP Test Case """
326 def test_iacl_src_ip(self):
327 """ Source IP iACL test
329 Test scenario for basic IP ACL with source IP
330 - Create IPv4 stream for pg0 -> pg1 interface.
331 - Create iACL with source IP address.
332 - Send and verify received packets on pg1 interface.
335 # Basic iACL testing with source IP
336 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
337 self.pg0.add_stream(pkts)
340 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
341 self.create_classify_session(
342 self.acl_tbl_idx.get(key),
343 self.build_ip_match(src_ip=self.pg0.remote_ip4))
344 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
345 self.acl_active_table = key
347 self.pg_enable_capture(self.pg_interfaces)
350 pkts = self.pg1.get_capture(len(pkts))
351 self.verify_capture(self.pg1, pkts)
352 self.pg0.assert_nothing_captured(remark="packets forwarded")
353 self.pg2.assert_nothing_captured(remark="packets forwarded")
354 self.pg3.assert_nothing_captured(remark="packets forwarded")
356 def test_iacl_dst_ip(self):
357 """ Destination IP iACL test
359 Test scenario for basic IP ACL with destination IP
360 - Create IPv4 stream for pg0 -> pg1 interface.
361 - Create iACL with destination IP address.
362 - Send and verify received packets on pg1 interface.
365 # Basic iACL testing with destination IP
366 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
367 self.pg0.add_stream(pkts)
370 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
371 self.create_classify_session(
372 self.acl_tbl_idx.get(key),
373 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
374 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
375 self.acl_active_table = key
377 self.pg_enable_capture(self.pg_interfaces)
380 pkts = self.pg1.get_capture(len(pkts))
381 self.verify_capture(self.pg1, pkts)
382 self.pg0.assert_nothing_captured(remark="packets forwarded")
383 self.pg2.assert_nothing_captured(remark="packets forwarded")
384 self.pg3.assert_nothing_captured(remark="packets forwarded")
386 def test_iacl_src_dst_ip(self):
387 """ Source and destination IP iACL test
389 Test scenario for basic IP ACL with source and destination IP
390 - Create IPv4 stream for pg0 -> pg1 interface.
391 - Create iACL with source and destination IP addresses.
392 - Send and verify received packets on pg1 interface.
395 # Basic iACL testing with source and destination IP
396 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
397 self.pg0.add_stream(pkts)
400 self.create_classify_table(
401 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
402 self.create_classify_session(
403 self.acl_tbl_idx.get(key),
404 self.build_ip_match(src_ip=self.pg0.remote_ip4,
405 dst_ip=self.pg1.remote_ip4))
406 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
407 self.acl_active_table = key
409 self.pg_enable_capture(self.pg_interfaces)
412 pkts = self.pg1.get_capture(len(pkts))
413 self.verify_capture(self.pg1, pkts)
414 self.pg0.assert_nothing_captured(remark="packets forwarded")
415 self.pg2.assert_nothing_captured(remark="packets forwarded")
416 self.pg3.assert_nothing_captured(remark="packets forwarded")
419 class TestClassifierUDP(TestClassifier):
420 """ Classifier UDP proto Test Case """
422 def test_iacl_proto_udp(self):
423 """ UDP protocol iACL test
425 Test scenario for basic protocol ACL with UDP protocol
426 - Create IPv4 stream for pg0 -> pg1 interface.
427 - Create iACL with UDP IP protocol.
428 - Send and verify received packets on pg1 interface.
431 # Basic iACL testing with UDP protocol
432 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
433 self.pg0.add_stream(pkts)
436 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
437 self.create_classify_session(
438 self.acl_tbl_idx.get(key),
439 self.build_ip_match(proto=socket.IPPROTO_UDP))
440 self.input_acl_set_interface(
441 self.pg0, self.acl_tbl_idx.get(key))
442 self.acl_active_table = key
444 self.pg_enable_capture(self.pg_interfaces)
447 pkts = self.pg1.get_capture(len(pkts))
448 self.verify_capture(self.pg1, pkts)
449 self.pg0.assert_nothing_captured(remark="packets forwarded")
450 self.pg2.assert_nothing_captured(remark="packets forwarded")
451 self.pg3.assert_nothing_captured(remark="packets forwarded")
453 def test_iacl_proto_udp_sport(self):
454 """ UDP source port iACL test
456 Test scenario for basic protocol ACL with UDP and sport
457 - Create IPv4 stream for pg0 -> pg1 interface.
458 - Create iACL with UDP IP protocol and defined sport.
459 - Send and verify received packets on pg1 interface.
462 # Basic iACL testing with UDP and sport
464 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
465 UDP(sport=sport, dport=5678))
466 self.pg0.add_stream(pkts)
468 key = 'proto_udp_sport'
469 self.create_classify_table(
470 key, self.build_ip_mask(proto='ff', src_port='ffff'))
471 self.create_classify_session(
472 self.acl_tbl_idx.get(key),
473 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
474 self.input_acl_set_interface(
475 self.pg0, self.acl_tbl_idx.get(key))
476 self.acl_active_table = key
478 self.pg_enable_capture(self.pg_interfaces)
481 pkts = self.pg1.get_capture(len(pkts))
482 self.verify_capture(self.pg1, pkts)
483 self.pg0.assert_nothing_captured(remark="packets forwarded")
484 self.pg2.assert_nothing_captured(remark="packets forwarded")
485 self.pg3.assert_nothing_captured(remark="packets forwarded")
487 def test_iacl_proto_udp_dport(self):
488 """ UDP destination port iACL test
490 Test scenario for basic protocol ACL with UDP and dport
491 - Create IPv4 stream for pg0 -> pg1 interface.
492 - Create iACL with UDP IP protocol and defined dport.
493 - Send and verify received packets on pg1 interface.
496 # Basic iACL testing with UDP and dport
498 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
499 UDP(sport=1234, dport=dport))
500 self.pg0.add_stream(pkts)
502 key = 'proto_udp_dport'
503 self.create_classify_table(
504 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
505 self.create_classify_session(
506 self.acl_tbl_idx.get(key),
507 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
508 self.input_acl_set_interface(
509 self.pg0, self.acl_tbl_idx.get(key))
510 self.acl_active_table = key
512 self.pg_enable_capture(self.pg_interfaces)
515 pkts = self.pg1.get_capture(len(pkts))
516 self.verify_capture(self.pg1, pkts)
517 self.pg0.assert_nothing_captured(remark="packets forwarded")
518 self.pg2.assert_nothing_captured(remark="packets forwarded")
519 self.pg3.assert_nothing_captured(remark="packets forwarded")
521 def test_iacl_proto_udp_sport_dport(self):
522 """ UDP source and destination ports iACL test
524 Test scenario for basic protocol ACL with UDP and sport and dport
525 - Create IPv4 stream for pg0 -> pg1 interface.
526 - Create iACL with UDP IP protocol and defined sport and dport.
527 - Send and verify received packets on pg1 interface.
530 # Basic iACL testing with UDP and sport and dport
533 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
534 UDP(sport=sport, dport=dport))
535 self.pg0.add_stream(pkts)
537 key = 'proto_udp_ports'
538 self.create_classify_table(
540 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
541 self.create_classify_session(
542 self.acl_tbl_idx.get(key),
543 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
545 self.input_acl_set_interface(
546 self.pg0, self.acl_tbl_idx.get(key))
547 self.acl_active_table = key
549 self.pg_enable_capture(self.pg_interfaces)
552 pkts = self.pg1.get_capture(len(pkts))
553 self.verify_capture(self.pg1, pkts)
554 self.pg0.assert_nothing_captured(remark="packets forwarded")
555 self.pg2.assert_nothing_captured(remark="packets forwarded")
556 self.pg3.assert_nothing_captured(remark="packets forwarded")
559 class TestClassifierTCP(TestClassifier):
560 """ Classifier TCP proto Test Case """
562 def test_iacl_proto_tcp(self):
563 """ TCP protocol iACL test
565 Test scenario for basic protocol ACL with TCP protocol
566 - Create IPv4 stream for pg0 -> pg1 interface.
567 - Create iACL with TCP IP protocol.
568 - Send and verify received packets on pg1 interface.
571 # Basic iACL testing with TCP protocol
572 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
573 TCP(sport=1234, dport=5678))
574 self.pg0.add_stream(pkts)
577 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
578 self.create_classify_session(
579 self.acl_tbl_idx.get(key),
580 self.build_ip_match(proto=socket.IPPROTO_TCP))
581 self.input_acl_set_interface(
582 self.pg0, self.acl_tbl_idx.get(key))
583 self.acl_active_table = key
585 self.pg_enable_capture(self.pg_interfaces)
588 pkts = self.pg1.get_capture(len(pkts))
589 self.verify_capture(self.pg1, pkts, TCP)
590 self.pg0.assert_nothing_captured(remark="packets forwarded")
591 self.pg2.assert_nothing_captured(remark="packets forwarded")
592 self.pg3.assert_nothing_captured(remark="packets forwarded")
594 def test_iacl_proto_tcp_sport(self):
595 """ TCP source port iACL test
597 Test scenario for basic protocol ACL with TCP and sport
598 - Create IPv4 stream for pg0 -> pg1 interface.
599 - Create iACL with TCP IP protocol and defined sport.
600 - Send and verify received packets on pg1 interface.
603 # Basic iACL testing with TCP and sport
605 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
606 TCP(sport=sport, dport=5678))
607 self.pg0.add_stream(pkts)
609 key = 'proto_tcp_sport'
610 self.create_classify_table(
611 key, self.build_ip_mask(proto='ff', src_port='ffff'))
612 self.create_classify_session(
613 self.acl_tbl_idx.get(key),
614 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
615 self.input_acl_set_interface(
616 self.pg0, self.acl_tbl_idx.get(key))
617 self.acl_active_table = key
619 self.pg_enable_capture(self.pg_interfaces)
622 pkts = self.pg1.get_capture(len(pkts))
623 self.verify_capture(self.pg1, pkts, TCP)
624 self.pg0.assert_nothing_captured(remark="packets forwarded")
625 self.pg2.assert_nothing_captured(remark="packets forwarded")
626 self.pg3.assert_nothing_captured(remark="packets forwarded")
628 def test_iacl_proto_tcp_dport(self):
629 """ TCP destination port iACL test
631 Test scenario for basic protocol ACL with TCP and dport
632 - Create IPv4 stream for pg0 -> pg1 interface.
633 - Create iACL with TCP IP protocol and defined dport.
634 - Send and verify received packets on pg1 interface.
637 # Basic iACL testing with TCP and dport
639 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
640 TCP(sport=1234, dport=dport))
641 self.pg0.add_stream(pkts)
643 key = 'proto_tcp_sport'
644 self.create_classify_table(
645 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
646 self.create_classify_session(
647 self.acl_tbl_idx.get(key),
648 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
649 self.input_acl_set_interface(
650 self.pg0, self.acl_tbl_idx.get(key))
651 self.acl_active_table = key
653 self.pg_enable_capture(self.pg_interfaces)
656 pkts = self.pg1.get_capture(len(pkts))
657 self.verify_capture(self.pg1, pkts, TCP)
658 self.pg0.assert_nothing_captured(remark="packets forwarded")
659 self.pg2.assert_nothing_captured(remark="packets forwarded")
660 self.pg3.assert_nothing_captured(remark="packets forwarded")
662 def test_iacl_proto_tcp_sport_dport(self):
663 """ TCP source and destination ports iACL test
665 Test scenario for basic protocol ACL with TCP and sport and dport
666 - Create IPv4 stream for pg0 -> pg1 interface.
667 - Create iACL with TCP IP protocol and defined sport and dport.
668 - Send and verify received packets on pg1 interface.
671 # Basic iACL testing with TCP and sport and dport
674 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
675 TCP(sport=sport, dport=dport))
676 self.pg0.add_stream(pkts)
678 key = 'proto_tcp_ports'
679 self.create_classify_table(
681 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
682 self.create_classify_session(
683 self.acl_tbl_idx.get(key),
684 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
686 self.input_acl_set_interface(
687 self.pg0, self.acl_tbl_idx.get(key))
688 self.acl_active_table = key
690 self.pg_enable_capture(self.pg_interfaces)
693 pkts = self.pg1.get_capture(len(pkts))
694 self.verify_capture(self.pg1, pkts, TCP)
695 self.pg0.assert_nothing_captured(remark="packets forwarded")
696 self.pg2.assert_nothing_captured(remark="packets forwarded")
697 self.pg3.assert_nothing_captured(remark="packets forwarded")
700 class TestClassifierIPOut(TestClassifier):
701 """ Classifier output IP Test Case """
703 def test_acl_ip_out(self):
704 """ Output IP ACL test
706 Test scenario for basic IP ACL with source IP
707 - Create IPv4 stream for pg1 -> pg0 interface.
708 - Create ACL with source IP address.
709 - Send and verify received packets on pg0 interface.
712 # Basic oACL testing with source IP
713 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
714 self.pg1.add_stream(pkts)
717 self.create_classify_table(
718 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
719 self.create_classify_session(
720 self.acl_tbl_idx.get(key),
721 self.build_ip_match(src_ip=self.pg1.remote_ip4))
722 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
723 self.acl_active_table = key
725 self.pg_enable_capture(self.pg_interfaces)
728 pkts = self.pg0.get_capture(len(pkts))
729 self.verify_capture(self.pg0, pkts)
730 self.pg1.assert_nothing_captured(remark="packets forwarded")
731 self.pg2.assert_nothing_captured(remark="packets forwarded")
732 self.pg3.assert_nothing_captured(remark="packets forwarded")
735 class TestClassifierMAC(TestClassifier):
736 """ Classifier MAC Test Case """
738 def test_acl_mac(self):
741 Test scenario for basic MAC ACL with source MAC
742 - Create IPv4 stream for pg0 -> pg2 interface.
743 - Create ACL with source MAC address.
744 - Send and verify received packets on pg2 interface.
747 # Basic iACL testing with source MAC
748 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
749 self.pg0.add_stream(pkts)
752 self.create_classify_table(
753 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
754 self.create_classify_session(
755 self.acl_tbl_idx.get(key),
756 self.build_mac_match(src_mac=self.pg0.remote_mac))
757 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
758 self.acl_active_table = key
760 self.pg_enable_capture(self.pg_interfaces)
763 pkts = self.pg2.get_capture(len(pkts))
764 self.verify_capture(self.pg2, pkts)
765 self.pg0.assert_nothing_captured(remark="packets forwarded")
766 self.pg1.assert_nothing_captured(remark="packets forwarded")
767 self.pg3.assert_nothing_captured(remark="packets forwarded")
770 class TestClassifierPBR(TestClassifier):
771 """ Classifier PBR Test Case """
773 def test_acl_pbr(self):
776 Test scenario for PBR with source IP
777 - Create IPv4 stream for pg0 -> pg3 interface.
778 - Configure PBR fib entry for packet forwarding.
779 - Send and verify received packets on pg3 interface.
782 # PBR testing with source IP
783 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
784 self.pg0.add_stream(pkts)
787 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
789 # this will create the VRF/table in which we will insert the route
790 self.create_classify_session(
791 self.acl_tbl_idx.get(key),
792 self.build_ip_match(src_ip=self.pg0.remote_ip4),
793 pbr_option, self.pbr_vrfid)
794 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
795 self.config_pbr_fib_entry(self.pg3)
796 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
798 self.pg_enable_capture(self.pg_interfaces)
801 pkts = self.pg3.get_capture(len(pkts))
802 self.verify_capture(self.pg3, pkts)
803 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
804 self.pg0.assert_nothing_captured(remark="packets forwarded")
805 self.pg1.assert_nothing_captured(remark="packets forwarded")
806 self.pg2.assert_nothing_captured(remark="packets forwarded")
808 # remove the classify session and the route
809 self.config_pbr_fib_entry(self.pg3, is_add=0)
810 self.create_classify_session(
811 self.acl_tbl_idx.get(key),
812 self.build_ip_match(src_ip=self.pg0.remote_ip4),
813 pbr_option, self.pbr_vrfid, is_add=0)
815 # and the table should be gone.
816 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
818 if __name__ == '__main__':
819 unittest.main(testRunner=VppTestRunner)