8 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP, TCP
16 class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
31 Perform test setup before test case.
34 - create 4 pg interfaces
35 - untagged pg0/pg1/pg2 interface
36 pg0 -------> pg1 (IP ACL)
42 - put it into UP state
44 - resolve neighbor address using ARP
46 :ivar list interfaces: pg interfaces.
47 :ivar list pg_if_packet_sizes: packet sizes in test.
48 :ivar dict acl_tbl_idx: ACL table index.
49 :ivar int pbr_vrfid: VRF id for PBR test.
51 self.reset_packet_infos()
52 super(TestClassifier, self).setUp()
54 # create 4 pg interfaces
55 self.create_pg_interfaces(range(4))
57 # packet sizes to test
58 self.pg_if_packet_sizes = [64, 9018]
60 self.interfaces = list(self.pg_interfaces)
66 # setup all interfaces
67 for intf in self.interfaces:
73 """Run standard test teardown and acl related log."""
75 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
76 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
77 self.logger.info(self.vapi.cli("show classify table verbose"))
78 self.logger.info(self.vapi.cli("show ip fib"))
79 if self.acl_active_table == 'ip_out':
80 self.output_acl_set_interface(
81 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
82 self.acl_active_table = ''
83 elif self.acl_active_table != '':
84 self.input_acl_set_interface(
85 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
86 self.acl_active_table = ''
87 for intf in self.interfaces:
91 super(TestClassifier, self).tearDown()
93 def config_pbr_fib_entry(self, intf, is_add=1):
94 """Configure fib entry to route traffic toward PBR VRF table
96 :param VppInterface intf: destination interface to be routed for PBR.
100 self.vapi.ip_add_del_route(intf.local_ip4n,
103 table_id=self.pbr_vrfid,
def create_stream(self, src_if, dst_if, packet_sizes,
                  proto_l=UDP(sport=1234, dport=5678)):
    """Build one test packet per requested size for a src -> dst flow.

    Each packet is Ether / IP / proto_l / Raw, where the Raw payload
    carries the packet info created for this (src_if, dst_if) pair.

    :param VppInterface src_if: Source Interface for packet stream.
    :param VppInterface dst_if: Destination Interface for packet stream.
    :param list packet_sizes: packet size to test.
    :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
    :returns: list of generated packets, one per entry in packet_sizes.
    """
    stream = []
    for wanted_size in packet_sizes:
        pkt_info = self.create_packet_info(src_if, dst_if)
        l2 = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
        l3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
        frame = l2 / l3 / proto_l / Raw(self.info_to_payload(pkt_info))
        # Keep a copy taken before padding as the reference packet.
        pkt_info.data = frame.copy()
        self.extend_packet(frame, wanted_size)
        stream.append(frame)
    return stream
def verify_capture(self, dst_if, capture, proto_l=UDP):
    """Verify captured input packet stream for defined interface.

    Every captured packet is checked against the next recorded packet info
    for its source interface (IP src/dst and L4 sport/dport must match).
    Afterwards no recorded packet destined to dst_if may remain unmatched.

    :param VppInterface dst_if: Interface to verify captured packet stream.
    :param list capture: Captured packet stream.
    :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
    """
    self.logger.info("Verifying capture on interface %s" % dst_if.name)
    # Last matched packet info, keyed by source interface sw_if_index.
    last_info = {i.sw_if_index: None for i in self.interfaces}
    dst_sw_if_index = dst_if.sw_if_index
    for packet in capture:
        try:
            ip_received = packet[IP]
            proto_received = packet[proto_l]
            payload_info = self.payload_to_info(str(packet[Raw]))
            packet_index = payload_info.index
            self.assertEqual(payload_info.dst, dst_sw_if_index)
            self.logger.debug(
                "Got packet on port %s: src=%u (id=%u)" %
                (dst_if.name, payload_info.src, packet_index))
            next_info = self.get_next_packet_info_for_interface2(
                payload_info.src, dst_sw_if_index,
                last_info[payload_info.src])
            last_info[payload_info.src] = next_info
            self.assertIsNotNone(next_info)
            self.assertEqual(packet_index, next_info.index)
            saved_packet = next_info.data
            ip_saved = saved_packet[IP]
            proto_saved = saved_packet[proto_l]
            # Check standard fields
            self.assertEqual(ip_received.src, ip_saved.src)
            self.assertEqual(ip_received.dst, ip_saved.dst)
            self.assertEqual(proto_received.sport, proto_saved.sport)
            self.assertEqual(proto_received.dport, proto_saved.dport)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
    # Anything still pending for dst_if was sent but never captured.
    for i in self.interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertIsNone(remaining_packet,
                          "Interface %s: Packet expected from interface %s "
                          "didn't arrive" % (dst_if.name, i.name))
def verify_vrf(self, vrf_id):
    """
    Check if the FIB table / VRF ID is configured.

    Looks for vrf_id in field [2] of each entry returned by
    self.vapi.ip_fib_dump() and logs the outcome.

    :param int vrf_id: The FIB table / VRF ID to be verified.
    :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
    """
    fib_entries = self.vapi.ip_fib_dump()
    configured = any(entry[2] == vrf_id for entry in fib_entries)
    if configured:
        self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
        return 1
    self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
    return 0
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format.

    The fields are concatenated at fixed hex-digit widths
    (20 + 12 + 8 + 4 + 4), each left-padded with '0'; trailing all-zero
    bytes are then trimmed.

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: hex string with an even number of digits ('' if nothing set).
    """
    mask = '{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port)
    # Trim whole zero bytes only. A plain rstrip('0') also eats the low
    # nibble of the last significant byte (e.g. 'fff0' -> 'fff'), leaving an
    # odd-length string that binascii.unhexlify() rejects.
    significant = len(mask.rstrip('0'))
    return mask[:significant + significant % 2]
def build_ip_match(proto=0, src_ip='', dst_ip='',
                   src_port=0, dst_port=0):
    """Build IP ACL match data with hexstring format.

    Uses the same field layout as the IP mask (20 + 12 + 8 + 4 + 4 hex
    digits, left-padded with '0'); trailing all-zero bytes are trimmed.

    :param int proto: protocol number with valid option "x"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: hex string with an even number of digits ('' if nothing set).
    """
    # hexlify() works on both Python 2 and 3, unlike str.encode('hex').
    if src_ip:
        src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
    if dst_ip:
        dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')

    # format(n, 'x') avoids the '0x' prefix and any Python 2 'L' suffix.
    match = '{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
        format(proto, 'x'), src_ip, dst_ip,
        format(src_port, 'x'), format(dst_port, 'x'))
    # Trim whole zero bytes only, so a value ending in a zero nibble
    # (e.g. address x.x.x.16 or port 0x1630) keeps an even digit count
    # and stays acceptable to binascii.unhexlify().
    significant = len(match.rstrip('0'))
    return match[:significant + significant % 2]
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    Fields are concatenated as 12 + 12 + 4 hex digits, each left-padded
    with '0'; trailing all-zero bytes are then trimmed.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: hex string with an even number of digits ('' if nothing set).
    """
    mask = '{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, ether_type)
    # Trim whole zero bytes only; rstrip('0') alone could strip the low
    # nibble of the last byte and produce an odd-length hex string.
    significant = len(mask.rstrip('0'))
    return mask[:significant + significant % 2]
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    Colons are removed from the MAC addresses, the fields are concatenated
    as 12 + 12 + 4 hex digits (left-padded with '0'), and trailing all-zero
    bytes are trimmed.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: hex string with an even number of digits ('' if nothing set).
    """
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    match = '{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, ether_type)
    # Trim whole zero bytes only, so a MAC ending in a zero nibble
    # (e.g. ...:10) does not yield an odd-length hex string.
    significant = len(match.rstrip('0'))
    return match[:significant + significant % 2]
257 def create_classify_table(self, key, mask, data_offset=0):
258 """Create Classify Table
260 :param str key: key for classify table (ex, ACL name).
261 :param str mask: mask value for interested traffic.
262 :param int data_offset:
264 r = self.vapi.classify_add_del_table(
266 mask=binascii.unhexlify(mask),
267 match_n_vectors=(len(mask) - 1) // 32 + 1,
270 current_data_offset=data_offset)
271 self.assertIsNotNone(r, msg='No response msg for add_del_table')
272 self.acl_tbl_idx[key] = r.new_table_index
274 def create_classify_session(self, table_index, match, pbr_option=0,
276 """Create Classify Session
278 :param int table_index: table index to identify classify table.
279 :param str match: matched value for interested traffic.
280 :param int pbr_option: enable/disable PBR feature.
281 :param int vrfid: VRF id.
282 :param int is_add: option to configure classify session.
283 - create(1) or delete(0)
285 r = self.vapi.classify_add_del_session(
288 binascii.unhexlify(match),
292 self.assertIsNotNone(r, msg='No response msg for add_del_session')
294 def input_acl_set_interface(self, intf, table_index, is_add=1):
295 """Configure Input ACL interface
297 :param VppInterface intf: Interface to apply Input ACL feature.
298 :param int table_index: table index to identify classify table.
299 :param int is_add: option to configure classify session.
300 - enable(1) or disable(0)
302 r = self.vapi.input_acl_set_interface(
305 ip4_table_index=table_index)
306 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
308 def output_acl_set_interface(self, intf, table_index, is_add=1):
309 """Configure Output ACL interface
311 :param VppInterface intf: Interface to apply Output ACL feature.
312 :param int table_index: table index to identify classify table.
313 :param int is_add: option to configure classify session.
314 - enable(1) or disable(0)
316 r = self.vapi.output_acl_set_interface(
319 ip4_table_index=table_index)
320 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
323 # Tests split to different test case classes because of issue reported in
325 class TestClassifierIP(TestClassifier):
326 """ Classifier IP Test Case """
328 def test_iacl_src_ip(self):
329 """ Source IP iACL test
331 Test scenario for basic IP ACL with source IP
332 - Create IPv4 stream for pg0 -> pg1 interface.
333 - Create iACL with source IP address.
334 - Send and verify received packets on pg1 interface.
337 # Basic iACL testing with source IP
338 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
339 self.pg0.add_stream(pkts)
342 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
343 self.create_classify_session(
344 self.acl_tbl_idx.get(key),
345 self.build_ip_match(src_ip=self.pg0.remote_ip4))
346 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
347 self.acl_active_table = key
349 self.pg_enable_capture(self.pg_interfaces)
352 pkts = self.pg1.get_capture(len(pkts))
353 self.verify_capture(self.pg1, pkts)
354 self.pg0.assert_nothing_captured(remark="packets forwarded")
355 self.pg2.assert_nothing_captured(remark="packets forwarded")
356 self.pg3.assert_nothing_captured(remark="packets forwarded")
358 def test_iacl_dst_ip(self):
359 """ Destination IP iACL test
361 Test scenario for basic IP ACL with destination IP
362 - Create IPv4 stream for pg0 -> pg1 interface.
363 - Create iACL with destination IP address.
364 - Send and verify received packets on pg1 interface.
367 # Basic iACL testing with destination IP
368 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
369 self.pg0.add_stream(pkts)
372 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
373 self.create_classify_session(
374 self.acl_tbl_idx.get(key),
375 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
376 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
377 self.acl_active_table = key
379 self.pg_enable_capture(self.pg_interfaces)
382 pkts = self.pg1.get_capture(len(pkts))
383 self.verify_capture(self.pg1, pkts)
384 self.pg0.assert_nothing_captured(remark="packets forwarded")
385 self.pg2.assert_nothing_captured(remark="packets forwarded")
386 self.pg3.assert_nothing_captured(remark="packets forwarded")
388 def test_iacl_src_dst_ip(self):
389 """ Source and destination IP iACL test
391 Test scenario for basic IP ACL with source and destination IP
392 - Create IPv4 stream for pg0 -> pg1 interface.
393 - Create iACL with source and destination IP addresses.
394 - Send and verify received packets on pg1 interface.
397 # Basic iACL testing with source and destination IP
398 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
399 self.pg0.add_stream(pkts)
402 self.create_classify_table(
403 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
404 self.create_classify_session(
405 self.acl_tbl_idx.get(key),
406 self.build_ip_match(src_ip=self.pg0.remote_ip4,
407 dst_ip=self.pg1.remote_ip4))
408 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409 self.acl_active_table = key
411 self.pg_enable_capture(self.pg_interfaces)
414 pkts = self.pg1.get_capture(len(pkts))
415 self.verify_capture(self.pg1, pkts)
416 self.pg0.assert_nothing_captured(remark="packets forwarded")
417 self.pg2.assert_nothing_captured(remark="packets forwarded")
418 self.pg3.assert_nothing_captured(remark="packets forwarded")
421 class TestClassifierUDP(TestClassifier):
422 """ Classifier UDP proto Test Case """
424 def test_iacl_proto_udp(self):
425 """ UDP protocol iACL test
427 Test scenario for basic protocol ACL with UDP protocol
428 - Create IPv4 stream for pg0 -> pg1 interface.
429 - Create iACL with UDP IP protocol.
430 - Send and verify received packets on pg1 interface.
433 # Basic iACL testing with UDP protocol
434 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
435 self.pg0.add_stream(pkts)
438 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
439 self.create_classify_session(
440 self.acl_tbl_idx.get(key),
441 self.build_ip_match(proto=socket.IPPROTO_UDP))
442 self.input_acl_set_interface(
443 self.pg0, self.acl_tbl_idx.get(key))
444 self.acl_active_table = key
446 self.pg_enable_capture(self.pg_interfaces)
449 pkts = self.pg1.get_capture(len(pkts))
450 self.verify_capture(self.pg1, pkts)
451 self.pg0.assert_nothing_captured(remark="packets forwarded")
452 self.pg2.assert_nothing_captured(remark="packets forwarded")
453 self.pg3.assert_nothing_captured(remark="packets forwarded")
455 def test_iacl_proto_udp_sport(self):
456 """ UDP source port iACL test
458 Test scenario for basic protocol ACL with UDP and sport
459 - Create IPv4 stream for pg0 -> pg1 interface.
460 - Create iACL with UDP IP protocol and defined sport.
461 - Send and verify received packets on pg1 interface.
464 # Basic iACL testing with UDP and sport
466 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
467 UDP(sport=sport, dport=5678))
468 self.pg0.add_stream(pkts)
470 key = 'proto_udp_sport'
471 self.create_classify_table(
472 key, self.build_ip_mask(proto='ff', src_port='ffff'))
473 self.create_classify_session(
474 self.acl_tbl_idx.get(key),
475 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
476 self.input_acl_set_interface(
477 self.pg0, self.acl_tbl_idx.get(key))
478 self.acl_active_table = key
480 self.pg_enable_capture(self.pg_interfaces)
483 pkts = self.pg1.get_capture(len(pkts))
484 self.verify_capture(self.pg1, pkts)
485 self.pg0.assert_nothing_captured(remark="packets forwarded")
486 self.pg2.assert_nothing_captured(remark="packets forwarded")
487 self.pg3.assert_nothing_captured(remark="packets forwarded")
489 def test_iacl_proto_udp_dport(self):
490 """ UDP destination port iACL test
492 Test scenario for basic protocol ACL with UDP and dport
493 - Create IPv4 stream for pg0 -> pg1 interface.
494 - Create iACL with UDP IP protocol and defined dport.
495 - Send and verify received packets on pg1 interface.
498 # Basic iACL testing with UDP and dport
500 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
501 UDP(sport=1234, dport=dport))
502 self.pg0.add_stream(pkts)
504 key = 'proto_udp_dport'
505 self.create_classify_table(
506 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
507 self.create_classify_session(
508 self.acl_tbl_idx.get(key),
509 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
510 self.input_acl_set_interface(
511 self.pg0, self.acl_tbl_idx.get(key))
512 self.acl_active_table = key
514 self.pg_enable_capture(self.pg_interfaces)
517 pkts = self.pg1.get_capture(len(pkts))
518 self.verify_capture(self.pg1, pkts)
519 self.pg0.assert_nothing_captured(remark="packets forwarded")
520 self.pg2.assert_nothing_captured(remark="packets forwarded")
521 self.pg3.assert_nothing_captured(remark="packets forwarded")
523 def test_iacl_proto_udp_sport_dport(self):
524 """ UDP source and destination ports iACL test
526 Test scenario for basic protocol ACL with UDP and sport and dport
527 - Create IPv4 stream for pg0 -> pg1 interface.
528 - Create iACL with UDP IP protocol and defined sport and dport.
529 - Send and verify received packets on pg1 interface.
532 # Basic iACL testing with UDP and sport and dport
535 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
536 UDP(sport=sport, dport=dport))
537 self.pg0.add_stream(pkts)
539 key = 'proto_udp_ports'
540 self.create_classify_table(
542 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
543 self.create_classify_session(
544 self.acl_tbl_idx.get(key),
545 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
547 self.input_acl_set_interface(
548 self.pg0, self.acl_tbl_idx.get(key))
549 self.acl_active_table = key
551 self.pg_enable_capture(self.pg_interfaces)
554 pkts = self.pg1.get_capture(len(pkts))
555 self.verify_capture(self.pg1, pkts)
556 self.pg0.assert_nothing_captured(remark="packets forwarded")
557 self.pg2.assert_nothing_captured(remark="packets forwarded")
558 self.pg3.assert_nothing_captured(remark="packets forwarded")
561 class TestClassifierTCP(TestClassifier):
562 """ Classifier TCP proto Test Case """
564 def test_iacl_proto_tcp(self):
565 """ TCP protocol iACL test
567 Test scenario for basic protocol ACL with TCP protocol
568 - Create IPv4 stream for pg0 -> pg1 interface.
569 - Create iACL with TCP IP protocol.
570 - Send and verify received packets on pg1 interface.
573 # Basic iACL testing with TCP protocol
574 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
575 TCP(sport=1234, dport=5678))
576 self.pg0.add_stream(pkts)
579 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
580 self.create_classify_session(
581 self.acl_tbl_idx.get(key),
582 self.build_ip_match(proto=socket.IPPROTO_TCP))
583 self.input_acl_set_interface(
584 self.pg0, self.acl_tbl_idx.get(key))
585 self.acl_active_table = key
587 self.pg_enable_capture(self.pg_interfaces)
590 pkts = self.pg1.get_capture(len(pkts))
591 self.verify_capture(self.pg1, pkts, TCP)
592 self.pg0.assert_nothing_captured(remark="packets forwarded")
593 self.pg2.assert_nothing_captured(remark="packets forwarded")
594 self.pg3.assert_nothing_captured(remark="packets forwarded")
596 def test_iacl_proto_tcp_sport(self):
597 """ TCP source port iACL test
599 Test scenario for basic protocol ACL with TCP and sport
600 - Create IPv4 stream for pg0 -> pg1 interface.
601 - Create iACL with TCP IP protocol and defined sport.
602 - Send and verify received packets on pg1 interface.
605 # Basic iACL testing with TCP and sport
607 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
608 TCP(sport=sport, dport=5678))
609 self.pg0.add_stream(pkts)
611 key = 'proto_tcp_sport'
612 self.create_classify_table(
613 key, self.build_ip_mask(proto='ff', src_port='ffff'))
614 self.create_classify_session(
615 self.acl_tbl_idx.get(key),
616 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
617 self.input_acl_set_interface(
618 self.pg0, self.acl_tbl_idx.get(key))
619 self.acl_active_table = key
621 self.pg_enable_capture(self.pg_interfaces)
624 pkts = self.pg1.get_capture(len(pkts))
625 self.verify_capture(self.pg1, pkts, TCP)
626 self.pg0.assert_nothing_captured(remark="packets forwarded")
627 self.pg2.assert_nothing_captured(remark="packets forwarded")
628 self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_tcp_dport(self):
    """ TCP destination port iACL test

    Test scenario for basic protocol ACL with TCP and dport
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create iACL with TCP IP protocol and defined dport.
        - Send and verify received packets on pg1 interface.
    """

    # Basic iACL testing with TCP and dport
    # NOTE(review): the port assignment is not visible in the reviewed
    # excerpt; any value works as the stream and the session share it --
    # confirm against the original.
    dport = 427
    pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                              TCP(sport=1234, dport=dport))
    self.pg0.add_stream(pkts)

    # Key names the table after what it matches (was 'proto_tcp_sport',
    # copied from the source-port test).
    key = 'proto_tcp_dport'
    self.create_classify_table(
        key, self.build_ip_mask(proto='ff', dst_port='ffff'))
    self.create_classify_session(
        self.acl_tbl_idx.get(key),
        self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
    self.input_acl_set_interface(
        self.pg0, self.acl_tbl_idx.get(key))
    # Recorded so tearDown can detach the ACL from pg0.
    self.acl_active_table = key

    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    pkts = self.pg1.get_capture(len(pkts))
    self.verify_capture(self.pg1, pkts, TCP)
    self.pg0.assert_nothing_captured(remark="packets forwarded")
    self.pg2.assert_nothing_captured(remark="packets forwarded")
    self.pg3.assert_nothing_captured(remark="packets forwarded")
664 def test_iacl_proto_tcp_sport_dport(self):
665 """ TCP source and destination ports iACL test
667 Test scenario for basic protocol ACL with TCP and sport and dport
668 - Create IPv4 stream for pg0 -> pg1 interface.
669 - Create iACL with TCP IP protocol and defined sport and dport.
670 - Send and verify received packets on pg1 interface.
673 # Basic iACL testing with TCP and sport and dport
676 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
677 TCP(sport=sport, dport=dport))
678 self.pg0.add_stream(pkts)
680 key = 'proto_tcp_ports'
681 self.create_classify_table(
683 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
684 self.create_classify_session(
685 self.acl_tbl_idx.get(key),
686 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
688 self.input_acl_set_interface(
689 self.pg0, self.acl_tbl_idx.get(key))
690 self.acl_active_table = key
692 self.pg_enable_capture(self.pg_interfaces)
695 pkts = self.pg1.get_capture(len(pkts))
696 self.verify_capture(self.pg1, pkts, TCP)
697 self.pg0.assert_nothing_captured(remark="packets forwarded")
698 self.pg2.assert_nothing_captured(remark="packets forwarded")
699 self.pg3.assert_nothing_captured(remark="packets forwarded")
702 class TestClassifierIPOut(TestClassifier):
703 """ Classifier output IP Test Case """
705 def test_acl_ip_out(self):
706 """ Output IP ACL test
708 Test scenario for basic IP ACL with source IP
709 - Create IPv4 stream for pg1 -> pg0 interface.
710 - Create ACL with source IP address.
711 - Send and verify received packets on pg0 interface.
714 # Basic oACL testing with source IP
715 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
716 self.pg1.add_stream(pkts)
719 self.create_classify_table(
720 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
721 self.create_classify_session(
722 self.acl_tbl_idx.get(key),
723 self.build_ip_match(src_ip=self.pg1.remote_ip4))
724 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
725 self.acl_active_table = key
727 self.pg_enable_capture(self.pg_interfaces)
730 pkts = self.pg0.get_capture(len(pkts))
731 self.verify_capture(self.pg0, pkts)
732 self.pg1.assert_nothing_captured(remark="packets forwarded")
733 self.pg2.assert_nothing_captured(remark="packets forwarded")
734 self.pg3.assert_nothing_captured(remark="packets forwarded")
737 class TestClassifierMAC(TestClassifier):
738 """ Classifier MAC Test Case """
740 def test_acl_mac(self):
743 Test scenario for basic MAC ACL with source MAC
744 - Create IPv4 stream for pg0 -> pg2 interface.
745 - Create ACL with source MAC address.
746 - Send and verify received packets on pg2 interface.
749 # Basic iACL testing with source MAC
750 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
751 self.pg0.add_stream(pkts)
754 self.create_classify_table(
755 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
756 self.create_classify_session(
757 self.acl_tbl_idx.get(key),
758 self.build_mac_match(src_mac=self.pg0.remote_mac))
759 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
760 self.acl_active_table = key
762 self.pg_enable_capture(self.pg_interfaces)
765 pkts = self.pg2.get_capture(len(pkts))
766 self.verify_capture(self.pg2, pkts)
767 self.pg0.assert_nothing_captured(remark="packets forwarded")
768 self.pg1.assert_nothing_captured(remark="packets forwarded")
769 self.pg3.assert_nothing_captured(remark="packets forwarded")
772 class TestClassifierPBR(TestClassifier):
773 """ Classifier PBR Test Case """
775 def test_acl_pbr(self):
778 Test scenario for PBR with source IP
779 - Create IPv4 stream for pg0 -> pg3 interface.
780 - Configure PBR fib entry for packet forwarding.
781 - Send and verify received packets on pg3 interface.
784 # PBR testing with source IP
785 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
786 self.pg0.add_stream(pkts)
789 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
791 # this will create the VRF/table in which we will insert the route
792 self.create_classify_session(
793 self.acl_tbl_idx.get(key),
794 self.build_ip_match(src_ip=self.pg0.remote_ip4),
795 pbr_option, self.pbr_vrfid)
796 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
797 self.config_pbr_fib_entry(self.pg3)
798 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
800 self.pg_enable_capture(self.pg_interfaces)
803 pkts = self.pg3.get_capture(len(pkts))
804 self.verify_capture(self.pg3, pkts)
805 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
806 self.pg0.assert_nothing_captured(remark="packets forwarded")
807 self.pg1.assert_nothing_captured(remark="packets forwarded")
808 self.pg2.assert_nothing_captured(remark="packets forwarded")
810 # remove the classify session and the route
811 self.config_pbr_fib_entry(self.pg3, is_add=0)
812 self.create_classify_session(
813 self.acl_tbl_idx.get(key),
814 self.build_ip_match(src_ip=self.pg0.remote_ip4),
815 pbr_option, self.pbr_vrfid, is_add=0)
817 # and the table should be gone.
818 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
820 if __name__ == '__main__':
821 unittest.main(testRunner=VppTestRunner)