7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
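18 # Tests are split into different test case classes because of a reported issue
# NOTE(review): the issue reference that completed this sentence is on a line
# not visible in this listing.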
# IPv4-address input ACL (iACL) classifier tests, built on TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump (e.g. 21 -> 25, 42 -> 45, 52 -> 55), so statements such as the
# assignment of `key` are not visible here; confirm against the full file.
20 class TestClassifierIP(TestClassifier):
21 """ Classifier IP Test Case """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
25 super(TestClassifierIP, cls).setUpClass()
# Forwards class-level teardown to the parent class.
28 def tearDownClass(cls):
29 super(TestClassifierIP, cls).tearDownClass()
# Sends a pg0 -> pg1 stream through an input ACL on pg0 whose table masks
# src_ip with 'ffffffff' and whose session matches pg0.remote_ip4; expects
# len(pkts) packets on pg1 and nothing on pg0, pg2 or pg3.
31 def test_iacl_src_ip(self):
32 """ Source IP iACL test
34 Test scenario for basic IP ACL with source IP
35 - Create IPv4 stream for pg0 -> pg1 interface.
36 - Create iACL with source IP address.
37 - Send and verify received packets on pg1 interface.
40 # Basic iACL testing with source IP
41 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
42 self.pg0.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
45 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
46 self.create_classify_session(
47 self.acl_tbl_idx.get(key),
48 self.build_ip_match(src_ip=self.pg0.remote_ip4))
49 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
# Records which table key this test bound to the interface.
50 self.acl_active_table = key
52 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): lines 53-54 are not visible; presumably the packet generator
# is started between enabling capture and reading it -- confirm.
55 pkts = self.pg1.get_capture(len(pkts))
56 self.verify_capture(self.pg1, pkts)
57 self.pg0.assert_nothing_captured(remark="packets forwarded")
58 self.pg2.assert_nothing_captured(remark="packets forwarded")
59 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Same flow as the source-IP test, but the table masks dst_ip with 'ffffffff'
# and the session matches pg1.remote_ip4.
61 def test_iacl_dst_ip(self):
62 """ Destination IP iACL test
64 Test scenario for basic IP ACL with destination IP
65 - Create IPv4 stream for pg0 -> pg1 interface.
66 - Create iACL with destination IP address.
67 - Send and verify received packets on pg1 interface.
70 # Basic iACL testing with destination IP
71 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
72 self.pg0.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
75 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
76 self.create_classify_session(
77 self.acl_tbl_idx.get(key),
78 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
79 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
80 self.acl_active_table = key
82 self.pg_enable_capture(self.pg_interfaces)
85 pkts = self.pg1.get_capture(len(pkts))
86 self.verify_capture(self.pg1, pkts)
87 self.pg0.assert_nothing_captured(remark="packets forwarded")
88 self.pg2.assert_nothing_captured(remark="packets forwarded")
89 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Combines both previous tests: the table masks src_ip and dst_ip, and the
# session matches pg0.remote_ip4 as source and pg1.remote_ip4 as destination.
91 def test_iacl_src_dst_ip(self):
92 """ Source and destination IP iACL test
94 Test scenario for basic IP ACL with source and destination IP
95 - Create IPv4 stream for pg0 -> pg1 interface.
96 - Create iACL with source and destination IP addresses.
97 - Send and verify received packets on pg1 interface.
100 # Basic iACL testing with source and destination IP
101 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
102 self.pg0.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
105 self.create_classify_table(
106 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
107 self.create_classify_session(
108 self.acl_tbl_idx.get(key),
109 self.build_ip_match(src_ip=self.pg0.remote_ip4,
110 dst_ip=self.pg1.remote_ip4))
111 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
112 self.acl_active_table = key
114 self.pg_enable_capture(self.pg_interfaces)
117 pkts = self.pg1.get_capture(len(pkts))
118 self.verify_capture(self.pg1, pkts)
119 self.pg0.assert_nothing_captured(remark="packets forwarded")
120 self.pg2.assert_nothing_captured(remark="packets forwarded")
121 self.pg3.assert_nothing_captured(remark="packets forwarded")
# UDP protocol / port input ACL (iACL) classifier tests, built on
# TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump (e.g. 125 -> 129, 146 -> 149, 175 -> 177, 251 -> 253, 256 -> 258), so
# assignments of `key`, `sport` and `dport` and some call arguments are not
# visible here; confirm against the full file.
124 class TestClassifierUDP(TestClassifier):
125 """ Classifier UDP proto Test Case """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
129 super(TestClassifierUDP, cls).setUpClass()
# Forwards class-level teardown to the parent class.
132 def tearDownClass(cls):
133 super(TestClassifierUDP, cls).tearDownClass()
# Sends a pg0 -> pg1 stream through an input ACL on pg0 whose table masks the
# protocol field with 'ff' and whose session matches socket.IPPROTO_UDP;
# expects len(pkts) packets on pg1 and nothing on pg0, pg2 or pg3.
135 def test_iacl_proto_udp(self):
136 """ UDP protocol iACL test
138 Test scenario for basic protocol ACL with UDP protocol
139 - Create IPv4 stream for pg0 -> pg1 interface.
140 - Create iACL with UDP IP protocol.
141 - Send and verify received packets on pg1 interface.
144 # Basic iACL testing with UDP protocol
145 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
146 self.pg0.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
149 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
150 self.create_classify_session(
151 self.acl_tbl_idx.get(key),
152 self.build_ip_match(proto=socket.IPPROTO_UDP))
153 self.input_acl_set_interface(
154 self.pg0, self.acl_tbl_idx.get(key))
155 self.acl_active_table = key
157 self.pg_enable_capture(self.pg_interfaces)
160 pkts = self.pg1.get_capture(len(pkts))
161 self.verify_capture(self.pg1, pkts)
162 self.pg0.assert_nothing_captured(remark="packets forwarded")
163 self.pg2.assert_nothing_captured(remark="packets forwarded")
164 self.pg3.assert_nothing_captured(remark="packets forwarded")
# As the protocol test, but the stream is built with UDP(sport=sport,
# dport=5678) and the table/session additionally mask ('ffff') and match the
# source port.
166 def test_iacl_proto_udp_sport(self):
167 """ UDP source port iACL test
169 Test scenario for basic protocol ACL with UDP and sport
170 - Create IPv4 stream for pg0 -> pg1 interface.
171 - Create iACL with UDP IP protocol and defined sport.
172 - Send and verify received packets on pg1 interface.
175 # Basic iACL testing with UDP and sport
# NOTE(review): `sport` is read below but its assignment is not visible here.
177 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
178 UDP(sport=sport, dport=5678))
179 self.pg0.add_stream(pkts)
181 key = 'proto_udp_sport'
182 self.create_classify_table(
183 key, self.build_ip_mask(proto='ff', src_port='ffff'))
184 self.create_classify_session(
185 self.acl_tbl_idx.get(key),
186 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
187 self.input_acl_set_interface(
188 self.pg0, self.acl_tbl_idx.get(key))
189 self.acl_active_table = key
191 self.pg_enable_capture(self.pg_interfaces)
194 pkts = self.pg1.get_capture(len(pkts))
195 self.verify_capture(self.pg1, pkts)
196 self.pg0.assert_nothing_captured(remark="packets forwarded")
197 self.pg2.assert_nothing_captured(remark="packets forwarded")
198 self.pg3.assert_nothing_captured(remark="packets forwarded")
# As the protocol test, but the stream is built with UDP(sport=1234,
# dport=dport) and the table/session additionally mask ('ffff') and match the
# destination port.
200 def test_iacl_proto_udp_dport(self):
201 """ UDP destination port iACL test
203 Test scenario for basic protocol ACL with UDP and dport
204 - Create IPv4 stream for pg0 -> pg1 interface.
205 - Create iACL with UDP IP protocol and defined dport.
206 - Send and verify received packets on pg1 interface.
209 # Basic iACL testing with UDP and dport
# NOTE(review): `dport` is read below but its assignment is not visible here.
211 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
212 UDP(sport=1234, dport=dport))
213 self.pg0.add_stream(pkts)
215 key = 'proto_udp_dport'
216 self.create_classify_table(
217 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
218 self.create_classify_session(
219 self.acl_tbl_idx.get(key),
220 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
221 self.input_acl_set_interface(
222 self.pg0, self.acl_tbl_idx.get(key))
223 self.acl_active_table = key
225 self.pg_enable_capture(self.pg_interfaces)
228 pkts = self.pg1.get_capture(len(pkts))
229 self.verify_capture(self.pg1, pkts)
230 self.pg0.assert_nothing_captured(remark="packets forwarded")
231 self.pg2.assert_nothing_captured(remark="packets forwarded")
232 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Masks and matches protocol plus both UDP ports for a stream built with
# UDP(sport=sport, dport=dport).
234 def test_iacl_proto_udp_sport_dport(self):
235 """ UDP source and destination ports iACL test
237 Test scenario for basic protocol ACL with UDP and sport and dport
238 - Create IPv4 stream for pg0 -> pg1 interface.
239 - Create iACL with UDP IP protocol and defined sport and dport.
240 - Send and verify received packets on pg1 interface.
243 # Basic iACL testing with UDP and sport and dport
# NOTE(review): `sport` / `dport` assignments (lines 244-245) are not visible.
246 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
247 UDP(sport=sport, dport=dport))
248 self.pg0.add_stream(pkts)
250 key = 'proto_udp_ports'
# NOTE(review): the first argument line of this call (line 252) and the line
# closing build_ip_match below (line 257) are not visible in this listing.
251 self.create_classify_table(
253 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
254 self.create_classify_session(
255 self.acl_tbl_idx.get(key),
256 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
258 self.input_acl_set_interface(
259 self.pg0, self.acl_tbl_idx.get(key))
260 self.acl_active_table = key
262 self.pg_enable_capture(self.pg_interfaces)
265 pkts = self.pg1.get_capture(len(pkts))
266 self.verify_capture(self.pg1, pkts)
267 self.pg0.assert_nothing_captured(remark="packets forwarded")
268 self.pg2.assert_nothing_captured(remark="packets forwarded")
269 self.pg3.assert_nothing_captured(remark="packets forwarded")
272 class TestClassifierTCP(TestClassifier):
273 """ Classifier TCP proto Test Case """
277 super(TestClassifierTCP, cls).setUpClass()
280 def tearDownClass(cls):
281 super(TestClassifierTCP, cls).tearDownClass()
283 def test_iacl_proto_tcp(self):
284 """ TCP protocol iACL test
286 Test scenario for basic protocol ACL with TCP protocol
287 - Create IPv4 stream for pg0 -> pg1 interface.
288 - Create iACL with TCP IP protocol.
289 - Send and verify received packets on pg1 interface.
292 # Basic iACL testing with TCP protocol
293 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
294 TCP(sport=1234, dport=5678))
295 self.pg0.add_stream(pkts)
298 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
299 self.create_classify_session(
300 self.acl_tbl_idx.get(key),
301 self.build_ip_match(proto=socket.IPPROTO_TCP))
302 self.input_acl_set_interface(
303 self.pg0, self.acl_tbl_idx.get(key))
304 self.acl_active_table = key
306 self.pg_enable_capture(self.pg_interfaces)
309 pkts = self.pg1.get_capture(len(pkts))
310 self.verify_capture(self.pg1, pkts, TCP)
311 self.pg0.assert_nothing_captured(remark="packets forwarded")
312 self.pg2.assert_nothing_captured(remark="packets forwarded")
313 self.pg3.assert_nothing_captured(remark="packets forwarded")
315 def test_iacl_proto_tcp_sport(self):
316 """ TCP source port iACL test
318 Test scenario for basic protocol ACL with TCP and sport
319 - Create IPv4 stream for pg0 -> pg1 interface.
320 - Create iACL with TCP IP protocol and defined sport.
321 - Send and verify received packets on pg1 interface.
324 # Basic iACL testing with TCP and sport
326 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
327 TCP(sport=sport, dport=5678))
328 self.pg0.add_stream(pkts)
330 key = 'proto_tcp_sport'
331 self.create_classify_table(
332 key, self.build_ip_mask(proto='ff', src_port='ffff'))
333 self.create_classify_session(
334 self.acl_tbl_idx.get(key),
335 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
336 self.input_acl_set_interface(
337 self.pg0, self.acl_tbl_idx.get(key))
338 self.acl_active_table = key
340 self.pg_enable_capture(self.pg_interfaces)
343 pkts = self.pg1.get_capture(len(pkts))
344 self.verify_capture(self.pg1, pkts, TCP)
345 self.pg0.assert_nothing_captured(remark="packets forwarded")
346 self.pg2.assert_nothing_captured(remark="packets forwarded")
347 self.pg3.assert_nothing_captured(remark="packets forwarded")
349 def test_iacl_proto_tcp_dport(self):
350 """ TCP destination port iACL test
352 Test scenario for basic protocol ACL with TCP and dport
353 - Create IPv4 stream for pg0 -> pg1 interface.
354 - Create iACL with TCP IP protocol and defined dport.
355 - Send and verify received packets on pg1 interface.
358 # Basic iACL testing with TCP and dport
360 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
361 TCP(sport=1234, dport=dport))
362 self.pg0.add_stream(pkts)
364 key = 'proto_tcp_sport'
365 self.create_classify_table(
366 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
367 self.create_classify_session(
368 self.acl_tbl_idx.get(key),
369 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
370 self.input_acl_set_interface(
371 self.pg0, self.acl_tbl_idx.get(key))
372 self.acl_active_table = key
374 self.pg_enable_capture(self.pg_interfaces)
377 pkts = self.pg1.get_capture(len(pkts))
378 self.verify_capture(self.pg1, pkts, TCP)
379 self.pg0.assert_nothing_captured(remark="packets forwarded")
380 self.pg2.assert_nothing_captured(remark="packets forwarded")
381 self.pg3.assert_nothing_captured(remark="packets forwarded")
383 def test_iacl_proto_tcp_sport_dport(self):
384 """ TCP source and destination ports iACL test
386 Test scenario for basic protocol ACL with TCP and sport and dport
387 - Create IPv4 stream for pg0 -> pg1 interface.
388 - Create iACL with TCP IP protocol and defined sport and dport.
389 - Send and verify received packets on pg1 interface.
392 # Basic iACL testing with TCP and sport and dport
395 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
396 TCP(sport=sport, dport=dport))
397 self.pg0.add_stream(pkts)
399 key = 'proto_tcp_ports'
400 self.create_classify_table(
402 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
403 self.create_classify_session(
404 self.acl_tbl_idx.get(key),
405 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
407 self.input_acl_set_interface(
408 self.pg0, self.acl_tbl_idx.get(key))
409 self.acl_active_table = key
411 self.pg_enable_capture(self.pg_interfaces)
414 pkts = self.pg1.get_capture(len(pkts))
415 self.verify_capture(self.pg1, pkts, TCP)
416 self.pg0.assert_nothing_captured(remark="packets forwarded")
417 self.pg2.assert_nothing_captured(remark="packets forwarded")
418 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Output ACL classifier test keyed on the IPv4 source address, built on
# TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump (e.g. 422 -> 426, 443 -> 446, 454 -> 457), so statements such as the
# assignment of `key` are not visible here; confirm against the full file.
421 class TestClassifierIPOut(TestClassifier):
422 """ Classifier output IP Test Case """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
426 super(TestClassifierIPOut, cls).setUpClass()
# Forwards class-level teardown to the parent class.
429 def tearDownClass(cls):
430 super(TestClassifierIPOut, cls).tearDownClass()
# Sends a pg1 -> pg0 stream; the table masks src_ip with 'ffffffff' (created
# with data_offset=0), the session matches pg1.remote_ip4, and the table is
# bound with output_acl_set_interface on pg0. Expects len(pkts) packets on
# pg0 and nothing on pg1, pg2 or pg3.
432 def test_acl_ip_out(self):
433 """ Output IP ACL test
435 Test scenario for basic IP ACL with source IP
436 - Create IPv4 stream for pg1 -> pg0 interface.
437 - Create ACL with source IP address.
438 - Send and verify received packets on pg0 interface.
441 # Basic oACL testing with source IP
442 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
443 self.pg1.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
446 self.create_classify_table(
447 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
448 self.create_classify_session(
449 self.acl_tbl_idx.get(key),
450 self.build_ip_match(src_ip=self.pg1.remote_ip4))
451 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
452 self.acl_active_table = key
454 self.pg_enable_capture(self.pg_interfaces)
457 pkts = self.pg0.get_capture(len(pkts))
458 self.verify_capture(self.pg0, pkts)
459 self.pg1.assert_nothing_captured(remark="packets forwarded")
460 self.pg2.assert_nothing_captured(remark="packets forwarded")
461 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Source-MAC input ACL classifier test, built on TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump (e.g. 465 -> 469, 475 -> 478, 486 -> 489), so the docstring opening
# and the assignment of `key` are not visible here; confirm against the full
# file.
464 class TestClassifierMAC(TestClassifier):
465 """ Classifier MAC Test Case """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
469 super(TestClassifierMAC, cls).setUpClass()
# Forwards class-level teardown to the parent class.
472 def tearDownClass(cls):
473 super(TestClassifierMAC, cls).tearDownClass()
# Sends a pg0 -> pg2 stream through an input ACL on pg0 whose table masks
# src_mac with 'ffffffffffff' (created with data_offset=-14) and whose
# session matches pg0.remote_mac; expects len(pkts) packets on pg2 and
# nothing on pg0, pg1 or pg3.
475 def test_acl_mac(self):
478 Test scenario for basic MAC ACL with source MAC
479 - Create IPv4 stream for pg0 -> pg2 interface.
480 - Create ACL with source MAC address.
481 - Send and verify received packets on pg2 interface.
484 # Basic iACL testing with source MAC
485 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
486 self.pg0.add_stream(pkts)
# NOTE(review): `key` is read below but its assignment is not visible here.
# NOTE(review): data_offset=-14 presumably steps back over the 14-byte
# Ethernet header so the MAC fields can be matched -- confirm.
489 self.create_classify_table(
490 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
491 self.create_classify_session(
492 self.acl_tbl_idx.get(key),
493 self.build_mac_match(src_mac=self.pg0.remote_mac))
494 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
495 self.acl_active_table = key
497 self.pg_enable_capture(self.pg_interfaces)
500 pkts = self.pg2.get_capture(len(pkts))
501 self.verify_capture(self.pg2, pkts)
502 self.pg0.assert_nothing_captured(remark="packets forwarded")
503 self.pg1.assert_nothing_captured(remark="packets forwarded")
504 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Large (MAC + IP/UDP + payload) and nested (chained-table) classifier tests
# for both input and output ACLs, built on TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump frequently (e.g. 508 -> 512, 533 -> 538, 541 -> 543, 546 -> 548,
# 551 -> 559), so assignments of `key`, `sport` and `dport` and many call
# arguments are not visible here; confirm against the full file.
507 class TestClassifierComplex(TestClassifier):
508 """ Large & Nested Classifiers Test Cases """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
512 super(TestClassifierComplex, cls).setUpClass()
# Forwards class-level teardown to the parent class.
515 def tearDownClass(cls):
516 super(TestClassifierComplex, cls).tearDownClass()
# Input-ACL test on pg0 for a pg0 -> pg1 UDP stream. The visible table and
# session arguments use the MAC and IP mask/match builders; payload_msk and
# payload_match are built first (where they are passed falls on lines not
# visible here). Expects len(pkts) packets on pg1 and nothing on pg0, pg2 or
# pg3.
518 def test_iacl_large(self):
519 """ Large input ACL test
521 Test scenario for Large ACL matching on ethernet+ip+udp headers
522 - Create IPv4 stream for pg0 -> pg1 interface.
523 - Create large acl matching on ethernet+ip+udp header fields
524 - Send and verify received packets on pg1 interface.
527 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
528 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
529 msk = VarMask(offset=40, spec='ffff')
530 mth = VarMatch(offset=40, value=0x1234, length=2)
532 payload_msk = self.build_payload_mask([msk])
533 payload_match = self.build_payload_match([mth])
538 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
# bytes.fromhex() of 36 hex zeros followed by '1234' yields 18 zero bytes and
# then 0x12 0x34.
539 packet_ex = bytes.fromhex(('0' * 36) + '1234')
# NOTE(review): `sport` / `dport` are read below but their assignments are
# not visible here; the call's last argument (line 542) is also missing.
540 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
541 UDP(sport=sport, dport=dport),
543 self.pg0.add_stream(pkts)
546 self.create_classify_table(
548 self.build_mac_mask(src_mac='ffffffffffff',
549 dst_mac='ffffffffffff',
551 self.build_ip_mask(proto='ff',
# Session matches traffic from pg0's remote MAC to pg0's local MAC, UDP, with
# source pg0.remote_ip4 and destination pg1.remote_ip4.
559 self.create_classify_session(
560 self.acl_tbl_idx.get(key),
561 self.build_mac_match(src_mac=self.pg0.remote_mac,
562 dst_mac=self.pg0.local_mac,
565 self.build_ip_match(proto=socket.IPPROTO_UDP,
566 src_ip=self.pg0.remote_ip4,
567 dst_ip=self.pg1.remote_ip4,
573 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
574 self.acl_active_table = key
576 self.pg_enable_capture(self.pg_interfaces)
579 pkts = self.pg1.get_capture(len(pkts))
580 self.verify_capture(self.pg1, pkts)
581 self.pg0.assert_nothing_captured(remark="packets forwarded")
582 self.pg2.assert_nothing_captured(remark="packets forwarded")
583 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Output-ACL counterpart of the large test: pg1 -> pg0 UDP stream, table
# bound with output_acl_set_interface on pg0. Expects len(pkts) packets on
# pg0 and nothing on pg1, pg2 or pg3.
585 def test_oacl_large(self):
586 """ Large output ACL test
587 Test scenario for Large ACL matching on ethernet+ip+udp headers
588 - Create IPv4 stream for pg1 -> pg0 interface.
589 - Create large acl matching on ethernet+ip+udp header fields
590 - Send and verify received packets on pg0 interface.
593 # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
594 # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
595 msk = VarMask(offset=40, spec='ffff')
596 mth = VarMatch(offset=40, value=0x1234, length=2)
598 payload_msk = self.build_payload_mask([msk])
599 payload_match = self.build_payload_match([mth])
604 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
# bytes.fromhex() of 36 hex zeros followed by '1234' yields 18 zero bytes and
# then 0x12 0x34.
605 packet_ex = bytes.fromhex(('0' * 36) + '1234')
# NOTE(review): `sport` / `dport` are read below but their assignments are
# not visible here; the call's last argument (line 608) is also missing.
606 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
607 UDP(sport=sport, dport=dport),
609 self.pg1.add_stream(pkts)
612 self.create_classify_table(
614 self.build_mac_mask(src_mac='ffffffffffff',
615 dst_mac='ffffffffffff',
617 self.build_ip_mask(proto='ff',
# Session matches traffic from pg0's local MAC to pg0's remote MAC, UDP, with
# source pg1.remote_ip4 and destination pg0.remote_ip4.
625 self.create_classify_session(
626 self.acl_tbl_idx.get(key),
627 self.build_mac_match(src_mac=self.pg0.local_mac,
628 dst_mac=self.pg0.remote_mac,
631 self.build_ip_match(proto=socket.IPPROTO_UDP,
632 src_ip=self.pg1.remote_ip4,
633 dst_ip=self.pg0.remote_ip4,
638 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
639 self.acl_active_table = key
641 self.pg_enable_capture(self.pg_interfaces)
644 pkts = self.pg0.get_capture(len(pkts))
645 self.verify_capture(self.pg0, pkts)
646 self.pg1.assert_nothing_captured(remark="packets forwarded")
647 self.pg2.assert_nothing_captured(remark="packets forwarded")
648 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Nested input-ACL test: two classify tables are created, the second one
# linked to the first through next_table_index; the only session is added to
# the sub-table, while the table stored under `key` is bound to pg0.
# NOTE(review): the docstring's scenario line repeats the "Large ACL"
# wording of the large tests although this test covers nested tables.
650 def test_iacl_nested(self):
651 """ Nested input ACL test
653 Test scenario for Large ACL matching on ethernet+ip+udp headers
654 - Create IPv4 stream for pg0 -> pg1 interface.
655 - Create 1st classifier table, without any entries
656 - Create nested acl matching on ethernet+ip+udp header fields
657 - Send and verify received packets on pg1 interface.
# NOTE(review): `sport` / `dport` are read below but their assignments are
# not visible here.
662 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
663 UDP(sport=sport, dport=dport))
665 self.pg0.add_stream(pkts)
667 subtable_key = 'subtable_in'
668 self.create_classify_table(
670 self.build_mac_mask(src_mac='ffffffffffff',
671 dst_mac='ffffffffffff',
673 self.build_ip_mask(proto='ff',
# Second table; its next_table_index points at the sub-table created above.
681 self.create_classify_table(
683 self.build_mac_mask(src_mac='ffffffffffff',
684 dst_mac='ffffffffffff',
686 self.build_ip_mask(proto='ff',
691 next_table_index=self.acl_tbl_idx.get(subtable_key))
693 self.create_classify_session(
694 self.acl_tbl_idx.get(subtable_key),
695 self.build_mac_match(src_mac=self.pg0.remote_mac,
696 dst_mac=self.pg0.local_mac,
699 self.build_ip_match(proto=socket.IPPROTO_UDP,
700 src_ip=self.pg0.remote_ip4,
701 dst_ip=self.pg1.remote_ip4,
705 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
706 self.acl_active_table = key
708 self.pg_enable_capture(self.pg_interfaces)
711 pkts = self.pg1.get_capture(len(pkts))
712 self.verify_capture(self.pg1, pkts)
713 self.pg0.assert_nothing_captured(remark="packets forwarded")
714 self.pg2.assert_nothing_captured(remark="packets forwarded")
715 self.pg3.assert_nothing_captured(remark="packets forwarded")
# Nested output-ACL counterpart: pg1 -> pg0 UDP stream, session added to the
# sub-table, table stored under `key` bound with output_acl_set_interface on
# pg0.
# NOTE(review): the docstring's scenario line repeats the "Large ACL"
# wording of the large tests although this test covers nested tables.
717 def test_oacl_nested(self):
718 """ Nested output ACL test
720 Test scenario for Large ACL matching on ethernet+ip+udp headers
721 - Create IPv4 stream for pg1 -> pg0 interface.
722 - Create 1st classifier table, without any entries
723 - Create nested acl matching on ethernet+ip+udp header fields
724 - Send and verify received packets on pg0 interface.
# NOTE(review): `sport` / `dport` are read below but their assignments are
# not visible here.
729 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
730 UDP(sport=sport, dport=dport))
731 self.pg1.add_stream(pkts)
733 subtable_key = 'subtable_out'
734 self.create_classify_table(
736 self.build_mac_mask(src_mac='ffffffffffff',
737 dst_mac='ffffffffffff',
739 self.build_ip_mask(proto='ff',
# Second table; its next_table_index points at the sub-table created above.
747 self.create_classify_table(
749 self.build_mac_mask(src_mac='ffffffffffff',
750 dst_mac='ffffffffffff',
752 self.build_ip_mask(proto='ff',
757 next_table_index=self.acl_tbl_idx.get(subtable_key),
760 self.create_classify_session(
761 self.acl_tbl_idx.get(subtable_key),
762 self.build_mac_match(src_mac=self.pg0.local_mac,
763 dst_mac=self.pg0.remote_mac,
766 self.build_ip_match(proto=socket.IPPROTO_UDP,
767 src_ip=self.pg1.remote_ip4,
768 dst_ip=self.pg0.remote_ip4,
772 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
773 self.acl_active_table = key
775 self.pg_enable_capture(self.pg_interfaces)
778 pkts = self.pg0.get_capture(len(pkts))
779 self.verify_capture(self.pg0, pkts)
780 self.pg1.assert_nothing_captured(remark="packets forwarded")
781 self.pg2.assert_nothing_captured(remark="packets forwarded")
782 self.pg3.assert_nothing_captured(remark="packets forwarded")
# PBR classifier test keyed on the IPv4 source address, built on
# TestClassifier.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# jump (e.g. 786 -> 790, 796 -> 799, 807 -> 810, 819 -> 821, 821 -> 824), so
# the docstring opening, the assignments of `key` and `pbr_option`, and part
# of the route construction are not visible here; confirm against the full
# file.
785 class TestClassifierPBR(TestClassifier):
786 """ Classifier PBR Test Case """
# Forwards to the parent class's setUpClass (the enclosing def line is not
# visible in this listing).
790 super(TestClassifierPBR, cls).setUpClass()
# Forwards class-level teardown to the parent class.
793 def tearDownClass(cls):
794 super(TestClassifierPBR, cls).tearDownClass()
# Sends a pg0 -> pg3 stream, classifies it on src_ip (pg0.remote_ip4) with a
# session carrying pbr_option and self.pbr_vrfid, checks that the VRF exists,
# builds a route for pg3.local_ip4/24 via pg3.remote_ip4 in that table, and
# expects len(pkts) packets on pg3 and nothing on pg0, pg1 or pg2. Afterwards
# the route and session are removed and the VRF is expected to be gone.
796 def test_acl_pbr(self):
799 Test scenario for PBR with source IP
800 - Create IPv4 stream for pg0 -> pg3 interface.
801 - Configure PBR fib entry for packet forwarding.
802 - Send and verify received packets on pg3 interface.
805 # PBR testing with source IP
806 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
807 self.pg0.add_stream(pkts)
# NOTE(review): `key` and `pbr_option` are read below but their assignments
# are not visible here.
810 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
812 # this will create the VRF/table in which we will insert the route
813 self.create_classify_session(
814 self.acl_tbl_idx.get(key),
815 self.build_ip_match(src_ip=self.pg0.remote_ip4),
816 pbr_option, self.pbr_vrfid)
817 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
# NOTE(review): the remaining VppRoutePath arguments (line 820) and whatever
# follows the constructor (lines 822-823, presumably installing the route)
# are not visible here -- confirm.
818 r = VppIpRoute(self, self.pg3.local_ip4, 24,
819 [VppRoutePath(self.pg3.remote_ip4,
821 table_id=self.pbr_vrfid)
824 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
826 self.pg_enable_capture(self.pg_interfaces)
829 pkts = self.pg3.get_capture(len(pkts))
830 self.verify_capture(self.pg3, pkts)
# Same call as above with an extra positional 0; presumably this unbinds the
# table from pg0 -- confirm against input_acl_set_interface's signature.
831 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
832 self.pg0.assert_nothing_captured(remark="packets forwarded")
833 self.pg1.assert_nothing_captured(remark="packets forwarded")
834 self.pg2.assert_nothing_captured(remark="packets forwarded")
836 # remove the classify session and the route
837 r.remove_vpp_config()
# Same session arguments as at creation, now with is_add=0.
838 self.create_classify_session(
839 self.acl_tbl_idx.get(key),
840 self.build_ip_match(src_ip=self.pg0.remote_ip4),
841 pbr_option, self.pbr_vrfid, is_add=0)
843 # and the table should be gone.
844 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
# Script entry point: when run directly, execute this module's tests through
# unittest using VppTestRunner as the test runner.
# NOTE(review): `unittest` is not imported on any line visible in this
# listing (the embedded numbering starts at 7); confirm the import exists.
846 if __name__ == '__main__':
847 unittest.main(testRunner=VppTestRunner)