7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16 from vpp_papi import VppEnum
19 # Tests split to different test case classes because of issue reported in
# Input-ACL test cases that classify IPv4 traffic by source and/or
# destination address.
# NOTE(review): the numbers embedded at the start of each line skip values, so
# some original lines are not visible in this listing (for example the
# statement that assigns `key` in each test, and the `def` line that the
# setUpClass() call below belongs to) - confirm against the complete file.
21 class TestClassifierIP(TestClassifier):
22 """ Classifier IP Test Case """
26 super(TestClassifierIP, cls).setUpClass()
29 def tearDownClass(cls):
30 super(TestClassifierIP, cls).tearDownClass()
# Source-address case: a pg0 -> pg1 stream has to pass a table masked on the
# full IPv4 source address whose session matches pg0.remote_ip4.
32 def test_iacl_src_ip(self):
33 """ Source IP iACL test
35 Test scenario for basic IP ACL with source IP
36 - Create IPv4 stream for pg0 -> pg1 interface.
37 - Create iACL with source IP address.
38 - Send and verify received packets on pg1 interface.
41 # Basic iACL testing with source IP
42 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
43 self.pg0.add_stream(pkts)
# Mask all 32 bits of the source address, add a session for pg0's remote
# address, then attach the table to pg0 as its input ACL.
46 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
47 self.create_classify_session(
48 self.acl_tbl_idx.get(key),
49 self.build_ip_match(src_ip=self.pg0.remote_ip4))
50 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
# Remember which table this test attached.
# NOTE(review): presumably read by the base-class teardown - confirm in
# template_classifier.
51 self.acl_active_table = key
53 self.pg_enable_capture(self.pg_interfaces)
# Every packet sent must be captured on pg1; pg0, pg2 and pg3 must stay silent.
56 pkts = self.pg1.get_capture(len(pkts))
57 self.verify_capture(self.pg1, pkts)
58 self.pg0.assert_nothing_captured(remark="packets forwarded")
59 self.pg2.assert_nothing_captured(remark="packets forwarded")
60 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Destination-address case: same flow as above, but the table masks the
# destination address and the session matches pg1.remote_ip4.
62 def test_iacl_dst_ip(self):
63 """ Destination IP iACL test
65 Test scenario for basic IP ACL with destination IP
66 - Create IPv4 stream for pg0 -> pg1 interface.
67 - Create iACL with destination IP address.
68 - Send and verify received packets on pg1 interface.
71 # Basic iACL testing with destination IP
72 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
73 self.pg0.add_stream(pkts)
# Mask all 32 bits of the destination address and match pg1's remote address.
76 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
77 self.create_classify_session(
78 self.acl_tbl_idx.get(key),
79 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
80 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
81 self.acl_active_table = key
83 self.pg_enable_capture(self.pg_interfaces)
# Traffic is expected on pg1 only.
86 pkts = self.pg1.get_capture(len(pkts))
87 self.verify_capture(self.pg1, pkts)
88 self.pg0.assert_nothing_captured(remark="packets forwarded")
89 self.pg2.assert_nothing_captured(remark="packets forwarded")
90 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Combined case: the table masks both addresses and the session has to match
# pg0.remote_ip4 as source and pg1.remote_ip4 as destination.
92 def test_iacl_src_dst_ip(self):
93 """ Source and destination IP iACL test
95 Test scenario for basic IP ACL with source and destination IP
96 - Create IPv4 stream for pg0 -> pg1 interface.
97 - Create iACL with source and destination IP addresses.
98 - Send and verify received packets on pg1 interface.
101 # Basic iACL testing with source and destination IP
102 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
103 self.pg0.add_stream(pkts)
# Mask both the source and the destination address in a single table.
106 self.create_classify_table(
107 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
108 self.create_classify_session(
109 self.acl_tbl_idx.get(key),
110 self.build_ip_match(src_ip=self.pg0.remote_ip4,
111 dst_ip=self.pg1.remote_ip4))
112 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
113 self.acl_active_table = key
115 self.pg_enable_capture(self.pg_interfaces)
# Traffic is expected on pg1 only.
118 pkts = self.pg1.get_capture(len(pkts))
119 self.verify_capture(self.pg1, pkts)
120 self.pg0.assert_nothing_captured(remark="packets forwarded")
121 self.pg2.assert_nothing_captured(remark="packets forwarded")
122 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Input-ACL test cases that classify on the UDP protocol number and,
# optionally, on the UDP source and/or destination port.
# NOTE(review): the numbers embedded at the start of each line skip values, so
# some original lines are not visible in this listing (for example the
# assignments of `sport` / `dport`, the `key` assignment in the first test and
# continuation lines of some calls) - confirm against the complete file.
125 class TestClassifierUDP(TestClassifier):
126 """ Classifier UDP proto Test Case """
130 super(TestClassifierUDP, cls).setUpClass()
133 def tearDownClass(cls):
134 super(TestClassifierUDP, cls).tearDownClass()
# Protocol-only case: the table masks the IP protocol byte and the session
# matches socket.IPPROTO_UDP.
136 def test_iacl_proto_udp(self):
137 """ UDP protocol iACL test
139 Test scenario for basic protocol ACL with UDP protocol
140 - Create IPv4 stream for pg0 -> pg1 interface.
141 - Create iACL with UDP IP protocol.
142 - Send and verify received packets on pg1 interface.
145 # Basic iACL testing with UDP protocol
146 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
147 self.pg0.add_stream(pkts)
# Mask the whole protocol byte ('ff') and attach the table to pg0 as input ACL.
150 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
151 self.create_classify_session(
152 self.acl_tbl_idx.get(key),
153 self.build_ip_match(proto=socket.IPPROTO_UDP))
154 self.input_acl_set_interface(
155 self.pg0, self.acl_tbl_idx.get(key))
156 self.acl_active_table = key
158 self.pg_enable_capture(self.pg_interfaces)
# Every packet sent must be captured on pg1; pg0, pg2 and pg3 must stay silent.
161 pkts = self.pg1.get_capture(len(pkts))
162 self.verify_capture(self.pg1, pkts)
163 self.pg0.assert_nothing_captured(remark="packets forwarded")
164 self.pg2.assert_nothing_captured(remark="packets forwarded")
165 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Source-port case: the stream is built with UDP(sport=sport, dport=5678) and
# the session matches the same `sport`.
167 def test_iacl_proto_udp_sport(self):
168 """ UDP source port iACL test
170 Test scenario for basic protocol ACL with UDP and sport
171 - Create IPv4 stream for pg0 -> pg1 interface.
172 - Create iACL with UDP IP protocol and defined sport.
173 - Send and verify received packets on pg1 interface.
176 # Basic iACL testing with UDP and sport
178 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
179 UDP(sport=sport, dport=5678))
180 self.pg0.add_stream(pkts)
182 key = 'proto_udp_sport'
# Mask the protocol byte plus all 16 bits of the source port.
183 self.create_classify_table(
184 key, self.build_ip_mask(proto='ff', src_port='ffff'))
185 self.create_classify_session(
186 self.acl_tbl_idx.get(key),
187 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
188 self.input_acl_set_interface(
189 self.pg0, self.acl_tbl_idx.get(key))
190 self.acl_active_table = key
192 self.pg_enable_capture(self.pg_interfaces)
# Traffic is expected on pg1 only.
195 pkts = self.pg1.get_capture(len(pkts))
196 self.verify_capture(self.pg1, pkts)
197 self.pg0.assert_nothing_captured(remark="packets forwarded")
198 self.pg2.assert_nothing_captured(remark="packets forwarded")
199 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Destination-port case: the stream is built with UDP(sport=1234, dport=dport)
# and the session matches the same `dport`.
201 def test_iacl_proto_udp_dport(self):
202 """ UDP destination port iACL test
204 Test scenario for basic protocol ACL with UDP and dport
205 - Create IPv4 stream for pg0 -> pg1 interface.
206 - Create iACL with UDP IP protocol and defined dport.
207 - Send and verify received packets on pg1 interface.
210 # Basic iACL testing with UDP and dport
212 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
213 UDP(sport=1234, dport=dport))
214 self.pg0.add_stream(pkts)
216 key = 'proto_udp_dport'
# Mask the protocol byte plus all 16 bits of the destination port.
217 self.create_classify_table(
218 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
219 self.create_classify_session(
220 self.acl_tbl_idx.get(key),
221 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
222 self.input_acl_set_interface(
223 self.pg0, self.acl_tbl_idx.get(key))
224 self.acl_active_table = key
226 self.pg_enable_capture(self.pg_interfaces)
# Traffic is expected on pg1 only.
229 pkts = self.pg1.get_capture(len(pkts))
230 self.verify_capture(self.pg1, pkts)
231 self.pg0.assert_nothing_captured(remark="packets forwarded")
232 self.pg2.assert_nothing_captured(remark="packets forwarded")
233 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Both-ports case: the table masks protocol, source port and destination port.
235 def test_iacl_proto_udp_sport_dport(self):
236 """ UDP source and destination ports iACL test
238 Test scenario for basic protocol ACL with UDP and sport and dport
239 - Create IPv4 stream for pg0 -> pg1 interface.
240 - Create iACL with UDP IP protocol and defined sport and dport.
241 - Send and verify received packets on pg1 interface.
244 # Basic iACL testing with UDP and sport and dport
247 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
248 UDP(sport=sport, dport=dport))
249 self.pg0.add_stream(pkts)
251 key = 'proto_udp_ports'
# NOTE(review): the first argument line of the table call and the last line of
# the match call are not visible in this listing.
252 self.create_classify_table(
254 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
255 self.create_classify_session(
256 self.acl_tbl_idx.get(key),
257 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
259 self.input_acl_set_interface(
260 self.pg0, self.acl_tbl_idx.get(key))
261 self.acl_active_table = key
263 self.pg_enable_capture(self.pg_interfaces)
# Traffic is expected on pg1 only.
266 pkts = self.pg1.get_capture(len(pkts))
267 self.verify_capture(self.pg1, pkts)
268 self.pg0.assert_nothing_captured(remark="packets forwarded")
269 self.pg2.assert_nothing_captured(remark="packets forwarded")
270 self.pg3.assert_nothing_captured(remark="packets forwarded")
273 class TestClassifierTCP(TestClassifier):
274 """ Classifier TCP proto Test Case """
278 super(TestClassifierTCP, cls).setUpClass()
281 def tearDownClass(cls):
282 super(TestClassifierTCP, cls).tearDownClass()
284 def test_iacl_proto_tcp(self):
285 """ TCP protocol iACL test
287 Test scenario for basic protocol ACL with TCP protocol
288 - Create IPv4 stream for pg0 -> pg1 interface.
289 - Create iACL with TCP IP protocol.
290 - Send and verify received packets on pg1 interface.
293 # Basic iACL testing with TCP protocol
294 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
295 TCP(sport=1234, dport=5678))
296 self.pg0.add_stream(pkts)
299 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
300 self.create_classify_session(
301 self.acl_tbl_idx.get(key),
302 self.build_ip_match(proto=socket.IPPROTO_TCP))
303 self.input_acl_set_interface(
304 self.pg0, self.acl_tbl_idx.get(key))
305 self.acl_active_table = key
307 self.pg_enable_capture(self.pg_interfaces)
310 pkts = self.pg1.get_capture(len(pkts))
311 self.verify_capture(self.pg1, pkts, TCP)
312 self.pg0.assert_nothing_captured(remark="packets forwarded")
313 self.pg2.assert_nothing_captured(remark="packets forwarded")
314 self.pg3.assert_nothing_captured(remark="packets forwarded")
316 def test_iacl_proto_tcp_sport(self):
317 """ TCP source port iACL test
319 Test scenario for basic protocol ACL with TCP and sport
320 - Create IPv4 stream for pg0 -> pg1 interface.
321 - Create iACL with TCP IP protocol and defined sport.
322 - Send and verify received packets on pg1 interface.
325 # Basic iACL testing with TCP and sport
327 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
328 TCP(sport=sport, dport=5678))
329 self.pg0.add_stream(pkts)
331 key = 'proto_tcp_sport'
332 self.create_classify_table(
333 key, self.build_ip_mask(proto='ff', src_port='ffff'))
334 self.create_classify_session(
335 self.acl_tbl_idx.get(key),
336 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
337 self.input_acl_set_interface(
338 self.pg0, self.acl_tbl_idx.get(key))
339 self.acl_active_table = key
341 self.pg_enable_capture(self.pg_interfaces)
344 pkts = self.pg1.get_capture(len(pkts))
345 self.verify_capture(self.pg1, pkts, TCP)
346 self.pg0.assert_nothing_captured(remark="packets forwarded")
347 self.pg2.assert_nothing_captured(remark="packets forwarded")
348 self.pg3.assert_nothing_captured(remark="packets forwarded")
350 def test_iacl_proto_tcp_dport(self):
351 """ TCP destination port iACL test
353 Test scenario for basic protocol ACL with TCP and dport
354 - Create IPv4 stream for pg0 -> pg1 interface.
355 - Create iACL with TCP IP protocol and defined dport.
356 - Send and verify received packets on pg1 interface.
359 # Basic iACL testing with TCP and dport
361 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
362 TCP(sport=1234, dport=dport))
363 self.pg0.add_stream(pkts)
365 key = 'proto_tcp_sport'
366 self.create_classify_table(
367 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
368 self.create_classify_session(
369 self.acl_tbl_idx.get(key),
370 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
371 self.input_acl_set_interface(
372 self.pg0, self.acl_tbl_idx.get(key))
373 self.acl_active_table = key
375 self.pg_enable_capture(self.pg_interfaces)
378 pkts = self.pg1.get_capture(len(pkts))
379 self.verify_capture(self.pg1, pkts, TCP)
380 self.pg0.assert_nothing_captured(remark="packets forwarded")
381 self.pg2.assert_nothing_captured(remark="packets forwarded")
382 self.pg3.assert_nothing_captured(remark="packets forwarded")
384 def test_iacl_proto_tcp_sport_dport(self):
385 """ TCP source and destination ports iACL test
387 Test scenario for basic protocol ACL with TCP and sport and dport
388 - Create IPv4 stream for pg0 -> pg1 interface.
389 - Create iACL with TCP IP protocol and defined sport and dport.
390 - Send and verify received packets on pg1 interface.
393 # Basic iACL testing with TCP and sport and dport
396 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
397 TCP(sport=sport, dport=dport))
398 self.pg0.add_stream(pkts)
400 key = 'proto_tcp_ports'
401 self.create_classify_table(
403 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
404 self.create_classify_session(
405 self.acl_tbl_idx.get(key),
406 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
408 self.input_acl_set_interface(
409 self.pg0, self.acl_tbl_idx.get(key))
410 self.acl_active_table = key
412 self.pg_enable_capture(self.pg_interfaces)
415 pkts = self.pg1.get_capture(len(pkts))
416 self.verify_capture(self.pg1, pkts, TCP)
417 self.pg0.assert_nothing_captured(remark="packets forwarded")
418 self.pg2.assert_nothing_captured(remark="packets forwarded")
419 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Output-ACL test case: the classify table is attached to pg0 with
# output_acl_set_interface() and traffic flows pg1 -> pg0.
# NOTE(review): the numbers embedded at the start of each line skip values, so
# some original lines (for example the `key` assignment) are not visible in
# this listing - confirm against the complete file.
422 class TestClassifierIPOut(TestClassifier):
423 """ Classifier output IP Test Case """
427 super(TestClassifierIPOut, cls).setUpClass()
430 def tearDownClass(cls):
431 super(TestClassifierIPOut, cls).tearDownClass()
# Source-address match on the egress side: the session matches pg1.remote_ip4
# and the packets are expected on pg0.
433 def test_acl_ip_out(self):
434 """ Output IP ACL test
436 Test scenario for basic IP ACL with source IP
437 - Create IPv4 stream for pg1 -> pg0 interface.
438 - Create ACL with source IP address.
439 - Send and verify received packets on pg0 interface.
442 # Basic oACL testing with source IP
443 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
444 self.pg1.add_stream(pkts)
# Unlike the input-ACL tests, the table is created with an explicit
# data_offset=0.
447 self.create_classify_table(
448 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
449 self.create_classify_session(
450 self.acl_tbl_idx.get(key),
451 self.build_ip_match(src_ip=self.pg1.remote_ip4))
452 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
453 self.acl_active_table = key
455 self.pg_enable_capture(self.pg_interfaces)
# Every packet sent must be captured on pg0; pg1, pg2 and pg3 must stay silent.
458 pkts = self.pg0.get_capture(len(pkts))
459 self.verify_capture(self.pg0, pkts)
460 self.pg1.assert_nothing_captured(remark="packets forwarded")
461 self.pg2.assert_nothing_captured(remark="packets forwarded")
462 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Input-ACL test case that classifies on the Ethernet source MAC address.
# NOTE(review): the numbers embedded at the start of each line skip values, so
# some original lines (for example the `key` assignment and the first line of
# the test docstring) are not visible in this listing - confirm against the
# complete file.
465 class TestClassifierMAC(TestClassifier):
466 """ Classifier MAC Test Case """
470 super(TestClassifierMAC, cls).setUpClass()
473 def tearDownClass(cls):
474 super(TestClassifierMAC, cls).tearDownClass()
# A pg0 -> pg2 stream has to pass a table masked on the full source MAC whose
# session matches pg0.remote_mac.
476 def test_acl_mac(self):
479 Test scenario for basic MAC ACL with source MAC
480 - Create IPv4 stream for pg0 -> pg2 interface.
481 - Create ACL with source MAC address.
482 - Send and verify received packets on pg2 interface.
485 # Basic iACL testing with source MAC
486 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
487 self.pg0.add_stream(pkts)
# The table is created with data_offset=-14.
# NOTE(review): 14 is the length of an Ethernet header, so this presumably
# moves the match window back onto the L2 header - confirm.
490 self.create_classify_table(
491 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
492 self.create_classify_session(
493 self.acl_tbl_idx.get(key),
494 self.build_mac_match(src_mac=self.pg0.remote_mac))
495 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
496 self.acl_active_table = key
498 self.pg_enable_capture(self.pg_interfaces)
# Every packet sent must be captured on pg2; pg0, pg1 and pg3 must stay silent.
501 pkts = self.pg2.get_capture(len(pkts))
502 self.verify_capture(self.pg2, pkts)
503 self.pg0.assert_nothing_captured(remark="packets forwarded")
504 self.pg1.assert_nothing_captured(remark="packets forwarded")
505 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Test cases for large classify tables (MAC + IP + payload masks combined) and
# for chained tables (a table created with next_table_index pointing at a
# second table), in both the input and the output direction.
# NOTE(review): the numbers embedded at the start of each line skip values and
# many continuation lines of the multi-line calls below are not visible in
# this listing (remaining mask/match arguments, the `key`, `sport` and `dport`
# assignments, how the mask parts are combined) - confirm against the complete
# file before relying on any comment here.
508 class TestClassifierComplex(TestClassifier):
509 """ Large & Nested Classifiers Test Cases """
513 super(TestClassifierComplex, cls).setUpClass()
516 def tearDownClass(cls):
517 super(TestClassifierComplex, cls).tearDownClass()
# Large input ACL: pg0 -> pg1 UDP stream, table attached to pg0 with
# input_acl_set_interface(); packets are expected on pg1 only.
519 def test_iacl_large(self):
520 """ Large input ACL test
522 Test scenario for Large ACL matching on ethernet+ip+udp headers
523 - Create IPv4 stream for pg0 -> pg1 interface.
524 - Create large acl matching on ethernet+ip+udp header fields
525 - Send and verify received packets on pg1 interface.
528 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
529 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
# The payload mask selects 'ffff' at offset 40 and the match expects the
# 2-byte value 0x1234 at the same offset.
530 msk = VarMask(offset=40, spec='ffff')
531 mth = VarMatch(offset=40, value=0x1234, length=2)
533 payload_msk = self.build_payload_mask([msk])
534 payload_match = self.build_payload_match([mth])
539 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
# bytes.fromhex() consumes two hex digits per byte, so this is 18 zero bytes
# followed by 0x12 0x34: the "36" above counts hex digits, not bytes.
540 packet_ex = bytes.fromhex(('0' * 36) + '1234')
541 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
542 UDP(sport=sport, dport=dport),
544 self.pg0.add_stream(pkts)
547 self.create_classify_table(
549 self.build_mac_mask(src_mac='ffffffffffff',
550 dst_mac='ffffffffffff',
552 self.build_ip_mask(proto='ff',
# Ingress direction: the frame is sent from pg0.remote_mac to pg0.local_mac.
560 self.create_classify_session(
561 self.acl_tbl_idx.get(key),
562 self.build_mac_match(src_mac=self.pg0.remote_mac,
563 dst_mac=self.pg0.local_mac,
566 self.build_ip_match(proto=socket.IPPROTO_UDP,
567 src_ip=self.pg0.remote_ip4,
568 dst_ip=self.pg1.remote_ip4,
574 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
575 self.acl_active_table = key
577 self.pg_enable_capture(self.pg_interfaces)
580 pkts = self.pg1.get_capture(len(pkts))
581 self.verify_capture(self.pg1, pkts)
582 self.pg0.assert_nothing_captured(remark="packets forwarded")
583 self.pg2.assert_nothing_captured(remark="packets forwarded")
584 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Large output ACL: pg1 -> pg0 UDP stream, table attached to pg0 with
# output_acl_set_interface(); packets are expected on pg0 only.
586 def test_oacl_large(self):
587 """ Large output ACL test
588 Test scenario for Large ACL matching on ethernet+ip+udp headers
589 - Create IPv4 stream for pg1 -> pg0 interface.
590 - Create large acl matching on ethernet+ip+udp header fields
591 - Send and verify received packets on pg0 interface.
594 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
595 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
596 msk = VarMask(offset=40, spec='ffff')
597 mth = VarMatch(offset=40, value=0x1234, length=2)
599 payload_msk = self.build_payload_mask([msk])
600 payload_match = self.build_payload_match([mth])
605 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
# As in the input variant: 18 zero bytes followed by 0x12 0x34.
606 packet_ex = bytes.fromhex(('0' * 36) + '1234')
607 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
608 UDP(sport=sport, dport=dport),
610 self.pg1.add_stream(pkts)
613 self.create_classify_table(
615 self.build_mac_mask(src_mac='ffffffffffff',
616 dst_mac='ffffffffffff',
618 self.build_ip_mask(proto='ff',
# Egress direction: the MAC match is on the frame leaving pg0, i.e. from
# pg0.local_mac to pg0.remote_mac.
626 self.create_classify_session(
627 self.acl_tbl_idx.get(key),
628 self.build_mac_match(src_mac=self.pg0.local_mac,
629 dst_mac=self.pg0.remote_mac,
632 self.build_ip_match(proto=socket.IPPROTO_UDP,
633 src_ip=self.pg1.remote_ip4,
634 dst_ip=self.pg0.remote_ip4,
639 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
640 self.acl_active_table = key
642 self.pg_enable_capture(self.pg_interfaces)
645 pkts = self.pg0.get_capture(len(pkts))
646 self.verify_capture(self.pg0, pkts)
647 self.pg1.assert_nothing_captured(remark="packets forwarded")
648 self.pg2.assert_nothing_captured(remark="packets forwarded")
649 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Nested input ACL: two tables are created; the second is created with
# next_table_index set to the 'subtable_in' table, and the only session is
# added to that sub-table.  The table attached to pg0 is the one under `key`.
651 def test_iacl_nested(self):
652 """ Nested input ACL test
654 Test scenario for Large ACL matching on ethernet+ip+udp headers
655 - Create IPv4 stream for pg0 -> pg1 interface.
656 - Create 1st classifier table, without any entries
657 - Create nested acl matching on ethernet+ip+udp header fields
658 - Send and verify received packets on pg1 interface.
663 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
664 UDP(sport=sport, dport=dport))
666 self.pg0.add_stream(pkts)
668 subtable_key = 'subtable_in'
669 self.create_classify_table(
671 self.build_mac_mask(src_mac='ffffffffffff',
672 dst_mac='ffffffffffff',
674 self.build_ip_mask(proto='ff',
682 self.create_classify_table(
684 self.build_mac_mask(src_mac='ffffffffffff',
685 dst_mac='ffffffffffff',
687 self.build_ip_mask(proto='ff',
692 next_table_index=self.acl_tbl_idx.get(subtable_key))
# The session goes into the sub-table, not into the table attached below.
694 self.create_classify_session(
695 self.acl_tbl_idx.get(subtable_key),
696 self.build_mac_match(src_mac=self.pg0.remote_mac,
697 dst_mac=self.pg0.local_mac,
700 self.build_ip_match(proto=socket.IPPROTO_UDP,
701 src_ip=self.pg0.remote_ip4,
702 dst_ip=self.pg1.remote_ip4,
706 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
707 self.acl_active_table = key
709 self.pg_enable_capture(self.pg_interfaces)
712 pkts = self.pg1.get_capture(len(pkts))
713 self.verify_capture(self.pg1, pkts)
714 self.pg0.assert_nothing_captured(remark="packets forwarded")
715 self.pg2.assert_nothing_captured(remark="packets forwarded")
716 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Nested output ACL: same chaining as the input variant, using the
# 'subtable_out' table, a pg1 -> pg0 stream and output_acl_set_interface().
718 def test_oacl_nested(self):
719 """ Nested output ACL test
721 Test scenario for Large ACL matching on ethernet+ip+udp headers
722 - Create IPv4 stream for pg1 -> pg0 interface.
723 - Create 1st classifier table, without any entries
724 - Create nested acl matching on ethernet+ip+udp header fields
725 - Send and verify received packets on pg0 interface.
730 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
731 UDP(sport=sport, dport=dport))
732 self.pg1.add_stream(pkts)
734 subtable_key = 'subtable_out'
735 self.create_classify_table(
737 self.build_mac_mask(src_mac='ffffffffffff',
738 dst_mac='ffffffffffff',
740 self.build_ip_mask(proto='ff',
748 self.create_classify_table(
750 self.build_mac_mask(src_mac='ffffffffffff',
751 dst_mac='ffffffffffff',
753 self.build_ip_mask(proto='ff',
758 next_table_index=self.acl_tbl_idx.get(subtable_key),
# The session goes into the sub-table; the MAC match is on the frame leaving
# pg0 (pg0.local_mac -> pg0.remote_mac).
761 self.create_classify_session(
762 self.acl_tbl_idx.get(subtable_key),
763 self.build_mac_match(src_mac=self.pg0.local_mac,
764 dst_mac=self.pg0.remote_mac,
767 self.build_ip_match(proto=socket.IPPROTO_UDP,
768 src_ip=self.pg1.remote_ip4,
769 dst_ip=self.pg0.remote_ip4,
773 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
774 self.acl_active_table = key
776 self.pg_enable_capture(self.pg_interfaces)
779 pkts = self.pg0.get_capture(len(pkts))
780 self.verify_capture(self.pg0, pkts)
781 self.pg1.assert_nothing_captured(remark="packets forwarded")
782 self.pg2.assert_nothing_captured(remark="packets forwarded")
783 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Policy-based-routing test case: a classify session carrying a PBR option and
# a VRF id steers pg0 traffic to pg3 through a route installed in that VRF.
# NOTE(review): the numbers embedded at the start of each line skip values, so
# some original lines (for example the assignments of `key` and `pbr_option`,
# part of the VppIpRoute(...) call and whatever installs the route `r`) are not
# visible in this listing - confirm against the complete file.
786 class TestClassifierPBR(TestClassifier):
787 """ Classifier PBR Test Case """
791 super(TestClassifierPBR, cls).setUpClass()
794 def tearDownClass(cls):
795 super(TestClassifierPBR, cls).tearDownClass()
# Checks forwarding to pg3 while the session exists, then checks that the VRF
# disappears once the route and the session are removed.
797 def test_acl_pbr(self):
800 Test scenario for PBR with source IP
801 - Create IPv4 stream for pg0 -> pg3 interface.
802 - Configure PBR fib entry for packet forwarding.
803 - Send and verify received packets on pg3 interface.
806 # PBR testing with source IP
807 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
808 self.pg0.add_stream(pkts)
811 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
813 # this will create the VRF/table in which we will insert the route
814 self.create_classify_session(
815 self.acl_tbl_idx.get(key),
816 self.build_ip_match(src_ip=self.pg0.remote_ip4),
817 pbr_option, self.pbr_vrfid)
# The VRF must exist as soon as the session has been added.
818 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
819 r = VppIpRoute(self, self.pg3.local_ip4, 24,
820 [VppRoutePath(self.pg3.remote_ip4,
822 table_id=self.pbr_vrfid)
825 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
827 self.pg_enable_capture(self.pg_interfaces)
830 pkts = self.pg3.get_capture(len(pkts))
831 self.verify_capture(self.pg3, pkts)
# This test detaches the table from pg0 itself (third argument 0) and does not
# set self.acl_active_table, unlike the other test cases in this file.
832 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
833 self.pg0.assert_nothing_captured(remark="packets forwarded")
834 self.pg1.assert_nothing_captured(remark="packets forwarded")
835 self.pg2.assert_nothing_captured(remark="packets forwarded")
837 # remove the classify session and the route
838 r.remove_vpp_config()
# Same session arguments as above, with is_add=0 to delete it.
839 self.create_classify_session(
840 self.acl_tbl_idx.get(key),
841 self.build_ip_match(src_ip=self.pg0.remote_ip4),
842 pbr_option, self.pbr_vrfid, is_add=0)
844 # and the table should be gone.
845 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
# Punt-ACL test case: a classify table is registered through
# vapi.punt_acl_add_del() and decides whether punted UDP packets received on
# pg0 are redirected to pg1 or dropped.
# NOTE(review): the numbers embedded at the start of each line skip values and
# several lines are not visible in this listing (the table mask arguments, the
# call that the punt 'type'/'af'/'protocol' dictionary is passed to, the
# `key`, `sport` and `dport` assignments, the first argument of each
# create_classify_session call) - confirm against the complete file.
848 class TestClassifierPunt(TestClassifier):
849 """ Classifier punt Test Case """
853 super(TestClassifierPunt, cls).setUpClass()
856 def tearDownClass(cls):
857 super(TestClassifierPunt, cls).tearDownClass()
# Sends the same two-packet burst twice: first with a session that does not
# match (expect no replies), then with a matching session (expect the packets
# on pg1 only).
859 def test_punt_udp(self):
860 """ IPv4/UDP protocol punt ACL test
862 Test scenario for basic punt ACL with UDP protocol
863 - Create IPv4 stream for pg0 -> pg1 interface.
864 - Create punt ACL with UDP IP protocol.
865 - Send and verify received packets on pg1 interface.
872 self.create_classify_table(
# Register the new table as the IPv4 punt ACL and record it as active.
878 table_index = self.acl_tbl_idx.get(key)
879 self.vapi.punt_acl_add_del(ip4_table_index=table_index)
880 self.acl_active_table = key
882 # punt udp packets to dport received on pg0 through pg1
886 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
889 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4,
890 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
# Redirect punted packets received on pg0 out of pg1 towards pg1.remote_ip4.
893 self.vapi.ip_punt_redirect(punt={
894 'rx_sw_if_index': self.pg0.sw_if_index,
895 'tx_sw_if_index': self.pg1.sw_if_index,
896 'nh': self.pg1.remote_ip4,
# The packets are addressed to pg0.local_ip4 itself.  The `* 2` repeats the
# one-element list, so both entries refer to the same packet object.
899 pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
900 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
901 UDP(sport=sport, dport=dport) /
902 Raw('\x17' * 100))] * 2
904 # allow a session but not matching the stream: expect to drop
# src_port=sport + 10 differs from the sport used to build the packets.
905 self.create_classify_session(
907 self.build_ip_match(src_ip=self.pg0.remote_ip4,
908 proto=socket.IPPROTO_UDP, src_port=sport + 10))
909 self.send_and_assert_no_replies(self.pg0, pkts)
911 # allow a session matching the stream: expect to pass
912 self.create_classify_session(
914 self.build_ip_match(src_ip=self.pg0.remote_ip4,
915 proto=socket.IPPROTO_UDP, src_port=sport))
916 self.send_and_expect_only(self.pg0, pkts, self.pg1)
# Clear the active-table marker and unregister the punt ACL explicitly.
919 self.acl_active_table = ''
920 self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
# Script entry point: run the test cases in this module with VppTestRunner.
923 if __name__ == '__main__':
924 unittest.main(testRunner=VppTestRunner)