vlib: prevent some signals from being executed on workers
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5
6 from asfframework import VppTestRunner
7 from scapy.packet import Raw
8
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from template_classifier import TestClassifier, VarMask, VarMatch
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_ip import INVALID_INDEX
14 from vpp_papi import VppEnum
15
16
17 # Tests split to different test case classes because of issue reported in
18 # ticket VPP-1336
19 class TestClassifierIP(TestClassifier):
20     """Classifier IP Test Case"""
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestClassifierIP, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestClassifierIP, cls).tearDownClass()
29
30     def test_iacl_src_ip(self):
31         """Source IP iACL test
32
33         Test scenario for basic IP ACL with source IP
34             - Create IPv4 stream for pg0 -> pg1 interface.
35             - Create iACL with source IP address.
36             - Send and verify received packets on pg1 interface.
37         """
38
39         # Basic iACL testing with source IP
40         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
41         self.pg0.add_stream(pkts)
42
43         key = "ip_src"
44         self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
45         self.create_classify_session(
46             self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4)
47         )
48         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
49         self.acl_active_table = key
50
51         self.pg_enable_capture(self.pg_interfaces)
52         self.pg_start()
53
54         pkts = self.pg1.get_capture(len(pkts))
55         self.verify_capture(self.pg1, pkts)
56         self.pg0.assert_nothing_captured(remark="packets forwarded")
57         self.pg2.assert_nothing_captured(remark="packets forwarded")
58         self.pg3.assert_nothing_captured(remark="packets forwarded")
59
60     def test_iacl_dst_ip(self):
61         """Destination IP iACL test
62
63         Test scenario for basic IP ACL with destination IP
64             - Create IPv4 stream for pg0 -> pg1 interface.
65             - Create iACL with destination IP address.
66             - Send and verify received packets on pg1 interface.
67         """
68
69         # Basic iACL testing with destination IP
70         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
71         self.pg0.add_stream(pkts)
72
73         key = "ip_dst"
74         self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff"))
75         self.create_classify_session(
76             self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4)
77         )
78         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
79         self.acl_active_table = key
80
81         self.pg_enable_capture(self.pg_interfaces)
82         self.pg_start()
83
84         pkts = self.pg1.get_capture(len(pkts))
85         self.verify_capture(self.pg1, pkts)
86         self.pg0.assert_nothing_captured(remark="packets forwarded")
87         self.pg2.assert_nothing_captured(remark="packets forwarded")
88         self.pg3.assert_nothing_captured(remark="packets forwarded")
89
90     def test_iacl_src_dst_ip(self):
91         """Source and destination IP iACL test
92
93         Test scenario for basic IP ACL with source and destination IP
94             - Create IPv4 stream for pg0 -> pg1 interface.
95             - Create iACL with source and destination IP addresses.
96             - Send and verify received packets on pg1 interface.
97         """
98
99         # Basic iACL testing with source and destination IP
100         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
101         self.pg0.add_stream(pkts)
102
103         key = "ip"
104         self.create_classify_table(
105             key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
106         )
107         self.create_classify_session(
108             self.acl_tbl_idx.get(key),
109             self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4),
110         )
111         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
112         self.acl_active_table = key
113
114         self.pg_enable_capture(self.pg_interfaces)
115         self.pg_start()
116
117         pkts = self.pg1.get_capture(len(pkts))
118         self.verify_capture(self.pg1, pkts)
119         self.pg0.assert_nothing_captured(remark="packets forwarded")
120         self.pg2.assert_nothing_captured(remark="packets forwarded")
121         self.pg3.assert_nothing_captured(remark="packets forwarded")
122
123
124 class TestClassifierUDP(TestClassifier):
125     """Classifier UDP proto Test Case"""
126
127     @classmethod
128     def setUpClass(cls):
129         super(TestClassifierUDP, cls).setUpClass()
130
131     @classmethod
132     def tearDownClass(cls):
133         super(TestClassifierUDP, cls).tearDownClass()
134
135     def test_iacl_proto_udp(self):
136         """UDP protocol iACL test
137
138         Test scenario for basic protocol ACL with UDP protocol
139             - Create IPv4 stream for pg0 -> pg1 interface.
140             - Create iACL with UDP IP protocol.
141             - Send and verify received packets on pg1 interface.
142         """
143
144         # Basic iACL testing with UDP protocol
145         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
146         self.pg0.add_stream(pkts)
147
148         key = "proto_udp"
149         self.create_classify_table(key, self.build_ip_mask(proto="ff"))
150         self.create_classify_session(
151             self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP)
152         )
153         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
154         self.acl_active_table = key
155
156         self.pg_enable_capture(self.pg_interfaces)
157         self.pg_start()
158
159         pkts = self.pg1.get_capture(len(pkts))
160         self.verify_capture(self.pg1, pkts)
161         self.pg0.assert_nothing_captured(remark="packets forwarded")
162         self.pg2.assert_nothing_captured(remark="packets forwarded")
163         self.pg3.assert_nothing_captured(remark="packets forwarded")
164
165     def test_iacl_proto_udp_sport(self):
166         """UDP source port iACL test
167
168         Test scenario for basic protocol ACL with UDP and sport
169             - Create IPv4 stream for pg0 -> pg1 interface.
170             - Create iACL with UDP IP protocol and defined sport.
171             - Send and verify received packets on pg1 interface.
172         """
173
174         # Basic iACL testing with UDP and sport
175         sport = 38
176         pkts = self.create_stream(
177             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
178         )
179         self.pg0.add_stream(pkts)
180
181         key = "proto_udp_sport"
182         self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
183         self.create_classify_session(
184             self.acl_tbl_idx.get(key),
185             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
186         )
187         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
188         self.acl_active_table = key
189
190         self.pg_enable_capture(self.pg_interfaces)
191         self.pg_start()
192
193         pkts = self.pg1.get_capture(len(pkts))
194         self.verify_capture(self.pg1, pkts)
195         self.pg0.assert_nothing_captured(remark="packets forwarded")
196         self.pg2.assert_nothing_captured(remark="packets forwarded")
197         self.pg3.assert_nothing_captured(remark="packets forwarded")
198
199     def test_iacl_proto_udp_dport(self):
200         """UDP destination port iACL test
201
202         Test scenario for basic protocol ACL with UDP and dport
203             - Create IPv4 stream for pg0 -> pg1 interface.
204             - Create iACL with UDP IP protocol and defined dport.
205             - Send and verify received packets on pg1 interface.
206         """
207
208         # Basic iACL testing with UDP and dport
209         dport = 427
210         pkts = self.create_stream(
211             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
212         )
213         self.pg0.add_stream(pkts)
214
215         key = "proto_udp_dport"
216         self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
217         self.create_classify_session(
218             self.acl_tbl_idx.get(key),
219             self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
220         )
221         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
222         self.acl_active_table = key
223
224         self.pg_enable_capture(self.pg_interfaces)
225         self.pg_start()
226
227         pkts = self.pg1.get_capture(len(pkts))
228         self.verify_capture(self.pg1, pkts)
229         self.pg0.assert_nothing_captured(remark="packets forwarded")
230         self.pg2.assert_nothing_captured(remark="packets forwarded")
231         self.pg3.assert_nothing_captured(remark="packets forwarded")
232
233     def test_iacl_proto_udp_sport_dport(self):
234         """UDP source and destination ports iACL test
235
236         Test scenario for basic protocol ACL with UDP and sport and dport
237             - Create IPv4 stream for pg0 -> pg1 interface.
238             - Create iACL with UDP IP protocol and defined sport and dport.
239             - Send and verify received packets on pg1 interface.
240         """
241
242         # Basic iACL testing with UDP and sport and dport
243         sport = 13720
244         dport = 9080
245         pkts = self.create_stream(
246             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
247         )
248         self.pg0.add_stream(pkts)
249
250         key = "proto_udp_ports"
251         self.create_classify_table(
252             key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
253         )
254         self.create_classify_session(
255             self.acl_tbl_idx.get(key),
256             self.build_ip_match(
257                 proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
258             ),
259         )
260         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
261         self.acl_active_table = key
262
263         self.pg_enable_capture(self.pg_interfaces)
264         self.pg_start()
265
266         pkts = self.pg1.get_capture(len(pkts))
267         self.verify_capture(self.pg1, pkts)
268         self.pg0.assert_nothing_captured(remark="packets forwarded")
269         self.pg2.assert_nothing_captured(remark="packets forwarded")
270         self.pg3.assert_nothing_captured(remark="packets forwarded")
271
272
273 class TestClassifierTCP(TestClassifier):
274     """Classifier TCP proto Test Case"""
275
276     @classmethod
277     def setUpClass(cls):
278         super(TestClassifierTCP, cls).setUpClass()
279
280     @classmethod
281     def tearDownClass(cls):
282         super(TestClassifierTCP, cls).tearDownClass()
283
284     def test_iacl_proto_tcp(self):
285         """TCP protocol iACL test
286
287         Test scenario for basic protocol ACL with TCP protocol
288             - Create IPv4 stream for pg0 -> pg1 interface.
289             - Create iACL with TCP IP protocol.
290             - Send and verify received packets on pg1 interface.
291         """
292
293         # Basic iACL testing with TCP protocol
294         pkts = self.create_stream(
295             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
296         )
297         self.pg0.add_stream(pkts)
298
299         key = "proto_tcp"
300         self.create_classify_table(key, self.build_ip_mask(proto="ff"))
301         self.create_classify_session(
302             self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP)
303         )
304         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
305         self.acl_active_table = key
306
307         self.pg_enable_capture(self.pg_interfaces)
308         self.pg_start()
309
310         pkts = self.pg1.get_capture(len(pkts))
311         self.verify_capture(self.pg1, pkts, TCP)
312         self.pg0.assert_nothing_captured(remark="packets forwarded")
313         self.pg2.assert_nothing_captured(remark="packets forwarded")
314         self.pg3.assert_nothing_captured(remark="packets forwarded")
315
316     def test_iacl_proto_tcp_sport(self):
317         """TCP source port iACL test
318
319         Test scenario for basic protocol ACL with TCP and sport
320             - Create IPv4 stream for pg0 -> pg1 interface.
321             - Create iACL with TCP IP protocol and defined sport.
322             - Send and verify received packets on pg1 interface.
323         """
324
325         # Basic iACL testing with TCP and sport
326         sport = 38
327         pkts = self.create_stream(
328             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
329         )
330         self.pg0.add_stream(pkts)
331
332         key = "proto_tcp_sport"
333         self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
334         self.create_classify_session(
335             self.acl_tbl_idx.get(key),
336             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
337         )
338         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
339         self.acl_active_table = key
340
341         self.pg_enable_capture(self.pg_interfaces)
342         self.pg_start()
343
344         pkts = self.pg1.get_capture(len(pkts))
345         self.verify_capture(self.pg1, pkts, TCP)
346         self.pg0.assert_nothing_captured(remark="packets forwarded")
347         self.pg2.assert_nothing_captured(remark="packets forwarded")
348         self.pg3.assert_nothing_captured(remark="packets forwarded")
349
350     def test_iacl_proto_tcp_dport(self):
351         """TCP destination port iACL test
352
353         Test scenario for basic protocol ACL with TCP and dport
354             - Create IPv4 stream for pg0 -> pg1 interface.
355             - Create iACL with TCP IP protocol and defined dport.
356             - Send and verify received packets on pg1 interface.
357         """
358
359         # Basic iACL testing with TCP and dport
360         dport = 427
361         pkts = self.create_stream(
362             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
363         )
364         self.pg0.add_stream(pkts)
365
366         key = "proto_tcp_sport"
367         self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
368         self.create_classify_session(
369             self.acl_tbl_idx.get(key),
370             self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
371         )
372         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
373         self.acl_active_table = key
374
375         self.pg_enable_capture(self.pg_interfaces)
376         self.pg_start()
377
378         pkts = self.pg1.get_capture(len(pkts))
379         self.verify_capture(self.pg1, pkts, TCP)
380         self.pg0.assert_nothing_captured(remark="packets forwarded")
381         self.pg2.assert_nothing_captured(remark="packets forwarded")
382         self.pg3.assert_nothing_captured(remark="packets forwarded")
383
384     def test_iacl_proto_tcp_sport_dport(self):
385         """TCP source and destination ports iACL test
386
387         Test scenario for basic protocol ACL with TCP and sport and dport
388             - Create IPv4 stream for pg0 -> pg1 interface.
389             - Create iACL with TCP IP protocol and defined sport and dport.
390             - Send and verify received packets on pg1 interface.
391         """
392
393         # Basic iACL testing with TCP and sport and dport
394         sport = 13720
395         dport = 9080
396         pkts = self.create_stream(
397             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
398         )
399         self.pg0.add_stream(pkts)
400
401         key = "proto_tcp_ports"
402         self.create_classify_table(
403             key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
404         )
405         self.create_classify_session(
406             self.acl_tbl_idx.get(key),
407             self.build_ip_match(
408                 proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
409             ),
410         )
411         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
412         self.acl_active_table = key
413
414         self.pg_enable_capture(self.pg_interfaces)
415         self.pg_start()
416
417         pkts = self.pg1.get_capture(len(pkts))
418         self.verify_capture(self.pg1, pkts, TCP)
419         self.pg0.assert_nothing_captured(remark="packets forwarded")
420         self.pg2.assert_nothing_captured(remark="packets forwarded")
421         self.pg3.assert_nothing_captured(remark="packets forwarded")
422
423
424 class TestClassifierIPOut(TestClassifier):
425     """Classifier output IP Test Case"""
426
427     @classmethod
428     def setUpClass(cls):
429         super(TestClassifierIPOut, cls).setUpClass()
430
431     @classmethod
432     def tearDownClass(cls):
433         super(TestClassifierIPOut, cls).tearDownClass()
434
435     def test_acl_ip_out(self):
436         """Output IP ACL test
437
438         Test scenario for basic IP ACL with source IP
439             - Create IPv4 stream for pg1 -> pg0 interface.
440             - Create ACL with source IP address.
441             - Send and verify received packets on pg0 interface.
442         """
443
444         # Basic oACL testing with source IP
445         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
446         self.pg1.add_stream(pkts)
447
448         key = "ip_out"
449         self.create_classify_table(
450             key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0
451         )
452         self.create_classify_session(
453             self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4)
454         )
455         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
456         self.acl_active_table = key
457
458         self.pg_enable_capture(self.pg_interfaces)
459         self.pg_start()
460
461         pkts = self.pg0.get_capture(len(pkts))
462         self.verify_capture(self.pg0, pkts)
463         self.pg1.assert_nothing_captured(remark="packets forwarded")
464         self.pg2.assert_nothing_captured(remark="packets forwarded")
465         self.pg3.assert_nothing_captured(remark="packets forwarded")
466
467
468 class TestClassifierMAC(TestClassifier):
469     """Classifier MAC Test Case"""
470
471     @classmethod
472     def setUpClass(cls):
473         super(TestClassifierMAC, cls).setUpClass()
474
475     @classmethod
476     def tearDownClass(cls):
477         super(TestClassifierMAC, cls).tearDownClass()
478
479     def test_acl_mac(self):
480         """MAC ACL test
481
482         Test scenario for basic MAC ACL with source MAC
483             - Create IPv4 stream for pg0 -> pg2 interface.
484             - Create ACL with source MAC address.
485             - Send and verify received packets on pg2 interface.
486         """
487
488         # Basic iACL testing with source MAC
489         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
490         self.pg0.add_stream(pkts)
491
492         key = "mac"
493         self.create_classify_table(
494             key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
495         )
496         self.create_classify_session(
497             self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
498         )
499         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
500         self.acl_active_table = key
501
502         self.pg_enable_capture(self.pg_interfaces)
503         self.pg_start()
504
505         pkts = self.pg2.get_capture(len(pkts))
506         self.verify_capture(self.pg2, pkts)
507         self.pg0.assert_nothing_captured(remark="packets forwarded")
508         self.pg1.assert_nothing_captured(remark="packets forwarded")
509         self.pg3.assert_nothing_captured(remark="packets forwarded")
510
511
512 class TestClassifierComplex(TestClassifier):
513     """Large & Nested Classifiers Test Cases"""
514
515     @classmethod
516     def setUpClass(cls):
517         super(TestClassifierComplex, cls).setUpClass()
518
519     @classmethod
520     def tearDownClass(cls):
521         super(TestClassifierComplex, cls).tearDownClass()
522
523     def test_iacl_large(self):
524         """Large input ACL test
525
526         Test scenario for Large ACL matching on ethernet+ip+udp headers
527             - Create IPv4 stream for pg0 -> pg1 interface.
528             - Create large acl matching on ethernet+ip+udp header fields
529             - Send and verify received packets on pg1 interface.
530         """
531
532         # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
533         # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
534         msk = VarMask(offset=40, spec="ffff")
535         mth = VarMatch(offset=40, value=0x1234, length=2)
536
537         payload_msk = self.build_payload_mask([msk])
538         payload_match = self.build_payload_match([mth])
539
540         sport = 13720
541         dport = 9080
542
543         # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
544         packet_ex = bytes.fromhex(("0" * 36) + "1234")
545         pkts = self.create_stream(
546             self.pg0,
547             self.pg1,
548             self.pg_if_packet_sizes,
549             UDP(sport=sport, dport=dport),
550             packet_ex,
551         )
552         self.pg0.add_stream(pkts)
553
554         key = "large_in"
555         self.create_classify_table(
556             key,
557             self.build_mac_mask(
558                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
559             )
560             + self.build_ip_mask(
561                 proto="ff",
562                 src_ip="ffffffff",
563                 dst_ip="ffffffff",
564                 src_port="ffff",
565                 dst_port="ffff",
566             )
567             + payload_msk,
568             data_offset=-14,
569         )
570
571         self.create_classify_session(
572             self.acl_tbl_idx.get(key),
573             self.build_mac_match(
574                 src_mac=self.pg0.remote_mac,
575                 dst_mac=self.pg0.local_mac,
576                 # ipv4 next header
577                 ether_type="0800",
578             )
579             + self.build_ip_match(
580                 proto=socket.IPPROTO_UDP,
581                 src_ip=self.pg0.remote_ip4,
582                 dst_ip=self.pg1.remote_ip4,
583                 src_port=sport,
584                 dst_port=dport,
585             )
586             + payload_match,
587         )
588
589         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
590         self.acl_active_table = key
591
592         self.pg_enable_capture(self.pg_interfaces)
593         self.pg_start()
594
595         pkts = self.pg1.get_capture(len(pkts))
596         self.verify_capture(self.pg1, pkts)
597         self.pg0.assert_nothing_captured(remark="packets forwarded")
598         self.pg2.assert_nothing_captured(remark="packets forwarded")
599         self.pg3.assert_nothing_captured(remark="packets forwarded")
600
601     def test_oacl_large(self):
602         """Large output ACL test
603         Test scenario for Large ACL matching on ethernet+ip+udp headers
604             - Create IPv4 stream for pg1 -> pg0 interface.
605             - Create large acl matching on ethernet+ip+udp header fields
606             - Send and verify received packets on pg0 interface.
607         """
608
609         # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
610         # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
611         msk = VarMask(offset=40, spec="ffff")
612         mth = VarMatch(offset=40, value=0x1234, length=2)
613
614         payload_msk = self.build_payload_mask([msk])
615         payload_match = self.build_payload_match([mth])
616
617         sport = 13720
618         dport = 9080
619
620         # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
621         packet_ex = bytes.fromhex(("0" * 36) + "1234")
622         pkts = self.create_stream(
623             self.pg1,
624             self.pg0,
625             self.pg_if_packet_sizes,
626             UDP(sport=sport, dport=dport),
627             packet_ex,
628         )
629         self.pg1.add_stream(pkts)
630
631         key = "large_out"
632         self.create_classify_table(
633             key,
634             self.build_mac_mask(
635                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
636             )
637             + self.build_ip_mask(
638                 proto="ff",
639                 src_ip="ffffffff",
640                 dst_ip="ffffffff",
641                 src_port="ffff",
642                 dst_port="ffff",
643             )
644             + payload_msk,
645             data_offset=-14,
646         )
647
648         self.create_classify_session(
649             self.acl_tbl_idx.get(key),
650             self.build_mac_match(
651                 src_mac=self.pg0.local_mac,
652                 dst_mac=self.pg0.remote_mac,
653                 # ipv4 next header
654                 ether_type="0800",
655             )
656             + self.build_ip_match(
657                 proto=socket.IPPROTO_UDP,
658                 src_ip=self.pg1.remote_ip4,
659                 dst_ip=self.pg0.remote_ip4,
660                 src_port=sport,
661                 dst_port=dport,
662             )
663             + payload_match,
664         )
665
666         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
667         self.acl_active_table = key
668
669         self.pg_enable_capture(self.pg_interfaces)
670         self.pg_start()
671
672         pkts = self.pg0.get_capture(len(pkts))
673         self.verify_capture(self.pg0, pkts)
674         self.pg1.assert_nothing_captured(remark="packets forwarded")
675         self.pg2.assert_nothing_captured(remark="packets forwarded")
676         self.pg3.assert_nothing_captured(remark="packets forwarded")
677
678     def test_iacl_nested(self):
679         """Nested input ACL test
680
681         Test scenario for Large ACL matching on ethernet+ip+udp headers
682             - Create IPv4 stream for pg0 -> pg1 interface.
683             - Create 1st classifier table, without any entries
684             - Create nested acl matching on ethernet+ip+udp header fields
685             - Send and verify received packets on pg1 interface.
686         """
687
688         sport = 13720
689         dport = 9080
690         pkts = self.create_stream(
691             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
692         )
693
694         self.pg0.add_stream(pkts)
695
696         subtable_key = "subtable_in"
697         self.create_classify_table(
698             subtable_key,
699             self.build_mac_mask(
700                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
701             )
702             + self.build_ip_mask(
703                 proto="ff",
704                 src_ip="ffffffff",
705                 dst_ip="ffffffff",
706                 src_port="ffff",
707                 dst_port="ffff",
708             ),
709             data_offset=-14,
710         )
711
712         key = "nested_in"
713         self.create_classify_table(
714             key,
715             self.build_mac_mask(
716                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
717             )
718             + self.build_ip_mask(
719                 proto="ff",
720                 src_ip="ffffffff",
721                 dst_ip="ffffffff",
722                 src_port="ffff",
723                 dst_port="ffff",
724             ),
725             next_table_index=self.acl_tbl_idx.get(subtable_key),
726         )
727
728         self.create_classify_session(
729             self.acl_tbl_idx.get(subtable_key),
730             self.build_mac_match(
731                 src_mac=self.pg0.remote_mac,
732                 dst_mac=self.pg0.local_mac,
733                 # ipv4 next header
734                 ether_type="0800",
735             )
736             + self.build_ip_match(
737                 proto=socket.IPPROTO_UDP,
738                 src_ip=self.pg0.remote_ip4,
739                 dst_ip=self.pg1.remote_ip4,
740                 src_port=sport,
741                 dst_port=dport,
742             ),
743         )
744
745         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
746         self.acl_active_table = key
747
748         self.pg_enable_capture(self.pg_interfaces)
749         self.pg_start()
750
751         pkts = self.pg1.get_capture(len(pkts))
752         self.verify_capture(self.pg1, pkts)
753         self.pg0.assert_nothing_captured(remark="packets forwarded")
754         self.pg2.assert_nothing_captured(remark="packets forwarded")
755         self.pg3.assert_nothing_captured(remark="packets forwarded")
756
757     def test_oacl_nested(self):
758         """Nested output ACL test
759
760         Test scenario for Large ACL matching on ethernet+ip+udp headers
761             - Create IPv4 stream for pg1 -> pg0 interface.
762             - Create 1st classifier table, without any entries
763             - Create nested acl matching on ethernet+ip+udp header fields
764             - Send and verify received packets on pg0 interface.
765         """
766
767         sport = 13720
768         dport = 9080
769         pkts = self.create_stream(
770             self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
771         )
772         self.pg1.add_stream(pkts)
773
774         subtable_key = "subtable_out"
775         self.create_classify_table(
776             subtable_key,
777             self.build_mac_mask(
778                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
779             )
780             + self.build_ip_mask(
781                 proto="ff",
782                 src_ip="ffffffff",
783                 dst_ip="ffffffff",
784                 src_port="ffff",
785                 dst_port="ffff",
786             ),
787             data_offset=-14,
788         )
789
790         key = "nested_out"
791         self.create_classify_table(
792             key,
793             self.build_mac_mask(
794                 src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
795             )
796             + self.build_ip_mask(
797                 proto="ff",
798                 src_ip="ffffffff",
799                 dst_ip="ffffffff",
800                 src_port="ffff",
801                 dst_port="ffff",
802             ),
803             next_table_index=self.acl_tbl_idx.get(subtable_key),
804             data_offset=-14,
805         )
806
807         self.create_classify_session(
808             self.acl_tbl_idx.get(subtable_key),
809             self.build_mac_match(
810                 src_mac=self.pg0.local_mac,
811                 dst_mac=self.pg0.remote_mac,
812                 # ipv4 next header
813                 ether_type="0800",
814             )
815             + self.build_ip_match(
816                 proto=socket.IPPROTO_UDP,
817                 src_ip=self.pg1.remote_ip4,
818                 dst_ip=self.pg0.remote_ip4,
819                 src_port=sport,
820                 dst_port=dport,
821             ),
822         )
823
824         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
825         self.acl_active_table = key
826
827         self.pg_enable_capture(self.pg_interfaces)
828         self.pg_start()
829
830         pkts = self.pg0.get_capture(len(pkts))
831         self.verify_capture(self.pg0, pkts)
832         self.pg1.assert_nothing_captured(remark="packets forwarded")
833         self.pg2.assert_nothing_captured(remark="packets forwarded")
834         self.pg3.assert_nothing_captured(remark="packets forwarded")
835
836
837 class TestClassifierPBR(TestClassifier):
838     """Classifier PBR Test Case"""
839
840     @classmethod
841     def setUpClass(cls):
842         super(TestClassifierPBR, cls).setUpClass()
843
844     @classmethod
845     def tearDownClass(cls):
846         super(TestClassifierPBR, cls).tearDownClass()
847
848     def test_acl_pbr(self):
849         """IP PBR test
850
851         Test scenario for PBR with source IP
852             - Create IPv4 stream for pg0 -> pg3 interface.
853             - Configure PBR fib entry for packet forwarding.
854             - Send and verify received packets on pg3 interface.
855         """
856
857         # PBR testing with source IP
858         pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
859         self.pg0.add_stream(pkts)
860
861         key = "pbr"
862         self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
863         pbr_option = 1
864         # this will create the VRF/table in which we will insert the route
865         self.create_classify_session(
866             self.acl_tbl_idx.get(key),
867             self.build_ip_match(src_ip=self.pg0.remote_ip4),
868             pbr_option,
869             self.pbr_vrfid,
870         )
871         self.assertTrue(self.verify_vrf(self.pbr_vrfid))
872         r = VppIpRoute(
873             self,
874             self.pg3.local_ip4,
875             24,
876             [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)],
877             table_id=self.pbr_vrfid,
878         )
879         r.add_vpp_config()
880
881         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
882
883         self.pg_enable_capture(self.pg_interfaces)
884         self.pg_start()
885
886         pkts = self.pg3.get_capture(len(pkts))
887         self.verify_capture(self.pg3, pkts)
888         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
889         self.pg0.assert_nothing_captured(remark="packets forwarded")
890         self.pg1.assert_nothing_captured(remark="packets forwarded")
891         self.pg2.assert_nothing_captured(remark="packets forwarded")
892
893         # remove the classify session and the route
894         r.remove_vpp_config()
895         self.create_classify_session(
896             self.acl_tbl_idx.get(key),
897             self.build_ip_match(src_ip=self.pg0.remote_ip4),
898             pbr_option,
899             self.pbr_vrfid,
900             is_add=0,
901         )
902
903         # and the table should be gone.
904         self.assertFalse(self.verify_vrf(self.pbr_vrfid))
905
906
907 class TestClassifierPunt(TestClassifier):
908     """Classifier punt Test Case"""
909
910     @classmethod
911     def setUpClass(cls):
912         super(TestClassifierPunt, cls).setUpClass()
913
914     @classmethod
915     def tearDownClass(cls):
916         super(TestClassifierPunt, cls).tearDownClass()
917
918     def test_punt_udp(self):
919         """IPv4/UDP protocol punt ACL test
920
921         Test scenario for basic punt ACL with UDP protocol
922             - Create IPv4 stream for pg0 -> pg1 interface.
923             - Create punt ACL with UDP IP protocol.
924             - Send and verify received packets on pg1 interface.
925         """
926
927         sport = 6754
928         dport = 17923
929
930         key = "ip4_udp_punt"
931         self.create_classify_table(
932             key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
933         )
934         table_index = self.acl_tbl_idx.get(key)
935         self.vapi.punt_acl_add_del(ip4_table_index=table_index)
936         self.acl_active_table = key
937
938         # punt udp packets to dport received on pg0 through pg1
939         self.vapi.set_punt(
940             is_add=1,
941             punt={
942                 "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
943                 "punt": {
944                     "l4": {
945                         "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
946                         "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
947                         "port": dport,
948                     }
949                 },
950             },
951         )
952         self.vapi.ip_punt_redirect(
953             punt={
954                 "rx_sw_if_index": self.pg0.sw_if_index,
955                 "tx_sw_if_index": self.pg1.sw_if_index,
956                 "nh": self.pg1.remote_ip4,
957             }
958         )
959
960         pkts = [
961             (
962                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
963                 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
964                 / UDP(sport=sport, dport=dport)
965                 / Raw("\x17" * 100)
966             )
967         ] * 2
968
969         # allow a session but not matching the stream: expect to drop
970         self.create_classify_session(
971             table_index,
972             self.build_ip_match(
973                 src_ip=self.pg0.remote_ip4,
974                 proto=socket.IPPROTO_UDP,
975                 src_port=sport + 10,
976             ),
977         )
978         self.send_and_assert_no_replies(self.pg0, pkts)
979
980         # allow a session matching the stream: expect to pass
981         self.create_classify_session(
982             table_index,
983             self.build_ip_match(
984                 src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
985             ),
986         )
987         self.send_and_expect_only(self.pg0, pkts, self.pg1)
988
989         # test dump api: ip4 is set, ip6 is not
990         r = self.vapi.punt_acl_get()
991         self.assertEqual(r.ip4_table_index, table_index)
992         self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
993
994         # cleanup
995         self.acl_active_table = ""
996         self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
997
998         # test dump api: nothing set
999         r = self.vapi.punt_acl_get()
1000         self.assertEqual(r.ip4_table_index, 0xFFFFFFFF)
1001         self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
1002
1003
1004 if __name__ == "__main__":
1005     unittest.main(testRunner=VppTestRunner)