tests: replace pycodestyle with black
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
9
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16 from vpp_papi import VppEnum
17
18
19 # Tests split to different test case classes because of issue reported in
20 # ticket VPP-1336
21 class TestClassifierIP(TestClassifier):
22     """Classifier IP Test Case"""
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestClassifierIP, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestClassifierIP, cls).tearDownClass()
31
32     def test_iacl_src_ip(self):
33         """Source IP iACL test
34
35         Test scenario for basic IP ACL with source IP
36             - Create IPv4 stream for pg0 -> pg1 interface.
37             - Create iACL with source IP address.
38             - Send and verify received packets on pg1 interface.
39         """
40
41         # Basic iACL testing with source IP
42         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
43         self.pg0.add_stream(pkts)
44
45         key = "ip_src"
46         self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
47         self.create_classify_session(
48             self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4)
49         )
50         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
51         self.acl_active_table = key
52
53         self.pg_enable_capture(self.pg_interfaces)
54         self.pg_start()
55
56         pkts = self.pg1.get_capture(len(pkts))
57         self.verify_capture(self.pg1, pkts)
58         self.pg0.assert_nothing_captured(remark="packets forwarded")
59         self.pg2.assert_nothing_captured(remark="packets forwarded")
60         self.pg3.assert_nothing_captured(remark="packets forwarded")
61
62     def test_iacl_dst_ip(self):
63         """Destination IP iACL test
64
65         Test scenario for basic IP ACL with destination IP
66             - Create IPv4 stream for pg0 -> pg1 interface.
67             - Create iACL with destination IP address.
68             - Send and verify received packets on pg1 interface.
69         """
70
71         # Basic iACL testing with destination IP
72         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
73         self.pg0.add_stream(pkts)
74
75         key = "ip_dst"
76         self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff"))
77         self.create_classify_session(
78             self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4)
79         )
80         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
81         self.acl_active_table = key
82
83         self.pg_enable_capture(self.pg_interfaces)
84         self.pg_start()
85
86         pkts = self.pg1.get_capture(len(pkts))
87         self.verify_capture(self.pg1, pkts)
88         self.pg0.assert_nothing_captured(remark="packets forwarded")
89         self.pg2.assert_nothing_captured(remark="packets forwarded")
90         self.pg3.assert_nothing_captured(remark="packets forwarded")
91
92     def test_iacl_src_dst_ip(self):
93         """Source and destination IP iACL test
94
95         Test scenario for basic IP ACL with source and destination IP
96             - Create IPv4 stream for pg0 -> pg1 interface.
97             - Create iACL with source and destination IP addresses.
98             - Send and verify received packets on pg1 interface.
99         """
100
101         # Basic iACL testing with source and destination IP
102         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
103         self.pg0.add_stream(pkts)
104
105         key = "ip"
106         self.create_classify_table(
107             key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
108         )
109         self.create_classify_session(
110             self.acl_tbl_idx.get(key),
111             self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4),
112         )
113         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
114         self.acl_active_table = key
115
116         self.pg_enable_capture(self.pg_interfaces)
117         self.pg_start()
118
119         pkts = self.pg1.get_capture(len(pkts))
120         self.verify_capture(self.pg1, pkts)
121         self.pg0.assert_nothing_captured(remark="packets forwarded")
122         self.pg2.assert_nothing_captured(remark="packets forwarded")
123         self.pg3.assert_nothing_captured(remark="packets forwarded")
124
125
126 class TestClassifierUDP(TestClassifier):
127     """Classifier UDP proto Test Case"""
128
129     @classmethod
130     def setUpClass(cls):
131         super(TestClassifierUDP, cls).setUpClass()
132
133     @classmethod
134     def tearDownClass(cls):
135         super(TestClassifierUDP, cls).tearDownClass()
136
137     def test_iacl_proto_udp(self):
138         """UDP protocol iACL test
139
140         Test scenario for basic protocol ACL with UDP protocol
141             - Create IPv4 stream for pg0 -> pg1 interface.
142             - Create iACL with UDP IP protocol.
143             - Send and verify received packets on pg1 interface.
144         """
145
146         # Basic iACL testing with UDP protocol
147         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
148         self.pg0.add_stream(pkts)
149
150         key = "proto_udp"
151         self.create_classify_table(key, self.build_ip_mask(proto="ff"))
152         self.create_classify_session(
153             self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP)
154         )
155         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
156         self.acl_active_table = key
157
158         self.pg_enable_capture(self.pg_interfaces)
159         self.pg_start()
160
161         pkts = self.pg1.get_capture(len(pkts))
162         self.verify_capture(self.pg1, pkts)
163         self.pg0.assert_nothing_captured(remark="packets forwarded")
164         self.pg2.assert_nothing_captured(remark="packets forwarded")
165         self.pg3.assert_nothing_captured(remark="packets forwarded")
166
167     def test_iacl_proto_udp_sport(self):
168         """UDP source port iACL test
169
170         Test scenario for basic protocol ACL with UDP and sport
171             - Create IPv4 stream for pg0 -> pg1 interface.
172             - Create iACL with UDP IP protocol and defined sport.
173             - Send and verify received packets on pg1 interface.
174         """
175
176         # Basic iACL testing with UDP and sport
177         sport = 38
178         pkts = self.create_stream(
179             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
180         )
181         self.pg0.add_stream(pkts)
182
183         key = "proto_udp_sport"
184         self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
185         self.create_classify_session(
186             self.acl_tbl_idx.get(key),
187             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
188         )
189         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
190         self.acl_active_table = key
191
192         self.pg_enable_capture(self.pg_interfaces)
193         self.pg_start()
194
195         pkts = self.pg1.get_capture(len(pkts))
196         self.verify_capture(self.pg1, pkts)
197         self.pg0.assert_nothing_captured(remark="packets forwarded")
198         self.pg2.assert_nothing_captured(remark="packets forwarded")
199         self.pg3.assert_nothing_captured(remark="packets forwarded")
200
201     def test_iacl_proto_udp_dport(self):
202         """UDP destination port iACL test
203
204         Test scenario for basic protocol ACL with UDP and dport
205             - Create IPv4 stream for pg0 -> pg1 interface.
206             - Create iACL with UDP IP protocol and defined dport.
207             - Send and verify received packets on pg1 interface.
208         """
209
210         # Basic iACL testing with UDP and dport
211         dport = 427
212         pkts = self.create_stream(
213             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
214         )
215         self.pg0.add_stream(pkts)
216
217         key = "proto_udp_dport"
218         self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
219         self.create_classify_session(
220             self.acl_tbl_idx.get(key),
221             self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
222         )
223         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
224         self.acl_active_table = key
225
226         self.pg_enable_capture(self.pg_interfaces)
227         self.pg_start()
228
229         pkts = self.pg1.get_capture(len(pkts))
230         self.verify_capture(self.pg1, pkts)
231         self.pg0.assert_nothing_captured(remark="packets forwarded")
232         self.pg2.assert_nothing_captured(remark="packets forwarded")
233         self.pg3.assert_nothing_captured(remark="packets forwarded")
234
235     def test_iacl_proto_udp_sport_dport(self):
236         """UDP source and destination ports iACL test
237
238         Test scenario for basic protocol ACL with UDP and sport and dport
239             - Create IPv4 stream for pg0 -> pg1 interface.
240             - Create iACL with UDP IP protocol and defined sport and dport.
241             - Send and verify received packets on pg1 interface.
242         """
243
244         # Basic iACL testing with UDP and sport and dport
245         sport = 13720
246         dport = 9080
247         pkts = self.create_stream(
248             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
249         )
250         self.pg0.add_stream(pkts)
251
252         key = "proto_udp_ports"
253         self.create_classify_table(
254             key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
255         )
256         self.create_classify_session(
257             self.acl_tbl_idx.get(key),
258             self.build_ip_match(
259                 proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
260             ),
261         )
262         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
263         self.acl_active_table = key
264
265         self.pg_enable_capture(self.pg_interfaces)
266         self.pg_start()
267
268         pkts = self.pg1.get_capture(len(pkts))
269         self.verify_capture(self.pg1, pkts)
270         self.pg0.assert_nothing_captured(remark="packets forwarded")
271         self.pg2.assert_nothing_captured(remark="packets forwarded")
272         self.pg3.assert_nothing_captured(remark="packets forwarded")
273
274
275 class TestClassifierTCP(TestClassifier):
276     """Classifier TCP proto Test Case"""
277
278     @classmethod
279     def setUpClass(cls):
280         super(TestClassifierTCP, cls).setUpClass()
281
282     @classmethod
283     def tearDownClass(cls):
284         super(TestClassifierTCP, cls).tearDownClass()
285
286     def test_iacl_proto_tcp(self):
287         """TCP protocol iACL test
288
289         Test scenario for basic protocol ACL with TCP protocol
290             - Create IPv4 stream for pg0 -> pg1 interface.
291             - Create iACL with TCP IP protocol.
292             - Send and verify received packets on pg1 interface.
293         """
294
295         # Basic iACL testing with TCP protocol
296         pkts = self.create_stream(
297             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
298         )
299         self.pg0.add_stream(pkts)
300
301         key = "proto_tcp"
302         self.create_classify_table(key, self.build_ip_mask(proto="ff"))
303         self.create_classify_session(
304             self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP)
305         )
306         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
307         self.acl_active_table = key
308
309         self.pg_enable_capture(self.pg_interfaces)
310         self.pg_start()
311
312         pkts = self.pg1.get_capture(len(pkts))
313         self.verify_capture(self.pg1, pkts, TCP)
314         self.pg0.assert_nothing_captured(remark="packets forwarded")
315         self.pg2.assert_nothing_captured(remark="packets forwarded")
316         self.pg3.assert_nothing_captured(remark="packets forwarded")
317
318     def test_iacl_proto_tcp_sport(self):
319         """TCP source port iACL test
320
321         Test scenario for basic protocol ACL with TCP and sport
322             - Create IPv4 stream for pg0 -> pg1 interface.
323             - Create iACL with TCP IP protocol and defined sport.
324             - Send and verify received packets on pg1 interface.
325         """
326
327         # Basic iACL testing with TCP and sport
328         sport = 38
329         pkts = self.create_stream(
330             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
331         )
332         self.pg0.add_stream(pkts)
333
334         key = "proto_tcp_sport"
335         self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
336         self.create_classify_session(
337             self.acl_tbl_idx.get(key),
338             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
339         )
340         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
341         self.acl_active_table = key
342
343         self.pg_enable_capture(self.pg_interfaces)
344         self.pg_start()
345
346         pkts = self.pg1.get_capture(len(pkts))
347         self.verify_capture(self.pg1, pkts, TCP)
348         self.pg0.assert_nothing_captured(remark="packets forwarded")
349         self.pg2.assert_nothing_captured(remark="packets forwarded")
350         self.pg3.assert_nothing_captured(remark="packets forwarded")
351
352     def test_iacl_proto_tcp_dport(self):
353         """TCP destination port iACL test
354
355         Test scenario for basic protocol ACL with TCP and dport
356             - Create IPv4 stream for pg0 -> pg1 interface.
357             - Create iACL with TCP IP protocol and defined dport.
358             - Send and verify received packets on pg1 interface.
359         """
360
361         # Basic iACL testing with TCP and dport
362         dport = 427
363         pkts = self.create_stream(
364             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
365         )
366         self.pg0.add_stream(pkts)
367
368         key = "proto_tcp_sport"
369         self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
370         self.create_classify_session(
371             self.acl_tbl_idx.get(key),
372             self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
373         )
374         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
375         self.acl_active_table = key
376
377         self.pg_enable_capture(self.pg_interfaces)
378         self.pg_start()
379
380         pkts = self.pg1.get_capture(len(pkts))
381         self.verify_capture(self.pg1, pkts, TCP)
382         self.pg0.assert_nothing_captured(remark="packets forwarded")
383         self.pg2.assert_nothing_captured(remark="packets forwarded")
384         self.pg3.assert_nothing_captured(remark="packets forwarded")
385
386     def test_iacl_proto_tcp_sport_dport(self):
387         """TCP source and destination ports iACL test
388
389         Test scenario for basic protocol ACL with TCP and sport and dport
390             - Create IPv4 stream for pg0 -> pg1 interface.
391             - Create iACL with TCP IP protocol and defined sport and dport.
392             - Send and verify received packets on pg1 interface.
393         """
394
395         # Basic iACL testing with TCP and sport and dport
396         sport = 13720
397         dport = 9080
398         pkts = self.create_stream(
399             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
400         )
401         self.pg0.add_stream(pkts)
402
403         key = "proto_tcp_ports"
404         self.create_classify_table(
405             key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
406         )
407         self.create_classify_session(
408             self.acl_tbl_idx.get(key),
409             self.build_ip_match(
410                 proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
411             ),
412         )
413         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
414         self.acl_active_table = key
415
416         self.pg_enable_capture(self.pg_interfaces)
417         self.pg_start()
418
419         pkts = self.pg1.get_capture(len(pkts))
420         self.verify_capture(self.pg1, pkts, TCP)
421         self.pg0.assert_nothing_captured(remark="packets forwarded")
422         self.pg2.assert_nothing_captured(remark="packets forwarded")
423         self.pg3.assert_nothing_captured(remark="packets forwarded")
424
425
426 class TestClassifierIPOut(TestClassifier):
427     """Classifier output IP Test Case"""
428
429     @classmethod
430     def setUpClass(cls):
431         super(TestClassifierIPOut, cls).setUpClass()
432
433     @classmethod
434     def tearDownClass(cls):
435         super(TestClassifierIPOut, cls).tearDownClass()
436
437     def test_acl_ip_out(self):
438         """Output IP ACL test
439
440         Test scenario for basic IP ACL with source IP
441             - Create IPv4 stream for pg1 -> pg0 interface.
442             - Create ACL with source IP address.
443             - Send and verify received packets on pg0 interface.
444         """
445
446         # Basic oACL testing with source IP
447         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
448         self.pg1.add_stream(pkts)
449
450         key = "ip_out"
451         self.create_classify_table(
452             key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0
453         )
454         self.create_classify_session(
455             self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4)
456         )
457         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
458         self.acl_active_table = key
459
460         self.pg_enable_capture(self.pg_interfaces)
461         self.pg_start()
462
463         pkts = self.pg0.get_capture(len(pkts))
464         self.verify_capture(self.pg0, pkts)
465         self.pg1.assert_nothing_captured(remark="packets forwarded")
466         self.pg2.assert_nothing_captured(remark="packets forwarded")
467         self.pg3.assert_nothing_captured(remark="packets forwarded")
468
469
470 class TestClassifierMAC(TestClassifier):
471     """Classifier MAC Test Case"""
472
473     @classmethod
474     def setUpClass(cls):
475         super(TestClassifierMAC, cls).setUpClass()
476
477     @classmethod
478     def tearDownClass(cls):
479         super(TestClassifierMAC, cls).tearDownClass()
480
481     def test_acl_mac(self):
482         """MAC ACL test
483
484         Test scenario for basic MAC ACL with source MAC
485             - Create IPv4 stream for pg0 -> pg2 interface.
486             - Create ACL with source MAC address.
487             - Send and verify received packets on pg2 interface.
488         """
489
490         # Basic iACL testing with source MAC
491         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
492         self.pg0.add_stream(pkts)
493
494         key = "mac"
495         self.create_classify_table(
496             key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
497         )
498         self.create_classify_session(
499             self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
500         )
501         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
502         self.acl_active_table = key
503
504         self.pg_enable_capture(self.pg_interfaces)
505         self.pg_start()
506
507         pkts = self.pg2.get_capture(len(pkts))
508         self.verify_capture(self.pg2, pkts)
509         self.pg0.assert_nothing_captured(remark="packets forwarded")
510         self.pg1.assert_nothing_captured(remark="packets forwarded")
511         self.pg3.assert_nothing_captured(remark="packets forwarded")
512
513
514 class TestClassifierComplex(TestClassifier):
515     """Large & Nested Classifiers Test Cases"""
516
517     @classmethod
518     def setUpClass(cls):
519         super(TestClassifierComplex, cls).setUpClass()
520
521     @classmethod
522     def tearDownClass(cls):
523         super(TestClassifierComplex, cls).tearDownClass()
524
525     def test_iacl_large(self):
526         """Large input ACL test
527
528         Test scenario for Large ACL matching on ethernet+ip+udp headers
529             - Create IPv4 stream for pg0 -> pg1 interface.
530             - Create large acl matching on ethernet+ip+udp header fields
531             - Send and verify received packets on pg1 interface.
532         """
533
534         # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
535         # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
536         msk = VarMask(offset=40, spec="ffff")
537         mth = VarMatch(offset=40, value=0x1234, length=2)
538
539         payload_msk = self.build_payload_mask([msk])
540         payload_match = self.build_payload_match([mth])
541
542         sport = 13720
543         dport = 9080
544
545         # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
546         packet_ex = bytes.fromhex(("0" * 36) + "1234")
547         pkts = self.create_stream(
548             self.pg0,
549             self.pg1,
550             self.pg_if_packet_sizes,
551             UDP(sport=sport, dport=dport),
552             packet_ex,
553         )
554         self.pg0.add_stream(pkts)
555
556         key = "large_in"
557         self.create_classify_table(
558             key,
559             self.build_mac_mask(
560                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
561             )
562             + self.build_ip_mask(
563                 proto="ff",
564                 src_ip="ffffffff",
565                 dst_ip="ffffffff",
566                 src_port="ffff",
567                 dst_port="ffff",
568             )
569             + payload_msk,
570             data_offset=-14,
571         )
572
573         self.create_classify_session(
574             self.acl_tbl_idx.get(key),
575             self.build_mac_match(
576                 src_mac=self.pg0.remote_mac,
577                 dst_mac=self.pg0.local_mac,
578                 # ipv4 next header
579                 ether_type="0800",
580             )
581             + self.build_ip_match(
582                 proto=socket.IPPROTO_UDP,
583                 src_ip=self.pg0.remote_ip4,
584                 dst_ip=self.pg1.remote_ip4,
585                 src_port=sport,
586                 dst_port=dport,
587             )
588             + payload_match,
589         )
590
591         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
592         self.acl_active_table = key
593
594         self.pg_enable_capture(self.pg_interfaces)
595         self.pg_start()
596
597         pkts = self.pg1.get_capture(len(pkts))
598         self.verify_capture(self.pg1, pkts)
599         self.pg0.assert_nothing_captured(remark="packets forwarded")
600         self.pg2.assert_nothing_captured(remark="packets forwarded")
601         self.pg3.assert_nothing_captured(remark="packets forwarded")
602
603     def test_oacl_large(self):
604         """Large output ACL test
605         Test scenario for Large ACL matching on ethernet+ip+udp headers
606             - Create IPv4 stream for pg1 -> pg0 interface.
607             - Create large acl matching on ethernet+ip+udp header fields
608             - Send and verify received packets on pg0 interface.
609         """
610
611         # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
612         # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
613         msk = VarMask(offset=40, spec="ffff")
614         mth = VarMatch(offset=40, value=0x1234, length=2)
615
616         payload_msk = self.build_payload_mask([msk])
617         payload_match = self.build_payload_match([mth])
618
619         sport = 13720
620         dport = 9080
621
622         # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
623         packet_ex = bytes.fromhex(("0" * 36) + "1234")
624         pkts = self.create_stream(
625             self.pg1,
626             self.pg0,
627             self.pg_if_packet_sizes,
628             UDP(sport=sport, dport=dport),
629             packet_ex,
630         )
631         self.pg1.add_stream(pkts)
632
633         key = "large_out"
634         self.create_classify_table(
635             key,
636             self.build_mac_mask(
637                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
638             )
639             + self.build_ip_mask(
640                 proto="ff",
641                 src_ip="ffffffff",
642                 dst_ip="ffffffff",
643                 src_port="ffff",
644                 dst_port="ffff",
645             )
646             + payload_msk,
647             data_offset=-14,
648         )
649
650         self.create_classify_session(
651             self.acl_tbl_idx.get(key),
652             self.build_mac_match(
653                 src_mac=self.pg0.local_mac,
654                 dst_mac=self.pg0.remote_mac,
655                 # ipv4 next header
656                 ether_type="0800",
657             )
658             + self.build_ip_match(
659                 proto=socket.IPPROTO_UDP,
660                 src_ip=self.pg1.remote_ip4,
661                 dst_ip=self.pg0.remote_ip4,
662                 src_port=sport,
663                 dst_port=dport,
664             )
665             + payload_match,
666         )
667
668         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
669         self.acl_active_table = key
670
671         self.pg_enable_capture(self.pg_interfaces)
672         self.pg_start()
673
674         pkts = self.pg0.get_capture(len(pkts))
675         self.verify_capture(self.pg0, pkts)
676         self.pg1.assert_nothing_captured(remark="packets forwarded")
677         self.pg2.assert_nothing_captured(remark="packets forwarded")
678         self.pg3.assert_nothing_captured(remark="packets forwarded")
679
680     def test_iacl_nested(self):
681         """Nested input ACL test
682
683         Test scenario for Large ACL matching on ethernet+ip+udp headers
684             - Create IPv4 stream for pg0 -> pg1 interface.
685             - Create 1st classifier table, without any entries
686             - Create nested acl matching on ethernet+ip+udp header fields
687             - Send and verify received packets on pg1 interface.
688         """
689
690         sport = 13720
691         dport = 9080
692         pkts = self.create_stream(
693             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
694         )
695
696         self.pg0.add_stream(pkts)
697
698         subtable_key = "subtable_in"
699         self.create_classify_table(
700             subtable_key,
701             self.build_mac_mask(
702                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
703             )
704             + self.build_ip_mask(
705                 proto="ff",
706                 src_ip="ffffffff",
707                 dst_ip="ffffffff",
708                 src_port="ffff",
709                 dst_port="ffff",
710             ),
711             data_offset=-14,
712         )
713
714         key = "nested_in"
715         self.create_classify_table(
716             key,
717             self.build_mac_mask(
718                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
719             )
720             + self.build_ip_mask(
721                 proto="ff",
722                 src_ip="ffffffff",
723                 dst_ip="ffffffff",
724                 src_port="ffff",
725                 dst_port="ffff",
726             ),
727             next_table_index=self.acl_tbl_idx.get(subtable_key),
728         )
729
730         self.create_classify_session(
731             self.acl_tbl_idx.get(subtable_key),
732             self.build_mac_match(
733                 src_mac=self.pg0.remote_mac,
734                 dst_mac=self.pg0.local_mac,
735                 # ipv4 next header
736                 ether_type="0800",
737             )
738             + self.build_ip_match(
739                 proto=socket.IPPROTO_UDP,
740                 src_ip=self.pg0.remote_ip4,
741                 dst_ip=self.pg1.remote_ip4,
742                 src_port=sport,
743                 dst_port=dport,
744             ),
745         )
746
747         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
748         self.acl_active_table = key
749
750         self.pg_enable_capture(self.pg_interfaces)
751         self.pg_start()
752
753         pkts = self.pg1.get_capture(len(pkts))
754         self.verify_capture(self.pg1, pkts)
755         self.pg0.assert_nothing_captured(remark="packets forwarded")
756         self.pg2.assert_nothing_captured(remark="packets forwarded")
757         self.pg3.assert_nothing_captured(remark="packets forwarded")
758
759     def test_oacl_nested(self):
760         """Nested output ACL test
761
762         Test scenario for Large ACL matching on ethernet+ip+udp headers
763             - Create IPv4 stream for pg1 -> pg0 interface.
764             - Create 1st classifier table, without any entries
765             - Create nested acl matching on ethernet+ip+udp header fields
766             - Send and verify received packets on pg0 interface.
767         """
768
769         sport = 13720
770         dport = 9080
771         pkts = self.create_stream(
772             self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
773         )
774         self.pg1.add_stream(pkts)
775
776         subtable_key = "subtable_out"
777         self.create_classify_table(
778             subtable_key,
779             self.build_mac_mask(
780                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
781             )
782             + self.build_ip_mask(
783                 proto="ff",
784                 src_ip="ffffffff",
785                 dst_ip="ffffffff",
786                 src_port="ffff",
787                 dst_port="ffff",
788             ),
789             data_offset=-14,
790         )
791
792         key = "nested_out"
793         self.create_classify_table(
794             key,
795             self.build_mac_mask(
796                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
797             )
798             + self.build_ip_mask(
799                 proto="ff",
800                 src_ip="ffffffff",
801                 dst_ip="ffffffff",
802                 src_port="ffff",
803                 dst_port="ffff",
804             ),
805             next_table_index=self.acl_tbl_idx.get(subtable_key),
806             data_offset=-14,
807         )
808
809         self.create_classify_session(
810             self.acl_tbl_idx.get(subtable_key),
811             self.build_mac_match(
812                 src_mac=self.pg0.local_mac,
813                 dst_mac=self.pg0.remote_mac,
814                 # ipv4 next header
815                 ether_type="0800",
816             )
817             + self.build_ip_match(
818                 proto=socket.IPPROTO_UDP,
819                 src_ip=self.pg1.remote_ip4,
820                 dst_ip=self.pg0.remote_ip4,
821                 src_port=sport,
822                 dst_port=dport,
823             ),
824         )
825
826         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
827         self.acl_active_table = key
828
829         self.pg_enable_capture(self.pg_interfaces)
830         self.pg_start()
831
832         pkts = self.pg0.get_capture(len(pkts))
833         self.verify_capture(self.pg0, pkts)
834         self.pg1.assert_nothing_captured(remark="packets forwarded")
835         self.pg2.assert_nothing_captured(remark="packets forwarded")
836         self.pg3.assert_nothing_captured(remark="packets forwarded")
837
838
839 class TestClassifierPBR(TestClassifier):
840     """Classifier PBR Test Case"""
841
842     @classmethod
843     def setUpClass(cls):
844         super(TestClassifierPBR, cls).setUpClass()
845
846     @classmethod
847     def tearDownClass(cls):
848         super(TestClassifierPBR, cls).tearDownClass()
849
850     def test_acl_pbr(self):
851         """IP PBR test
852
853         Test scenario for PBR with source IP
854             - Create IPv4 stream for pg0 -> pg3 interface.
855             - Configure PBR fib entry for packet forwarding.
856             - Send and verify received packets on pg3 interface.
857         """
858
859         # PBR testing with source IP
860         pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
861         self.pg0.add_stream(pkts)
862
863         key = "pbr"
864         self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
865         pbr_option = 1
866         # this will create the VRF/table in which we will insert the route
867         self.create_classify_session(
868             self.acl_tbl_idx.get(key),
869             self.build_ip_match(src_ip=self.pg0.remote_ip4),
870             pbr_option,
871             self.pbr_vrfid,
872         )
873         self.assertTrue(self.verify_vrf(self.pbr_vrfid))
874         r = VppIpRoute(
875             self,
876             self.pg3.local_ip4,
877             24,
878             [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)],
879             table_id=self.pbr_vrfid,
880         )
881         r.add_vpp_config()
882
883         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
884
885         self.pg_enable_capture(self.pg_interfaces)
886         self.pg_start()
887
888         pkts = self.pg3.get_capture(len(pkts))
889         self.verify_capture(self.pg3, pkts)
890         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
891         self.pg0.assert_nothing_captured(remark="packets forwarded")
892         self.pg1.assert_nothing_captured(remark="packets forwarded")
893         self.pg2.assert_nothing_captured(remark="packets forwarded")
894
895         # remove the classify session and the route
896         r.remove_vpp_config()
897         self.create_classify_session(
898             self.acl_tbl_idx.get(key),
899             self.build_ip_match(src_ip=self.pg0.remote_ip4),
900             pbr_option,
901             self.pbr_vrfid,
902             is_add=0,
903         )
904
905         # and the table should be gone.
906         self.assertFalse(self.verify_vrf(self.pbr_vrfid))
907
908
909 class TestClassifierPunt(TestClassifier):
910     """Classifier punt Test Case"""
911
912     @classmethod
913     def setUpClass(cls):
914         super(TestClassifierPunt, cls).setUpClass()
915
916     @classmethod
917     def tearDownClass(cls):
918         super(TestClassifierPunt, cls).tearDownClass()
919
920     def test_punt_udp(self):
921         """IPv4/UDP protocol punt ACL test
922
923         Test scenario for basic punt ACL with UDP protocol
924             - Create IPv4 stream for pg0 -> pg1 interface.
925             - Create punt ACL with UDP IP protocol.
926             - Send and verify received packets on pg1 interface.
927         """
928
929         sport = 6754
930         dport = 17923
931
932         key = "ip4_udp_punt"
933         self.create_classify_table(
934             key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
935         )
936         table_index = self.acl_tbl_idx.get(key)
937         self.vapi.punt_acl_add_del(ip4_table_index=table_index)
938         self.acl_active_table = key
939
940         # punt udp packets to dport received on pg0 through pg1
941         self.vapi.set_punt(
942             is_add=1,
943             punt={
944                 "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
945                 "punt": {
946                     "l4": {
947                         "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
948                         "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
949                         "port": dport,
950                     }
951                 },
952             },
953         )
954         self.vapi.ip_punt_redirect(
955             punt={
956                 "rx_sw_if_index": self.pg0.sw_if_index,
957                 "tx_sw_if_index": self.pg1.sw_if_index,
958                 "nh": self.pg1.remote_ip4,
959             }
960         )
961
962         pkts = [
963             (
964                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
965                 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
966                 / UDP(sport=sport, dport=dport)
967                 / Raw("\x17" * 100)
968             )
969         ] * 2
970
971         # allow a session but not matching the stream: expect to drop
972         self.create_classify_session(
973             table_index,
974             self.build_ip_match(
975                 src_ip=self.pg0.remote_ip4,
976                 proto=socket.IPPROTO_UDP,
977                 src_port=sport + 10,
978             ),
979         )
980         self.send_and_assert_no_replies(self.pg0, pkts)
981
982         # allow a session matching the stream: expect to pass
983         self.create_classify_session(
984             table_index,
985             self.build_ip_match(
986                 src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
987             ),
988         )
989         self.send_and_expect_only(self.pg0, pkts, self.pg1)
990
991         # test dump api: ip4 is set, ip6 is not
992         r = self.vapi.punt_acl_get()
993         self.assertEqual(r.ip4_table_index, table_index)
994         self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
995
996         # cleanup
997         self.acl_active_table = ""
998         self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
999
1000         # test dump api: nothing set
1001         r = self.vapi.punt_acl_get()
1002         self.assertEqual(r.ip4_table_index, 0xFFFFFFFF)
1003         self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
1004
1005
1006 if __name__ == "__main__":
1007     unittest.main(testRunner=VppTestRunner)