6 from asfframework import VppTestRunner
7 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from template_classifier import TestClassifier, VarMask, VarMatch
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_ip import INVALID_INDEX
14 from vpp_papi import VppEnum
17 # Tests split to different test case classes because of issue reported in
19 class TestClassifierIP(TestClassifier):
20 """Classifier IP Test Case"""
24 super(TestClassifierIP, cls).setUpClass()
27 def tearDownClass(cls):
28 super(TestClassifierIP, cls).tearDownClass()
30 def test_iacl_src_ip(self):
31 """Source IP iACL test
33 Test scenario for basic IP ACL with source IP
34 - Create IPv4 stream for pg0 -> pg1 interface.
35 - Create iACL with source IP address.
36 - Send and verify received packets on pg1 interface.
39 # Basic iACL testing with source IP
40 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
41 self.pg0.add_stream(pkts)
44 self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
45 self.create_classify_session(
46 self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4)
48 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
49 self.acl_active_table = key
51 self.pg_enable_capture(self.pg_interfaces)
54 pkts = self.pg1.get_capture(len(pkts))
55 self.verify_capture(self.pg1, pkts)
56 self.pg0.assert_nothing_captured(remark="packets forwarded")
57 self.pg2.assert_nothing_captured(remark="packets forwarded")
58 self.pg3.assert_nothing_captured(remark="packets forwarded")
60 def test_iacl_dst_ip(self):
61 """Destination IP iACL test
63 Test scenario for basic IP ACL with destination IP
64 - Create IPv4 stream for pg0 -> pg1 interface.
65 - Create iACL with destination IP address.
66 - Send and verify received packets on pg1 interface.
69 # Basic iACL testing with destination IP
70 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
71 self.pg0.add_stream(pkts)
74 self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff"))
75 self.create_classify_session(
76 self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4)
78 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
79 self.acl_active_table = key
81 self.pg_enable_capture(self.pg_interfaces)
84 pkts = self.pg1.get_capture(len(pkts))
85 self.verify_capture(self.pg1, pkts)
86 self.pg0.assert_nothing_captured(remark="packets forwarded")
87 self.pg2.assert_nothing_captured(remark="packets forwarded")
88 self.pg3.assert_nothing_captured(remark="packets forwarded")
90 def test_iacl_src_dst_ip(self):
91 """Source and destination IP iACL test
93 Test scenario for basic IP ACL with source and destination IP
94 - Create IPv4 stream for pg0 -> pg1 interface.
95 - Create iACL with source and destination IP addresses.
96 - Send and verify received packets on pg1 interface.
99 # Basic iACL testing with source and destination IP
100 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
101 self.pg0.add_stream(pkts)
104 self.create_classify_table(
105 key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
107 self.create_classify_session(
108 self.acl_tbl_idx.get(key),
109 self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4),
111 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
112 self.acl_active_table = key
114 self.pg_enable_capture(self.pg_interfaces)
117 pkts = self.pg1.get_capture(len(pkts))
118 self.verify_capture(self.pg1, pkts)
119 self.pg0.assert_nothing_captured(remark="packets forwarded")
120 self.pg2.assert_nothing_captured(remark="packets forwarded")
121 self.pg3.assert_nothing_captured(remark="packets forwarded")
124 class TestClassifierUDP(TestClassifier):
125 """Classifier UDP proto Test Case"""
129 super(TestClassifierUDP, cls).setUpClass()
132 def tearDownClass(cls):
133 super(TestClassifierUDP, cls).tearDownClass()
135 def test_iacl_proto_udp(self):
136 """UDP protocol iACL test
138 Test scenario for basic protocol ACL with UDP protocol
139 - Create IPv4 stream for pg0 -> pg1 interface.
140 - Create iACL with UDP IP protocol.
141 - Send and verify received packets on pg1 interface.
144 # Basic iACL testing with UDP protocol
145 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
146 self.pg0.add_stream(pkts)
149 self.create_classify_table(key, self.build_ip_mask(proto="ff"))
150 self.create_classify_session(
151 self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP)
153 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
154 self.acl_active_table = key
156 self.pg_enable_capture(self.pg_interfaces)
159 pkts = self.pg1.get_capture(len(pkts))
160 self.verify_capture(self.pg1, pkts)
161 self.pg0.assert_nothing_captured(remark="packets forwarded")
162 self.pg2.assert_nothing_captured(remark="packets forwarded")
163 self.pg3.assert_nothing_captured(remark="packets forwarded")
165 def test_iacl_proto_udp_sport(self):
166 """UDP source port iACL test
168 Test scenario for basic protocol ACL with UDP and sport
169 - Create IPv4 stream for pg0 -> pg1 interface.
170 - Create iACL with UDP IP protocol and defined sport.
171 - Send and verify received packets on pg1 interface.
174 # Basic iACL testing with UDP and sport
176 pkts = self.create_stream(
177 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
179 self.pg0.add_stream(pkts)
181 key = "proto_udp_sport"
182 self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
183 self.create_classify_session(
184 self.acl_tbl_idx.get(key),
185 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
187 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
188 self.acl_active_table = key
190 self.pg_enable_capture(self.pg_interfaces)
193 pkts = self.pg1.get_capture(len(pkts))
194 self.verify_capture(self.pg1, pkts)
195 self.pg0.assert_nothing_captured(remark="packets forwarded")
196 self.pg2.assert_nothing_captured(remark="packets forwarded")
197 self.pg3.assert_nothing_captured(remark="packets forwarded")
199 def test_iacl_proto_udp_dport(self):
200 """UDP destination port iACL test
202 Test scenario for basic protocol ACL with UDP and dport
203 - Create IPv4 stream for pg0 -> pg1 interface.
204 - Create iACL with UDP IP protocol and defined dport.
205 - Send and verify received packets on pg1 interface.
208 # Basic iACL testing with UDP and dport
210 pkts = self.create_stream(
211 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
213 self.pg0.add_stream(pkts)
215 key = "proto_udp_dport"
216 self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
217 self.create_classify_session(
218 self.acl_tbl_idx.get(key),
219 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
221 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
222 self.acl_active_table = key
224 self.pg_enable_capture(self.pg_interfaces)
227 pkts = self.pg1.get_capture(len(pkts))
228 self.verify_capture(self.pg1, pkts)
229 self.pg0.assert_nothing_captured(remark="packets forwarded")
230 self.pg2.assert_nothing_captured(remark="packets forwarded")
231 self.pg3.assert_nothing_captured(remark="packets forwarded")
233 def test_iacl_proto_udp_sport_dport(self):
234 """UDP source and destination ports iACL test
236 Test scenario for basic protocol ACL with UDP and sport and dport
237 - Create IPv4 stream for pg0 -> pg1 interface.
238 - Create iACL with UDP IP protocol and defined sport and dport.
239 - Send and verify received packets on pg1 interface.
242 # Basic iACL testing with UDP and sport and dport
245 pkts = self.create_stream(
246 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
248 self.pg0.add_stream(pkts)
250 key = "proto_udp_ports"
251 self.create_classify_table(
252 key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
254 self.create_classify_session(
255 self.acl_tbl_idx.get(key),
257 proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
260 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
261 self.acl_active_table = key
263 self.pg_enable_capture(self.pg_interfaces)
266 pkts = self.pg1.get_capture(len(pkts))
267 self.verify_capture(self.pg1, pkts)
268 self.pg0.assert_nothing_captured(remark="packets forwarded")
269 self.pg2.assert_nothing_captured(remark="packets forwarded")
270 self.pg3.assert_nothing_captured(remark="packets forwarded")
273 class TestClassifierTCP(TestClassifier):
274 """Classifier TCP proto Test Case"""
278 super(TestClassifierTCP, cls).setUpClass()
281 def tearDownClass(cls):
282 super(TestClassifierTCP, cls).tearDownClass()
284 def test_iacl_proto_tcp(self):
285 """TCP protocol iACL test
287 Test scenario for basic protocol ACL with TCP protocol
288 - Create IPv4 stream for pg0 -> pg1 interface.
289 - Create iACL with TCP IP protocol.
290 - Send and verify received packets on pg1 interface.
293 # Basic iACL testing with TCP protocol
294 pkts = self.create_stream(
295 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
297 self.pg0.add_stream(pkts)
300 self.create_classify_table(key, self.build_ip_mask(proto="ff"))
301 self.create_classify_session(
302 self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP)
304 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
305 self.acl_active_table = key
307 self.pg_enable_capture(self.pg_interfaces)
310 pkts = self.pg1.get_capture(len(pkts))
311 self.verify_capture(self.pg1, pkts, TCP)
312 self.pg0.assert_nothing_captured(remark="packets forwarded")
313 self.pg2.assert_nothing_captured(remark="packets forwarded")
314 self.pg3.assert_nothing_captured(remark="packets forwarded")
316 def test_iacl_proto_tcp_sport(self):
317 """TCP source port iACL test
319 Test scenario for basic protocol ACL with TCP and sport
320 - Create IPv4 stream for pg0 -> pg1 interface.
321 - Create iACL with TCP IP protocol and defined sport.
322 - Send and verify received packets on pg1 interface.
325 # Basic iACL testing with TCP and sport
327 pkts = self.create_stream(
328 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
330 self.pg0.add_stream(pkts)
332 key = "proto_tcp_sport"
333 self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
334 self.create_classify_session(
335 self.acl_tbl_idx.get(key),
336 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
338 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
339 self.acl_active_table = key
341 self.pg_enable_capture(self.pg_interfaces)
344 pkts = self.pg1.get_capture(len(pkts))
345 self.verify_capture(self.pg1, pkts, TCP)
346 self.pg0.assert_nothing_captured(remark="packets forwarded")
347 self.pg2.assert_nothing_captured(remark="packets forwarded")
348 self.pg3.assert_nothing_captured(remark="packets forwarded")
350 def test_iacl_proto_tcp_dport(self):
351 """TCP destination port iACL test
353 Test scenario for basic protocol ACL with TCP and dport
354 - Create IPv4 stream for pg0 -> pg1 interface.
355 - Create iACL with TCP IP protocol and defined dport.
356 - Send and verify received packets on pg1 interface.
359 # Basic iACL testing with TCP and dport
361 pkts = self.create_stream(
362 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
364 self.pg0.add_stream(pkts)
366 key = "proto_tcp_sport"
367 self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
368 self.create_classify_session(
369 self.acl_tbl_idx.get(key),
370 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
372 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
373 self.acl_active_table = key
375 self.pg_enable_capture(self.pg_interfaces)
378 pkts = self.pg1.get_capture(len(pkts))
379 self.verify_capture(self.pg1, pkts, TCP)
380 self.pg0.assert_nothing_captured(remark="packets forwarded")
381 self.pg2.assert_nothing_captured(remark="packets forwarded")
382 self.pg3.assert_nothing_captured(remark="packets forwarded")
384 def test_iacl_proto_tcp_sport_dport(self):
385 """TCP source and destination ports iACL test
387 Test scenario for basic protocol ACL with TCP and sport and dport
388 - Create IPv4 stream for pg0 -> pg1 interface.
389 - Create iACL with TCP IP protocol and defined sport and dport.
390 - Send and verify received packets on pg1 interface.
393 # Basic iACL testing with TCP and sport and dport
396 pkts = self.create_stream(
397 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
399 self.pg0.add_stream(pkts)
401 key = "proto_tcp_ports"
402 self.create_classify_table(
403 key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
405 self.create_classify_session(
406 self.acl_tbl_idx.get(key),
408 proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
411 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
412 self.acl_active_table = key
414 self.pg_enable_capture(self.pg_interfaces)
417 pkts = self.pg1.get_capture(len(pkts))
418 self.verify_capture(self.pg1, pkts, TCP)
419 self.pg0.assert_nothing_captured(remark="packets forwarded")
420 self.pg2.assert_nothing_captured(remark="packets forwarded")
421 self.pg3.assert_nothing_captured(remark="packets forwarded")
424 class TestClassifierIPOut(TestClassifier):
425 """Classifier output IP Test Case"""
429 super(TestClassifierIPOut, cls).setUpClass()
432 def tearDownClass(cls):
433 super(TestClassifierIPOut, cls).tearDownClass()
435 def test_acl_ip_out(self):
436 """Output IP ACL test
438 Test scenario for basic IP ACL with source IP
439 - Create IPv4 stream for pg1 -> pg0 interface.
440 - Create ACL with source IP address.
441 - Send and verify received packets on pg0 interface.
444 # Basic oACL testing with source IP
445 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
446 self.pg1.add_stream(pkts)
449 self.create_classify_table(
450 key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0
452 self.create_classify_session(
453 self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4)
455 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
456 self.acl_active_table = key
458 self.pg_enable_capture(self.pg_interfaces)
461 pkts = self.pg0.get_capture(len(pkts))
462 self.verify_capture(self.pg0, pkts)
463 self.pg1.assert_nothing_captured(remark="packets forwarded")
464 self.pg2.assert_nothing_captured(remark="packets forwarded")
465 self.pg3.assert_nothing_captured(remark="packets forwarded")
468 class TestClassifierMAC(TestClassifier):
469 """Classifier MAC Test Case"""
473 super(TestClassifierMAC, cls).setUpClass()
476 def tearDownClass(cls):
477 super(TestClassifierMAC, cls).tearDownClass()
479 def test_acl_mac(self):
482 Test scenario for basic MAC ACL with source MAC
483 - Create IPv4 stream for pg0 -> pg2 interface.
484 - Create ACL with source MAC address.
485 - Send and verify received packets on pg2 interface.
488 # Basic iACL testing with source MAC
489 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
490 self.pg0.add_stream(pkts)
493 self.create_classify_table(
494 key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
496 self.create_classify_session(
497 self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
499 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
500 self.acl_active_table = key
502 self.pg_enable_capture(self.pg_interfaces)
505 pkts = self.pg2.get_capture(len(pkts))
506 self.verify_capture(self.pg2, pkts)
507 self.pg0.assert_nothing_captured(remark="packets forwarded")
508 self.pg1.assert_nothing_captured(remark="packets forwarded")
509 self.pg3.assert_nothing_captured(remark="packets forwarded")
512 class TestClassifierComplex(TestClassifier):
513 """Large & Nested Classifiers Test Cases"""
517 super(TestClassifierComplex, cls).setUpClass()
520 def tearDownClass(cls):
521 super(TestClassifierComplex, cls).tearDownClass()
523 def test_iacl_large(self):
524 """Large input ACL test
526 Test scenario for Large ACL matching on ethernet+ip+udp headers
527 - Create IPv4 stream for pg0 -> pg1 interface.
528 - Create large acl matching on ethernet+ip+udp header fields
529 - Send and verify received packets on pg1 interface.
532 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
533 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
534 msk = VarMask(offset=40, spec="ffff")
535 mth = VarMatch(offset=40, value=0x1234, length=2)
537 payload_msk = self.build_payload_mask([msk])
538 payload_match = self.build_payload_match([mth])
543 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
544 packet_ex = bytes.fromhex(("0" * 36) + "1234")
545 pkts = self.create_stream(
548 self.pg_if_packet_sizes,
549 UDP(sport=sport, dport=dport),
552 self.pg0.add_stream(pkts)
555 self.create_classify_table(
558 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
560 + self.build_ip_mask(
571 self.create_classify_session(
572 self.acl_tbl_idx.get(key),
573 self.build_mac_match(
574 src_mac=self.pg0.remote_mac,
575 dst_mac=self.pg0.local_mac,
579 + self.build_ip_match(
580 proto=socket.IPPROTO_UDP,
581 src_ip=self.pg0.remote_ip4,
582 dst_ip=self.pg1.remote_ip4,
589 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
590 self.acl_active_table = key
592 self.pg_enable_capture(self.pg_interfaces)
595 pkts = self.pg1.get_capture(len(pkts))
596 self.verify_capture(self.pg1, pkts)
597 self.pg0.assert_nothing_captured(remark="packets forwarded")
598 self.pg2.assert_nothing_captured(remark="packets forwarded")
599 self.pg3.assert_nothing_captured(remark="packets forwarded")
601 def test_oacl_large(self):
602 """Large output ACL test
603 Test scenario for Large ACL matching on ethernet+ip+udp headers
604 - Create IPv4 stream for pg1 -> pg0 interface.
605 - Create large acl matching on ethernet+ip+udp header fields
606 - Send and verify received packets on pg0 interface.
609 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
610 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
611 msk = VarMask(offset=40, spec="ffff")
612 mth = VarMatch(offset=40, value=0x1234, length=2)
614 payload_msk = self.build_payload_mask([msk])
615 payload_match = self.build_payload_match([mth])
620 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
621 packet_ex = bytes.fromhex(("0" * 36) + "1234")
622 pkts = self.create_stream(
625 self.pg_if_packet_sizes,
626 UDP(sport=sport, dport=dport),
629 self.pg1.add_stream(pkts)
632 self.create_classify_table(
635 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
637 + self.build_ip_mask(
648 self.create_classify_session(
649 self.acl_tbl_idx.get(key),
650 self.build_mac_match(
651 src_mac=self.pg0.local_mac,
652 dst_mac=self.pg0.remote_mac,
656 + self.build_ip_match(
657 proto=socket.IPPROTO_UDP,
658 src_ip=self.pg1.remote_ip4,
659 dst_ip=self.pg0.remote_ip4,
666 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
667 self.acl_active_table = key
669 self.pg_enable_capture(self.pg_interfaces)
672 pkts = self.pg0.get_capture(len(pkts))
673 self.verify_capture(self.pg0, pkts)
674 self.pg1.assert_nothing_captured(remark="packets forwarded")
675 self.pg2.assert_nothing_captured(remark="packets forwarded")
676 self.pg3.assert_nothing_captured(remark="packets forwarded")
678 def test_iacl_nested(self):
679 """Nested input ACL test
681 Test scenario for Large ACL matching on ethernet+ip+udp headers
682 - Create IPv4 stream for pg0 -> pg1 interface.
683 - Create 1st classifier table, without any entries
684 - Create nested acl matching on ethernet+ip+udp header fields
685 - Send and verify received packets on pg1 interface.
690 pkts = self.create_stream(
691 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
694 self.pg0.add_stream(pkts)
696 subtable_key = "subtable_in"
697 self.create_classify_table(
700 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
702 + self.build_ip_mask(
713 self.create_classify_table(
716 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
718 + self.build_ip_mask(
725 next_table_index=self.acl_tbl_idx.get(subtable_key),
728 self.create_classify_session(
729 self.acl_tbl_idx.get(subtable_key),
730 self.build_mac_match(
731 src_mac=self.pg0.remote_mac,
732 dst_mac=self.pg0.local_mac,
736 + self.build_ip_match(
737 proto=socket.IPPROTO_UDP,
738 src_ip=self.pg0.remote_ip4,
739 dst_ip=self.pg1.remote_ip4,
745 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
746 self.acl_active_table = key
748 self.pg_enable_capture(self.pg_interfaces)
751 pkts = self.pg1.get_capture(len(pkts))
752 self.verify_capture(self.pg1, pkts)
753 self.pg0.assert_nothing_captured(remark="packets forwarded")
754 self.pg2.assert_nothing_captured(remark="packets forwarded")
755 self.pg3.assert_nothing_captured(remark="packets forwarded")
757 def test_oacl_nested(self):
758 """Nested output ACL test
760 Test scenario for Large ACL matching on ethernet+ip+udp headers
761 - Create IPv4 stream for pg1 -> pg0 interface.
762 - Create 1st classifier table, without any entries
763 - Create nested acl matching on ethernet+ip+udp header fields
764 - Send and verify received packets on pg0 interface.
769 pkts = self.create_stream(
770 self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
772 self.pg1.add_stream(pkts)
774 subtable_key = "subtable_out"
775 self.create_classify_table(
778 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
780 + self.build_ip_mask(
791 self.create_classify_table(
794 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
796 + self.build_ip_mask(
803 next_table_index=self.acl_tbl_idx.get(subtable_key),
807 self.create_classify_session(
808 self.acl_tbl_idx.get(subtable_key),
809 self.build_mac_match(
810 src_mac=self.pg0.local_mac,
811 dst_mac=self.pg0.remote_mac,
815 + self.build_ip_match(
816 proto=socket.IPPROTO_UDP,
817 src_ip=self.pg1.remote_ip4,
818 dst_ip=self.pg0.remote_ip4,
824 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
825 self.acl_active_table = key
827 self.pg_enable_capture(self.pg_interfaces)
830 pkts = self.pg0.get_capture(len(pkts))
831 self.verify_capture(self.pg0, pkts)
832 self.pg1.assert_nothing_captured(remark="packets forwarded")
833 self.pg2.assert_nothing_captured(remark="packets forwarded")
834 self.pg3.assert_nothing_captured(remark="packets forwarded")
837 class TestClassifierPBR(TestClassifier):
838 """Classifier PBR Test Case"""
842 super(TestClassifierPBR, cls).setUpClass()
845 def tearDownClass(cls):
846 super(TestClassifierPBR, cls).tearDownClass()
848 def test_acl_pbr(self):
851 Test scenario for PBR with source IP
852 - Create IPv4 stream for pg0 -> pg3 interface.
853 - Configure PBR fib entry for packet forwarding.
854 - Send and verify received packets on pg3 interface.
857 # PBR testing with source IP
858 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
859 self.pg0.add_stream(pkts)
862 self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
864 # this will create the VRF/table in which we will insert the route
865 self.create_classify_session(
866 self.acl_tbl_idx.get(key),
867 self.build_ip_match(src_ip=self.pg0.remote_ip4),
871 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
876 [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)],
877 table_id=self.pbr_vrfid,
881 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
883 self.pg_enable_capture(self.pg_interfaces)
886 pkts = self.pg3.get_capture(len(pkts))
887 self.verify_capture(self.pg3, pkts)
888 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
889 self.pg0.assert_nothing_captured(remark="packets forwarded")
890 self.pg1.assert_nothing_captured(remark="packets forwarded")
891 self.pg2.assert_nothing_captured(remark="packets forwarded")
893 # remove the classify session and the route
894 r.remove_vpp_config()
895 self.create_classify_session(
896 self.acl_tbl_idx.get(key),
897 self.build_ip_match(src_ip=self.pg0.remote_ip4),
903 # and the table should be gone.
904 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
907 class TestClassifierPunt(TestClassifier):
908 """Classifier punt Test Case"""
912 super(TestClassifierPunt, cls).setUpClass()
915 def tearDownClass(cls):
916 super(TestClassifierPunt, cls).tearDownClass()
918 def test_punt_udp(self):
919 """IPv4/UDP protocol punt ACL test
921 Test scenario for basic punt ACL with UDP protocol
922 - Create IPv4 stream for pg0 -> pg1 interface.
923 - Create punt ACL with UDP IP protocol.
924 - Send and verify received packets on pg1 interface.
931 self.create_classify_table(
932 key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
934 table_index = self.acl_tbl_idx.get(key)
935 self.vapi.punt_acl_add_del(ip4_table_index=table_index)
936 self.acl_active_table = key
938 # punt udp packets to dport received on pg0 through pg1
942 "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
945 "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
946 "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
952 self.vapi.ip_punt_redirect(
954 "rx_sw_if_index": self.pg0.sw_if_index,
955 "tx_sw_if_index": self.pg1.sw_if_index,
956 "nh": self.pg1.remote_ip4,
962 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
963 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
964 / UDP(sport=sport, dport=dport)
969 # allow a session but not matching the stream: expect to drop
970 self.create_classify_session(
973 src_ip=self.pg0.remote_ip4,
974 proto=socket.IPPROTO_UDP,
978 self.send_and_assert_no_replies(self.pg0, pkts)
980 # allow a session matching the stream: expect to pass
981 self.create_classify_session(
984 src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
987 self.send_and_expect_only(self.pg0, pkts, self.pg1)
989 # test dump api: ip4 is set, ip6 is not
990 r = self.vapi.punt_acl_get()
991 self.assertEqual(r.ip4_table_index, table_index)
992 self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
995 self.acl_active_table = ""
996 self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
998 # test dump api: nothing set
999 r = self.vapi.punt_acl_get()
1000 self.assertEqual(r.ip4_table_index, 0xFFFFFFFF)
1001 self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
1004 if __name__ == "__main__":
1005 unittest.main(testRunner=VppTestRunner)