7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16 from vpp_papi import VppEnum
19 # Tests split to different test case classes because of issue reported in
21 class TestClassifierIP(TestClassifier):
22 """Classifier IP Test Case"""
26 super(TestClassifierIP, cls).setUpClass()
29 def tearDownClass(cls):
30 super(TestClassifierIP, cls).tearDownClass()
32 def test_iacl_src_ip(self):
33 """Source IP iACL test
35 Test scenario for basic IP ACL with source IP
36 - Create IPv4 stream for pg0 -> pg1 interface.
37 - Create iACL with source IP address.
38 - Send and verify received packets on pg1 interface.
41 # Basic iACL testing with source IP
42 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
43 self.pg0.add_stream(pkts)
46 self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
47 self.create_classify_session(
48 self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4)
50 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
51 self.acl_active_table = key
53 self.pg_enable_capture(self.pg_interfaces)
56 pkts = self.pg1.get_capture(len(pkts))
57 self.verify_capture(self.pg1, pkts)
58 self.pg0.assert_nothing_captured(remark="packets forwarded")
59 self.pg2.assert_nothing_captured(remark="packets forwarded")
60 self.pg3.assert_nothing_captured(remark="packets forwarded")
62 def test_iacl_dst_ip(self):
63 """Destination IP iACL test
65 Test scenario for basic IP ACL with destination IP
66 - Create IPv4 stream for pg0 -> pg1 interface.
67 - Create iACL with destination IP address.
68 - Send and verify received packets on pg1 interface.
71 # Basic iACL testing with destination IP
72 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
73 self.pg0.add_stream(pkts)
76 self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff"))
77 self.create_classify_session(
78 self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4)
80 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
81 self.acl_active_table = key
83 self.pg_enable_capture(self.pg_interfaces)
86 pkts = self.pg1.get_capture(len(pkts))
87 self.verify_capture(self.pg1, pkts)
88 self.pg0.assert_nothing_captured(remark="packets forwarded")
89 self.pg2.assert_nothing_captured(remark="packets forwarded")
90 self.pg3.assert_nothing_captured(remark="packets forwarded")
92 def test_iacl_src_dst_ip(self):
93 """Source and destination IP iACL test
95 Test scenario for basic IP ACL with source and destination IP
96 - Create IPv4 stream for pg0 -> pg1 interface.
97 - Create iACL with source and destination IP addresses.
98 - Send and verify received packets on pg1 interface.
101 # Basic iACL testing with source and destination IP
102 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
103 self.pg0.add_stream(pkts)
106 self.create_classify_table(
107 key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
109 self.create_classify_session(
110 self.acl_tbl_idx.get(key),
111 self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4),
113 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
114 self.acl_active_table = key
116 self.pg_enable_capture(self.pg_interfaces)
119 pkts = self.pg1.get_capture(len(pkts))
120 self.verify_capture(self.pg1, pkts)
121 self.pg0.assert_nothing_captured(remark="packets forwarded")
122 self.pg2.assert_nothing_captured(remark="packets forwarded")
123 self.pg3.assert_nothing_captured(remark="packets forwarded")
126 class TestClassifierUDP(TestClassifier):
127 """Classifier UDP proto Test Case"""
131 super(TestClassifierUDP, cls).setUpClass()
134 def tearDownClass(cls):
135 super(TestClassifierUDP, cls).tearDownClass()
137 def test_iacl_proto_udp(self):
138 """UDP protocol iACL test
140 Test scenario for basic protocol ACL with UDP protocol
141 - Create IPv4 stream for pg0 -> pg1 interface.
142 - Create iACL with UDP IP protocol.
143 - Send and verify received packets on pg1 interface.
146 # Basic iACL testing with UDP protocol
147 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
148 self.pg0.add_stream(pkts)
151 self.create_classify_table(key, self.build_ip_mask(proto="ff"))
152 self.create_classify_session(
153 self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP)
155 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
156 self.acl_active_table = key
158 self.pg_enable_capture(self.pg_interfaces)
161 pkts = self.pg1.get_capture(len(pkts))
162 self.verify_capture(self.pg1, pkts)
163 self.pg0.assert_nothing_captured(remark="packets forwarded")
164 self.pg2.assert_nothing_captured(remark="packets forwarded")
165 self.pg3.assert_nothing_captured(remark="packets forwarded")
167 def test_iacl_proto_udp_sport(self):
168 """UDP source port iACL test
170 Test scenario for basic protocol ACL with UDP and sport
171 - Create IPv4 stream for pg0 -> pg1 interface.
172 - Create iACL with UDP IP protocol and defined sport.
173 - Send and verify received packets on pg1 interface.
176 # Basic iACL testing with UDP and sport
178 pkts = self.create_stream(
179 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
181 self.pg0.add_stream(pkts)
183 key = "proto_udp_sport"
184 self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
185 self.create_classify_session(
186 self.acl_tbl_idx.get(key),
187 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
189 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
190 self.acl_active_table = key
192 self.pg_enable_capture(self.pg_interfaces)
195 pkts = self.pg1.get_capture(len(pkts))
196 self.verify_capture(self.pg1, pkts)
197 self.pg0.assert_nothing_captured(remark="packets forwarded")
198 self.pg2.assert_nothing_captured(remark="packets forwarded")
199 self.pg3.assert_nothing_captured(remark="packets forwarded")
201 def test_iacl_proto_udp_dport(self):
202 """UDP destination port iACL test
204 Test scenario for basic protocol ACL with UDP and dport
205 - Create IPv4 stream for pg0 -> pg1 interface.
206 - Create iACL with UDP IP protocol and defined dport.
207 - Send and verify received packets on pg1 interface.
210 # Basic iACL testing with UDP and dport
212 pkts = self.create_stream(
213 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
215 self.pg0.add_stream(pkts)
217 key = "proto_udp_dport"
218 self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
219 self.create_classify_session(
220 self.acl_tbl_idx.get(key),
221 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
223 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
224 self.acl_active_table = key
226 self.pg_enable_capture(self.pg_interfaces)
229 pkts = self.pg1.get_capture(len(pkts))
230 self.verify_capture(self.pg1, pkts)
231 self.pg0.assert_nothing_captured(remark="packets forwarded")
232 self.pg2.assert_nothing_captured(remark="packets forwarded")
233 self.pg3.assert_nothing_captured(remark="packets forwarded")
235 def test_iacl_proto_udp_sport_dport(self):
236 """UDP source and destination ports iACL test
238 Test scenario for basic protocol ACL with UDP and sport and dport
239 - Create IPv4 stream for pg0 -> pg1 interface.
240 - Create iACL with UDP IP protocol and defined sport and dport.
241 - Send and verify received packets on pg1 interface.
244 # Basic iACL testing with UDP and sport and dport
247 pkts = self.create_stream(
248 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
250 self.pg0.add_stream(pkts)
252 key = "proto_udp_ports"
253 self.create_classify_table(
254 key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
256 self.create_classify_session(
257 self.acl_tbl_idx.get(key),
259 proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
262 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
263 self.acl_active_table = key
265 self.pg_enable_capture(self.pg_interfaces)
268 pkts = self.pg1.get_capture(len(pkts))
269 self.verify_capture(self.pg1, pkts)
270 self.pg0.assert_nothing_captured(remark="packets forwarded")
271 self.pg2.assert_nothing_captured(remark="packets forwarded")
272 self.pg3.assert_nothing_captured(remark="packets forwarded")
275 class TestClassifierTCP(TestClassifier):
276 """Classifier TCP proto Test Case"""
280 super(TestClassifierTCP, cls).setUpClass()
283 def tearDownClass(cls):
284 super(TestClassifierTCP, cls).tearDownClass()
286 def test_iacl_proto_tcp(self):
287 """TCP protocol iACL test
289 Test scenario for basic protocol ACL with TCP protocol
290 - Create IPv4 stream for pg0 -> pg1 interface.
291 - Create iACL with TCP IP protocol.
292 - Send and verify received packets on pg1 interface.
295 # Basic iACL testing with TCP protocol
296 pkts = self.create_stream(
297 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
299 self.pg0.add_stream(pkts)
302 self.create_classify_table(key, self.build_ip_mask(proto="ff"))
303 self.create_classify_session(
304 self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP)
306 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
307 self.acl_active_table = key
309 self.pg_enable_capture(self.pg_interfaces)
312 pkts = self.pg1.get_capture(len(pkts))
313 self.verify_capture(self.pg1, pkts, TCP)
314 self.pg0.assert_nothing_captured(remark="packets forwarded")
315 self.pg2.assert_nothing_captured(remark="packets forwarded")
316 self.pg3.assert_nothing_captured(remark="packets forwarded")
318 def test_iacl_proto_tcp_sport(self):
319 """TCP source port iACL test
321 Test scenario for basic protocol ACL with TCP and sport
322 - Create IPv4 stream for pg0 -> pg1 interface.
323 - Create iACL with TCP IP protocol and defined sport.
324 - Send and verify received packets on pg1 interface.
327 # Basic iACL testing with TCP and sport
329 pkts = self.create_stream(
330 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
332 self.pg0.add_stream(pkts)
334 key = "proto_tcp_sport"
335 self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
336 self.create_classify_session(
337 self.acl_tbl_idx.get(key),
338 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
340 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
341 self.acl_active_table = key
343 self.pg_enable_capture(self.pg_interfaces)
346 pkts = self.pg1.get_capture(len(pkts))
347 self.verify_capture(self.pg1, pkts, TCP)
348 self.pg0.assert_nothing_captured(remark="packets forwarded")
349 self.pg2.assert_nothing_captured(remark="packets forwarded")
350 self.pg3.assert_nothing_captured(remark="packets forwarded")
352 def test_iacl_proto_tcp_dport(self):
353 """TCP destination port iACL test
355 Test scenario for basic protocol ACL with TCP and dport
356 - Create IPv4 stream for pg0 -> pg1 interface.
357 - Create iACL with TCP IP protocol and defined dport.
358 - Send and verify received packets on pg1 interface.
361 # Basic iACL testing with TCP and dport
363 pkts = self.create_stream(
364 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
366 self.pg0.add_stream(pkts)
368 key = "proto_tcp_sport"
369 self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
370 self.create_classify_session(
371 self.acl_tbl_idx.get(key),
372 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
374 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
375 self.acl_active_table = key
377 self.pg_enable_capture(self.pg_interfaces)
380 pkts = self.pg1.get_capture(len(pkts))
381 self.verify_capture(self.pg1, pkts, TCP)
382 self.pg0.assert_nothing_captured(remark="packets forwarded")
383 self.pg2.assert_nothing_captured(remark="packets forwarded")
384 self.pg3.assert_nothing_captured(remark="packets forwarded")
386 def test_iacl_proto_tcp_sport_dport(self):
387 """TCP source and destination ports iACL test
389 Test scenario for basic protocol ACL with TCP and sport and dport
390 - Create IPv4 stream for pg0 -> pg1 interface.
391 - Create iACL with TCP IP protocol and defined sport and dport.
392 - Send and verify received packets on pg1 interface.
395 # Basic iACL testing with TCP and sport and dport
398 pkts = self.create_stream(
399 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
401 self.pg0.add_stream(pkts)
403 key = "proto_tcp_ports"
404 self.create_classify_table(
405 key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
407 self.create_classify_session(
408 self.acl_tbl_idx.get(key),
410 proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
413 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
414 self.acl_active_table = key
416 self.pg_enable_capture(self.pg_interfaces)
419 pkts = self.pg1.get_capture(len(pkts))
420 self.verify_capture(self.pg1, pkts, TCP)
421 self.pg0.assert_nothing_captured(remark="packets forwarded")
422 self.pg2.assert_nothing_captured(remark="packets forwarded")
423 self.pg3.assert_nothing_captured(remark="packets forwarded")
426 class TestClassifierIPOut(TestClassifier):
427 """Classifier output IP Test Case"""
431 super(TestClassifierIPOut, cls).setUpClass()
434 def tearDownClass(cls):
435 super(TestClassifierIPOut, cls).tearDownClass()
437 def test_acl_ip_out(self):
438 """Output IP ACL test
440 Test scenario for basic IP ACL with source IP
441 - Create IPv4 stream for pg1 -> pg0 interface.
442 - Create ACL with source IP address.
443 - Send and verify received packets on pg0 interface.
446 # Basic oACL testing with source IP
447 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
448 self.pg1.add_stream(pkts)
451 self.create_classify_table(
452 key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0
454 self.create_classify_session(
455 self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4)
457 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
458 self.acl_active_table = key
460 self.pg_enable_capture(self.pg_interfaces)
463 pkts = self.pg0.get_capture(len(pkts))
464 self.verify_capture(self.pg0, pkts)
465 self.pg1.assert_nothing_captured(remark="packets forwarded")
466 self.pg2.assert_nothing_captured(remark="packets forwarded")
467 self.pg3.assert_nothing_captured(remark="packets forwarded")
470 class TestClassifierMAC(TestClassifier):
471 """Classifier MAC Test Case"""
475 super(TestClassifierMAC, cls).setUpClass()
478 def tearDownClass(cls):
479 super(TestClassifierMAC, cls).tearDownClass()
481 def test_acl_mac(self):
484 Test scenario for basic MAC ACL with source MAC
485 - Create IPv4 stream for pg0 -> pg2 interface.
486 - Create ACL with source MAC address.
487 - Send and verify received packets on pg2 interface.
490 # Basic iACL testing with source MAC
491 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
492 self.pg0.add_stream(pkts)
495 self.create_classify_table(
496 key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
498 self.create_classify_session(
499 self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
501 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
502 self.acl_active_table = key
504 self.pg_enable_capture(self.pg_interfaces)
507 pkts = self.pg2.get_capture(len(pkts))
508 self.verify_capture(self.pg2, pkts)
509 self.pg0.assert_nothing_captured(remark="packets forwarded")
510 self.pg1.assert_nothing_captured(remark="packets forwarded")
511 self.pg3.assert_nothing_captured(remark="packets forwarded")
514 class TestClassifierComplex(TestClassifier):
515 """Large & Nested Classifiers Test Cases"""
519 super(TestClassifierComplex, cls).setUpClass()
522 def tearDownClass(cls):
523 super(TestClassifierComplex, cls).tearDownClass()
525 def test_iacl_large(self):
526 """Large input ACL test
528 Test scenario for Large ACL matching on ethernet+ip+udp headers
529 - Create IPv4 stream for pg0 -> pg1 interface.
530 - Create large acl matching on ethernet+ip+udp header fields
531 - Send and verify received packets on pg1 interface.
534 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
535 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
536 msk = VarMask(offset=40, spec="ffff")
537 mth = VarMatch(offset=40, value=0x1234, length=2)
539 payload_msk = self.build_payload_mask([msk])
540 payload_match = self.build_payload_match([mth])
545 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
546 packet_ex = bytes.fromhex(("0" * 36) + "1234")
547 pkts = self.create_stream(
550 self.pg_if_packet_sizes,
551 UDP(sport=sport, dport=dport),
554 self.pg0.add_stream(pkts)
557 self.create_classify_table(
560 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
562 + self.build_ip_mask(
573 self.create_classify_session(
574 self.acl_tbl_idx.get(key),
575 self.build_mac_match(
576 src_mac=self.pg0.remote_mac,
577 dst_mac=self.pg0.local_mac,
581 + self.build_ip_match(
582 proto=socket.IPPROTO_UDP,
583 src_ip=self.pg0.remote_ip4,
584 dst_ip=self.pg1.remote_ip4,
591 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
592 self.acl_active_table = key
594 self.pg_enable_capture(self.pg_interfaces)
597 pkts = self.pg1.get_capture(len(pkts))
598 self.verify_capture(self.pg1, pkts)
599 self.pg0.assert_nothing_captured(remark="packets forwarded")
600 self.pg2.assert_nothing_captured(remark="packets forwarded")
601 self.pg3.assert_nothing_captured(remark="packets forwarded")
603 def test_oacl_large(self):
604 """Large output ACL test
605 Test scenario for Large ACL matching on ethernet+ip+udp headers
606 - Create IPv4 stream for pg1 -> pg0 interface.
607 - Create large acl matching on ethernet+ip+udp header fields
608 - Send and verify received packets on pg0 interface.
611 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
612 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
613 msk = VarMask(offset=40, spec="ffff")
614 mth = VarMatch(offset=40, value=0x1234, length=2)
616 payload_msk = self.build_payload_mask([msk])
617 payload_match = self.build_payload_match([mth])
622 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
623 packet_ex = bytes.fromhex(("0" * 36) + "1234")
624 pkts = self.create_stream(
627 self.pg_if_packet_sizes,
628 UDP(sport=sport, dport=dport),
631 self.pg1.add_stream(pkts)
634 self.create_classify_table(
637 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
639 + self.build_ip_mask(
650 self.create_classify_session(
651 self.acl_tbl_idx.get(key),
652 self.build_mac_match(
653 src_mac=self.pg0.local_mac,
654 dst_mac=self.pg0.remote_mac,
658 + self.build_ip_match(
659 proto=socket.IPPROTO_UDP,
660 src_ip=self.pg1.remote_ip4,
661 dst_ip=self.pg0.remote_ip4,
668 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
669 self.acl_active_table = key
671 self.pg_enable_capture(self.pg_interfaces)
674 pkts = self.pg0.get_capture(len(pkts))
675 self.verify_capture(self.pg0, pkts)
676 self.pg1.assert_nothing_captured(remark="packets forwarded")
677 self.pg2.assert_nothing_captured(remark="packets forwarded")
678 self.pg3.assert_nothing_captured(remark="packets forwarded")
680 def test_iacl_nested(self):
681 """Nested input ACL test
683 Test scenario for Large ACL matching on ethernet+ip+udp headers
684 - Create IPv4 stream for pg0 -> pg1 interface.
685 - Create 1st classifier table, without any entries
686 - Create nested acl matching on ethernet+ip+udp header fields
687 - Send and verify received packets on pg1 interface.
692 pkts = self.create_stream(
693 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
696 self.pg0.add_stream(pkts)
698 subtable_key = "subtable_in"
699 self.create_classify_table(
702 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
704 + self.build_ip_mask(
715 self.create_classify_table(
718 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
720 + self.build_ip_mask(
727 next_table_index=self.acl_tbl_idx.get(subtable_key),
730 self.create_classify_session(
731 self.acl_tbl_idx.get(subtable_key),
732 self.build_mac_match(
733 src_mac=self.pg0.remote_mac,
734 dst_mac=self.pg0.local_mac,
738 + self.build_ip_match(
739 proto=socket.IPPROTO_UDP,
740 src_ip=self.pg0.remote_ip4,
741 dst_ip=self.pg1.remote_ip4,
747 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
748 self.acl_active_table = key
750 self.pg_enable_capture(self.pg_interfaces)
753 pkts = self.pg1.get_capture(len(pkts))
754 self.verify_capture(self.pg1, pkts)
755 self.pg0.assert_nothing_captured(remark="packets forwarded")
756 self.pg2.assert_nothing_captured(remark="packets forwarded")
757 self.pg3.assert_nothing_captured(remark="packets forwarded")
759 def test_oacl_nested(self):
760 """Nested output ACL test
762 Test scenario for Large ACL matching on ethernet+ip+udp headers
763 - Create IPv4 stream for pg1 -> pg0 interface.
764 - Create 1st classifier table, without any entries
765 - Create nested acl matching on ethernet+ip+udp header fields
766 - Send and verify received packets on pg0 interface.
771 pkts = self.create_stream(
772 self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
774 self.pg1.add_stream(pkts)
776 subtable_key = "subtable_out"
777 self.create_classify_table(
780 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
782 + self.build_ip_mask(
793 self.create_classify_table(
796 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
798 + self.build_ip_mask(
805 next_table_index=self.acl_tbl_idx.get(subtable_key),
809 self.create_classify_session(
810 self.acl_tbl_idx.get(subtable_key),
811 self.build_mac_match(
812 src_mac=self.pg0.local_mac,
813 dst_mac=self.pg0.remote_mac,
817 + self.build_ip_match(
818 proto=socket.IPPROTO_UDP,
819 src_ip=self.pg1.remote_ip4,
820 dst_ip=self.pg0.remote_ip4,
826 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
827 self.acl_active_table = key
829 self.pg_enable_capture(self.pg_interfaces)
832 pkts = self.pg0.get_capture(len(pkts))
833 self.verify_capture(self.pg0, pkts)
834 self.pg1.assert_nothing_captured(remark="packets forwarded")
835 self.pg2.assert_nothing_captured(remark="packets forwarded")
836 self.pg3.assert_nothing_captured(remark="packets forwarded")
839 class TestClassifierPBR(TestClassifier):
840 """Classifier PBR Test Case"""
844 super(TestClassifierPBR, cls).setUpClass()
847 def tearDownClass(cls):
848 super(TestClassifierPBR, cls).tearDownClass()
850 def test_acl_pbr(self):
853 Test scenario for PBR with source IP
854 - Create IPv4 stream for pg0 -> pg3 interface.
855 - Configure PBR fib entry for packet forwarding.
856 - Send and verify received packets on pg3 interface.
859 # PBR testing with source IP
860 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
861 self.pg0.add_stream(pkts)
864 self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
866 # this will create the VRF/table in which we will insert the route
867 self.create_classify_session(
868 self.acl_tbl_idx.get(key),
869 self.build_ip_match(src_ip=self.pg0.remote_ip4),
873 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
878 [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)],
879 table_id=self.pbr_vrfid,
883 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
885 self.pg_enable_capture(self.pg_interfaces)
888 pkts = self.pg3.get_capture(len(pkts))
889 self.verify_capture(self.pg3, pkts)
890 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
891 self.pg0.assert_nothing_captured(remark="packets forwarded")
892 self.pg1.assert_nothing_captured(remark="packets forwarded")
893 self.pg2.assert_nothing_captured(remark="packets forwarded")
895 # remove the classify session and the route
896 r.remove_vpp_config()
897 self.create_classify_session(
898 self.acl_tbl_idx.get(key),
899 self.build_ip_match(src_ip=self.pg0.remote_ip4),
905 # and the table should be gone.
906 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
909 class TestClassifierPunt(TestClassifier):
910 """Classifier punt Test Case"""
914 super(TestClassifierPunt, cls).setUpClass()
917 def tearDownClass(cls):
918 super(TestClassifierPunt, cls).tearDownClass()
920 def test_punt_udp(self):
921 """IPv4/UDP protocol punt ACL test
923 Test scenario for basic punt ACL with UDP protocol
924 - Create IPv4 stream for pg0 -> pg1 interface.
925 - Create punt ACL with UDP IP protocol.
926 - Send and verify received packets on pg1 interface.
933 self.create_classify_table(
934 key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
936 table_index = self.acl_tbl_idx.get(key)
937 self.vapi.punt_acl_add_del(ip4_table_index=table_index)
938 self.acl_active_table = key
940 # punt udp packets to dport received on pg0 through pg1
944 "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
947 "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
948 "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
954 self.vapi.ip_punt_redirect(
956 "rx_sw_if_index": self.pg0.sw_if_index,
957 "tx_sw_if_index": self.pg1.sw_if_index,
958 "nh": self.pg1.remote_ip4,
964 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
965 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
966 / UDP(sport=sport, dport=dport)
971 # allow a session but not matching the stream: expect to drop
972 self.create_classify_session(
975 src_ip=self.pg0.remote_ip4,
976 proto=socket.IPPROTO_UDP,
980 self.send_and_assert_no_replies(self.pg0, pkts)
982 # allow a session matching the stream: expect to pass
983 self.create_classify_session(
986 src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
989 self.send_and_expect_only(self.pg0, pkts, self.pg1)
991 # test dump api: ip4 is set, ip6 is not
992 r = self.vapi.punt_acl_get()
993 self.assertEqual(r.ip4_table_index, table_index)
994 self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
997 self.acl_active_table = ""
998 self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
1000 # test dump api: nothing set
1001 r = self.vapi.punt_acl_get()
1002 self.assertEqual(r.ip4_table_index, 0xFFFFFFFF)
1003 self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
1006 if __name__ == "__main__":
1007 unittest.main(testRunner=VppTestRunner)