classify: add API to retrieve punt ACL tables
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
9
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16 from vpp_papi import VppEnum
17
18
19 # Tests split to different test case classes because of issue reported in
20 # ticket VPP-1336
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Input ACL keyed on the IPv4 source address
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Install a classify table/session matching pg0's remote
              IPv4 address as source and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        table_name = 'ip_src'
        self.create_classify_table(
            table_name, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=self.pg0.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Input ACL keyed on the IPv4 destination address
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Install a classify table/session matching pg1's remote
              IPv4 address as destination and attach it to pg0 as
              input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        table_name = 'ip_dst'
        self.create_classify_table(
            table_name, self.build_ip_mask(dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Input ACL keyed on both IPv4 addresses
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Install a classify table/session matching pg0's remote
              address as source and pg1's remote address as
              destination, and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        table_name = 'ip'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg0.remote_ip4,
                                dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
123
124
class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Input ACL keyed on the IP protocol field only
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Install a classify table/session matching IP protocol
              UDP and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        table_name = 'proto_udp'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(proto=socket.IPPROTO_UDP))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Input ACL keyed on IP protocol and L4 source port
            - Queue an IPv4/UDP stream (fixed source port) on pg0
              addressed to pg1.
            - Install a classify table/session matching UDP plus that
              source port and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sport = 38
        l4_header = UDP(sport=sport, dport=5678)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_udp_sport'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', src_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Input ACL keyed on IP protocol and L4 destination port
            - Queue an IPv4/UDP stream (fixed destination port) on pg0
              addressed to pg1.
            - Install a classify table/session matching UDP plus that
              destination port and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        dport = 427
        l4_header = UDP(sport=1234, dport=dport)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_udp_dport'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Input ACL keyed on IP protocol and both L4 ports
            - Queue an IPv4/UDP stream (fixed source and destination
              ports) on pg0 addressed to pg1.
            - Install a classify table/session matching UDP plus both
              ports and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sport = 13720
        dport = 9080
        l4_header = UDP(sport=sport, dport=dport)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_udp_ports'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
271
272
class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Input ACL keyed on the IP protocol field only
            - Queue an IPv4/TCP stream on pg0 addressed to pg1.
            - Install a classify table/session matching IP protocol
              TCP and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        l4_header = TCP(sport=1234, dport=5678)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_tcp'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(proto=socket.IPPROTO_TCP))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        # The stream carries TCP, so tell the verifier which L4 to expect.
        self.verify_capture(self.pg1, received, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Input ACL keyed on IP protocol and L4 source port
            - Queue an IPv4/TCP stream (fixed source port) on pg0
              addressed to pg1.
            - Install a classify table/session matching TCP plus that
              source port and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sport = 38
        l4_header = TCP(sport=sport, dport=5678)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_tcp_sport'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', src_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        # The stream carries TCP, so tell the verifier which L4 to expect.
        self.verify_capture(self.pg1, received, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Input ACL keyed on IP protocol and L4 destination port
            - Queue an IPv4/TCP stream (fixed destination port) on pg0
              addressed to pg1.
            - Install a classify table/session matching TCP plus that
              destination port and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        dport = 427
        l4_header = TCP(sport=1234, dport=dport)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        # Use a key of its own: this test previously reused
        # 'proto_tcp_sport', the key of the source-port test's table.
        table_name = 'proto_tcp_dport'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        # The stream carries TCP, so tell the verifier which L4 to expect.
        self.verify_capture(self.pg1, received, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Input ACL keyed on IP protocol and both L4 ports
            - Queue an IPv4/TCP stream (fixed source and destination
              ports) on pg0 addressed to pg1.
            - Install a classify table/session matching TCP plus both
              ports and attach it to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        sport = 13720
        dport = 9080
        l4_header = TCP(sport=sport, dport=dport)
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes, l4_header)
        self.pg0.add_stream(sent)

        table_name = 'proto_tcp_ports'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        # The stream carries TCP, so tell the verifier which L4 to expect.
        self.verify_capture(self.pg1, received, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
420
421
class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Output ACL keyed on the IPv4 source address
            - Queue an IPv4 stream on pg1 addressed to pg0.
            - Install a classify table (data_offset=0) and a session
              matching pg1's remote IPv4 address as source, then attach
              the table to pg0 as output ACL.
            - Expect all packets on pg0 and nothing on pg1, pg2, pg3.
        """

        sent = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(sent)

        table_name = 'ip_out'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(src_ip='ffffffff'),
            data_offset=0)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=self.pg1.remote_ip4))
        self.output_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(sent))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
463
464
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_acl_mac(self):
        """ MAC ACL test

        Input ACL keyed on the source MAC address
            - Queue an IPv4 stream on pg0 addressed to pg2.
            - Install a classify table (data_offset=-14) and a session
              matching pg0's remote MAC as source, then attach the
              table to pg0 as input ACL.
            - Expect all packets on pg2 and nothing on pg0, pg1, pg3.
        """

        sent = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        table_name = 'mac'
        self.create_classify_table(
            table_name,
            self.build_mac_mask(src_mac='ffffffffffff'),
            data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_mac_match(src_mac=self.pg0.remote_mac))
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg2.get_capture(len(sent))
        self.verify_capture(self.pg2, received)
        for idle_if in (self.pg0, self.pg1, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
506
507
class TestClassifierComplex(TestClassifier):
    """ Large & Nested Classifiers Test Cases """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent class.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent class.
        super().tearDownClass()

    def test_iacl_large(self):
        """ Large input ACL test

        Input ACL matching on ethernet+ip+udp headers and payload
            - Queue an IPv4/UDP stream with a marked payload on pg0
              addressed to pg1.
            - Install one classify table whose mask spans MAC, IP, UDP
              ports and a 2-byte payload field, add the matching
              session, and attach the table to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        # Original author's note on the payload matcher offset:
        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b); the extra 4b
        # is because the build_ip_ma*() helpers do not match against the
        # UDP length and checksum fields.
        payload_mask = self.build_payload_mask(
            [VarMask(offset=40, spec='ffff')])
        payload_match = self.build_payload_match(
            [VarMatch(offset=40, value=0x1234, length=2)])

        sport = 13720
        dport = 9080

        # 36 zero hex digits followed by the 0x1234 marker the ACL looks
        # for (original note: 36b offset = 80bytes - sizeof(UDP/IP/ETH)).
        marked_payload = bytes.fromhex('0' * 36 + '1234')
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport),
                                  marked_payload)
        self.pg0.add_stream(sent)

        table_name = 'large_in'
        l2_mask = self.build_mac_mask(src_mac='ffffffffffff',
                                      dst_mac='ffffffffffff',
                                      ether_type='ffff')
        l3_l4_mask = self.build_ip_mask(proto='ff',
                                        src_ip='ffffffff',
                                        dst_ip='ffffffff',
                                        src_port='ffff',
                                        dst_port='ffff')
        self.create_classify_table(
            table_name,
            l2_mask + l3_l4_mask + payload_mask,
            data_offset=-14)

        table_index = self.acl_tbl_idx.get(table_name)
        # ether_type '0800': ipv4 next header
        l2_match = self.build_mac_match(src_mac=self.pg0.remote_mac,
                                        dst_mac=self.pg0.local_mac,
                                        ether_type='0800')
        l3_l4_match = self.build_ip_match(proto=socket.IPPROTO_UDP,
                                          src_ip=self.pg0.remote_ip4,
                                          dst_ip=self.pg1.remote_ip4,
                                          src_port=sport,
                                          dst_port=dport)
        self.create_classify_session(
            table_index, l2_match + l3_l4_match + payload_match)

        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_large(self):
        """ Large output ACL test

        Output ACL matching on ethernet+ip+udp headers and payload
            - Queue an IPv4/UDP stream with a marked payload on pg1
              addressed to pg0.
            - Install one classify table whose mask spans MAC, IP, UDP
              ports and a 2-byte payload field, add the matching
              session, and attach the table to pg0 as output ACL.
            - Expect all packets on pg0 and nothing on pg1, pg2, pg3.
        """

        # Original author's note on the payload matcher offset:
        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b); the extra 4b
        # is because the build_ip_ma*() helpers do not match against the
        # UDP length and checksum fields.
        payload_mask = self.build_payload_mask(
            [VarMask(offset=40, spec='ffff')])
        payload_match = self.build_payload_match(
            [VarMatch(offset=40, value=0x1234, length=2)])

        sport = 13720
        dport = 9080

        # 36 zero hex digits followed by the 0x1234 marker the ACL looks
        # for (original note: 36b offset = 80bytes - sizeof(UDP/IP/ETH)).
        marked_payload = bytes.fromhex('0' * 36 + '1234')
        sent = self.create_stream(self.pg1, self.pg0,
                                  self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport),
                                  marked_payload)
        self.pg1.add_stream(sent)

        table_name = 'large_out'
        l2_mask = self.build_mac_mask(src_mac='ffffffffffff',
                                      dst_mac='ffffffffffff',
                                      ether_type='ffff')
        l3_l4_mask = self.build_ip_mask(proto='ff',
                                        src_ip='ffffffff',
                                        dst_ip='ffffffff',
                                        src_port='ffff',
                                        dst_port='ffff')
        self.create_classify_table(
            table_name,
            l2_mask + l3_l4_mask + payload_mask,
            data_offset=-14)

        table_index = self.acl_tbl_idx.get(table_name)
        # On egress the frame is sourced from pg0's local MAC.
        # ether_type '0800': ipv4 next header
        l2_match = self.build_mac_match(src_mac=self.pg0.local_mac,
                                        dst_mac=self.pg0.remote_mac,
                                        ether_type='0800')
        l3_l4_match = self.build_ip_match(proto=socket.IPPROTO_UDP,
                                          src_ip=self.pg1.remote_ip4,
                                          dst_ip=self.pg0.remote_ip4,
                                          src_port=sport,
                                          dst_port=dport)
        self.create_classify_session(
            table_index, l2_match + l3_l4_match + payload_match)

        self.output_acl_set_interface(self.pg0, table_index)
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(sent))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_nested(self):
        """ Nested input ACL test

        Input ACL made of two chained classify tables
            - Queue an IPv4/UDP stream on pg0 addressed to pg1.
            - Create a sub-table, then a first table that has no
              sessions and names the sub-table as its next table.
            - Add the ethernet+ip+udp session to the sub-table only and
              attach the first table to pg0 as input ACL.
            - Expect all packets on pg1 and nothing on pg0, pg2, pg3.
        """

        def headers_mask():
            # Mask covering MAC addresses, ethertype, IP protocol,
            # IP addresses and L4 ports; built anew for each table.
            return (self.build_mac_mask(src_mac='ffffffffffff',
                                        dst_mac='ffffffffffff',
                                        ether_type='ffff') +
                    self.build_ip_mask(proto='ff',
                                       src_ip='ffffffff',
                                       dst_ip='ffffffff',
                                       src_port='ffff',
                                       dst_port='ffff'))

        sport = 13720
        dport = 9080
        sent = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport))
        self.pg0.add_stream(sent)

        subtable_name = 'subtable_in'
        self.create_classify_table(
            subtable_name, headers_mask(), data_offset=-14)

        table_name = 'nested_in'
        self.create_classify_table(
            table_name,
            headers_mask(),
            next_table_index=self.acl_tbl_idx.get(subtable_name))

        # Only the sub-table receives a session.
        subtable_index = self.acl_tbl_idx.get(subtable_name)
        # ether_type '0800': ipv4 next header
        l2_match = self.build_mac_match(src_mac=self.pg0.remote_mac,
                                        dst_mac=self.pg0.local_mac,
                                        ether_type='0800')
        l3_l4_match = self.build_ip_match(proto=socket.IPPROTO_UDP,
                                          src_ip=self.pg0.remote_ip4,
                                          dst_ip=self.pg1.remote_ip4,
                                          src_port=sport,
                                          dst_port=dport)
        self.create_classify_session(subtable_index, l2_match + l3_l4_match)

        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_name))
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_nested(self):
        """ Nested output ACL test

        Output ACL made of two chained classify tables
            - Queue an IPv4/UDP stream on pg1 addressed to pg0.
            - Create a sub-table, then a first table that has no
              sessions and names the sub-table as its next table.
            - Add the ethernet+ip+udp session to the sub-table only and
              attach the first table to pg0 as output ACL.
            - Expect all packets on pg0 and nothing on pg1, pg2, pg3.
        """

        def headers_mask():
            # Mask covering MAC addresses, ethertype, IP protocol,
            # IP addresses and L4 ports; built anew for each table.
            return (self.build_mac_mask(src_mac='ffffffffffff',
                                        dst_mac='ffffffffffff',
                                        ether_type='ffff') +
                    self.build_ip_mask(proto='ff',
                                       src_ip='ffffffff',
                                       dst_ip='ffffffff',
                                       src_port='ffff',
                                       dst_port='ffff'))

        sport = 13720
        dport = 9080
        sent = self.create_stream(self.pg1, self.pg0,
                                  self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport))
        self.pg1.add_stream(sent)

        subtable_name = 'subtable_out'
        self.create_classify_table(
            subtable_name, headers_mask(), data_offset=-14)

        table_name = 'nested_out'
        self.create_classify_table(
            table_name,
            headers_mask(),
            next_table_index=self.acl_tbl_idx.get(subtable_name),
            data_offset=-14)

        # Only the sub-table receives a session.
        subtable_index = self.acl_tbl_idx.get(subtable_name)
        # On egress the frame is sourced from pg0's local MAC.
        # ether_type '0800': ipv4 next header
        l2_match = self.build_mac_match(src_mac=self.pg0.local_mac,
                                        dst_mac=self.pg0.remote_mac,
                                        ether_type='0800')
        l3_l4_match = self.build_ip_match(proto=socket.IPPROTO_UDP,
                                          src_ip=self.pg1.remote_ip4,
                                          dst_ip=self.pg0.remote_ip4,
                                          src_port=sport,
                                          dst_port=dport)
        self.create_classify_session(subtable_index, l2_match + l3_l4_match)

        self.output_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_name))
        # Record which table this test activated.
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(sent))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
784
785
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    @classmethod
    def setUpClass(cls):
        # nothing class-specific: defer entirely to the base class
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific: defer entirely to the base class
        super().tearDownClass()

    def test_acl_pbr(self):
        """ IP PBR test

        Scenario: policy based routing keyed on the source IP
            - Build an IPv4 stream going pg0 -> pg3.
            - Add a classify session carrying the PBR option and VRF id,
              then a route in that VRF towards pg3.
            - Attach the table as input ACL on pg0, send, and verify the
              packets are captured on pg3 only.
            - Remove the route and the session and check the VRF is gone.
        """

        # PBR testing with source IP
        stream = self.create_stream(
            self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_key = 'pbr'
        self.create_classify_table(
            table_key, self.build_ip_mask(src_ip='ffffffff'))
        pbr_opt = 1
        # adding this session is what creates the VRF/table that will
        # receive the route
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_opt, self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))

        via_pg3 = VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)
        pbr_route = VppIpRoute(self, self.pg3.local_ip4, 24, [via_pg3],
                               table_id=self.pbr_vrfid)
        pbr_route.add_vpp_config()

        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg3.get_capture(len(stream))
        self.verify_capture(self.pg3, captured)
        # NOTE(review): the trailing 0 presumably detaches the ACL from
        # pg0 again - confirm against input_acl_set_interface.
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key), 0)
        for idle_if in (self.pg0, self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

        # tear down in two steps: first the route, then the session
        pbr_route.remove_vpp_config()
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_opt, self.pbr_vrfid, is_add=0)

        # with both removed the VRF/table must no longer be reported
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
846
847
class TestClassifierPunt(TestClassifier):
    """ Classifier punt Test Case """

    @classmethod
    def setUpClass(cls):
        # nothing class-specific: defer entirely to the base class
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific: defer entirely to the base class
        super().tearDownClass()

    def test_punt_udp(self):
        """ IPv4/UDP protocol punt ACL test

        Scenario: punt ACL keyed on source IP, IP protocol and source port
            - Create the classify table and register it as IPv4 punt ACL.
            - Punt UDP packets for the test port and redirect punted
              packets received on pg0 out of pg1.
            - With only a non-matching session: expect no packets.
            - With a matching session: expect the packets on pg1 only.
            - Check punt_acl_get before and after removing the punt ACL.
        """

        udp_sport = 6754
        udp_dport = 17923
        # table index value the test expects punt_acl_get to report for
        # an address family with no punt ACL configured
        no_table = 0xffffffff

        table_key = 'ip4_udp_punt'
        punt_mask = self.build_ip_mask(src_ip='ffffffff',
                                       proto='ff',
                                       src_port='ffff')
        self.create_classify_table(table_key, punt_mask)
        table_index = self.acl_tbl_idx.get(table_key)
        self.vapi.punt_acl_add_del(ip4_table_index=table_index)
        self.acl_active_table = table_key

        # punt udp packets to dport received on pg0 through pg1
        punt_type = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
        l4_punt = {
            'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4,
            'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
            'port': udp_dport,
        }
        self.vapi.set_punt(
            is_add=1,
            punt={'type': punt_type, 'punt': {'l4': l4_punt}})
        redirect = {
            'rx_sw_if_index': self.pg0.sw_if_index,
            'tx_sw_if_index': self.pg1.sw_if_index,
            'nh': self.pg1.remote_ip4,
        }
        self.vapi.ip_punt_redirect(punt=redirect)

        # two references to one packet addressed to pg0's own IP
        punted_pkt = (Ether(dst=self.pg0.local_mac,
                            src=self.pg0.remote_mac) /
                      IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                      UDP(sport=udp_sport, dport=udp_dport) /
                      Raw('\x17' * 100))
        pkts = [punted_pkt] * 2

        # a session exists but its source port differs from the stream's:
        # expect to drop
        miss = self.build_ip_match(src_ip=self.pg0.remote_ip4,
                                   proto=socket.IPPROTO_UDP,
                                   src_port=udp_sport + 10)
        self.create_classify_session(table_index, miss)
        self.send_and_assert_no_replies(self.pg0, pkts)

        # a session matching the stream's source port: expect to pass
        hit = self.build_ip_match(src_ip=self.pg0.remote_ip4,
                                  proto=socket.IPPROTO_UDP,
                                  src_port=udp_sport)
        self.create_classify_session(table_index, hit)
        self.send_and_expect_only(self.pg0, pkts, self.pg1)

        # test dump api: ip4 is set, ip6 is not
        acl_tables = self.vapi.punt_acl_get()
        self.assertEqual(acl_tables.ip4_table_index, table_index)
        self.assertEqual(acl_tables.ip6_table_index, no_table)

        # cleanup: clear the active-table marker, then remove the punt ACL
        self.acl_active_table = ''
        self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)

        # test dump api: nothing set
        acl_tables = self.vapi.punt_acl_get()
        self.assertEqual(acl_tables.ip4_table_index, no_table)
        self.assertEqual(acl_tables.ip6_table_index, no_table)
931
932
# Executed as a script: hand the test cases defined in this module to
# unittest, using the runner imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)