classify: Large and nested classifier unit tests
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.packet import Raw, Packet
9
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier, VarMask, VarMatch
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16
17
18 # Tests split to different test case classes because of issue reported in
19 # ticket VPP-1336
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    # Input-ACL scenarios that classify on the IPv4 source and/or
    # destination address.  Every test sends a stream into pg0 that is
    # addressed towards pg1 and expects it to be captured on pg1 only.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Scenario: input ACL keyed on the IPv4 source address
            - Queue an IPv4 stream on pg0 that is addressed towards pg1.
            - Create a classify table masking src_ip and a session
              matching pg0's remote IPv4 address; attach it to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_name = 'ip_src'
        self.create_classify_table(
            table_name, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg0.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Scenario: input ACL keyed on the IPv4 destination address
            - Queue an IPv4 stream on pg0 that is addressed towards pg1.
            - Create a classify table masking dst_ip and a session
              matching pg1's remote IPv4 address; attach it to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_name = 'ip_dst'
        self.create_classify_table(
            table_name, self.build_ip_mask(dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Scenario: input ACL keyed on both IPv4 addresses
            - Queue an IPv4 stream on pg0 that is addressed towards pg1.
            - Create a classify table masking src_ip and dst_ip and a
              session matching pg0's / pg1's remote IPv4 addresses;
              attach it to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_name = 'ip'
        both_addr_mask = self.build_ip_mask(
            src_ip='ffffffff', dst_ip='ffffffff')
        self.create_classify_table(table_name, both_addr_mask)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg0.remote_ip4,
                                dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
122
123
class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    # Input-ACL scenarios that classify on the IP protocol field (UDP)
    # and, optionally, on the UDP source/destination port.  Every test
    # sends a stream into pg0 towards pg1 and expects it on pg1 only.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Scenario: input ACL keyed on the IP protocol only
            - Queue an IPv4 stream on pg0 that is addressed towards pg1.
            - Create a classify table masking the protocol byte and a
              session matching IPPROTO_UDP; attach it to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_name = 'proto_udp'
        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP))
        self.input_acl_set_interface(self.pg0, table_index)
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Scenario: input ACL keyed on UDP and the source port
            - Queue an IPv4/UDP stream with a fixed source port on pg0,
              addressed towards pg1.
            - Create a classify table masking protocol and source port
              and a session matching UDP plus that port; attach to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        sport = 38
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=5678))
        self.pg0.add_stream(stream)

        table_name = 'proto_udp_sport'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', src_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Scenario: input ACL keyed on UDP and the destination port
            - Queue an IPv4/UDP stream with a fixed destination port on
              pg0, addressed towards pg1.
            - Create a classify table masking protocol and destination
              port and a session matching UDP plus that port; attach to
              pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        dport = 427
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=1234, dport=dport))
        self.pg0.add_stream(stream)

        table_name = 'proto_udp_dport'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Scenario: input ACL keyed on UDP and both ports
            - Queue an IPv4/UDP stream with fixed source and destination
              ports on pg0, addressed towards pg1.
            - Create a classify table masking protocol and both ports and
              a session matching UDP plus those ports; attach to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport))
        self.pg0.add_stream(stream)

        table_name = 'proto_udp_ports'
        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP,
                                src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
270
271
class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    # Input-ACL scenarios that classify on the IP protocol field (TCP)
    # and, optionally, on the TCP source/destination port.  Every test
    # sends a TCP stream into pg0 towards pg1 and expects it on pg1 only.

    @classmethod
    def setUpClass(cls):
        super(TestClassifierTCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierTCP, cls).tearDownClass()

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP protocol
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp'
        self.create_classify_table(key, self.build_ip_mask(proto='ff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        # The stream carries TCP, so the L4 class is passed explicitly.
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and sport
        sport = 38
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_sport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', src_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and dport
        dport = 427
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=dport))
        self.pg0.add_stream(pkts)

        # Own table key for the destination-port table.  This used to be
        # the copy-pasted 'proto_tcp_sport', which collided with the key
        # registered by test_iacl_proto_tcp_sport in acl_tbl_idx.
        key = 'proto_tcp_dport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and sport and dport
        sport = 13720
        dport = 9080
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=dport))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_ports'
        self.create_classify_table(
            key,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")
419
420
class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    # Output-ACL scenario: the classify table is attached to pg0 through
    # output_acl_set_interface() and traffic flows from pg1 to pg0.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Scenario: output ACL keyed on the IPv4 source address
            - Queue an IPv4 stream on pg1 that is addressed towards pg0.
            - Create a classify table (data_offset=0) masking src_ip and
              a session matching pg1's remote IPv4 address; attach it to
              pg0 as output ACL.
            - Run the packet generator; the whole stream must be captured
              on pg0 and nothing on pg1, pg2 or pg3.
        """
        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(stream)

        table_name = 'ip_out'
        src_mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(table_name, src_mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg1.remote_ip4))
        self.output_acl_set_interface(self.pg0, table_index)
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
462
463
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    # Input-ACL scenario that classifies on the Ethernet source address;
    # the table is created with data_offset=-14.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_acl_mac(self):
        """ MAC ACL test

        Scenario: input ACL keyed on the source MAC address
            - Queue an IPv4 stream on pg0 that is addressed towards pg2.
            - Create a classify table (data_offset=-14) masking src_mac
              and a session matching pg0's remote MAC; attach it to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg2 and nothing on pg0, pg1 or pg3.
        """
        stream = self.create_stream(
            self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_name = 'mac'
        src_mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(
            table_name, src_mac_mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_mac_match(src_mac=self.pg0.remote_mac))
        self.input_acl_set_interface(self.pg0, table_index)
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg2.get_capture(len(stream))
        self.verify_capture(self.pg2, received)
        for idle_if in (self.pg0, self.pg1, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
505
506
class TestClassifierComplex(TestClassifier):
    """ Large & Nested Classifiers Test Cases """

    # Scenarios with wide masks (Ethernet + IP + UDP [+ payload]) and
    # with chained tables (next_table_index), for input and output ACLs.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def _eth_ip_udp_mask(self):
        """Return the Ethernet mask concatenated with the IP/port mask."""
        l2_mask = self.build_mac_mask(src_mac='ffffffffffff',
                                      dst_mac='ffffffffffff',
                                      ether_type='ffff')
        l3_l4_mask = self.build_ip_mask(proto='ff',
                                        src_ip='ffffffff',
                                        dst_ip='ffffffff',
                                        src_port='ffff',
                                        dst_port='ffff')
        return l2_mask + l3_l4_mask

    def _eth_ip_udp_match(self, src_mac, dst_mac, src_ip, dst_ip,
                          sport, dport):
        """Return the Ethernet match concatenated with the IP/UDP match."""
        l2_match = self.build_mac_match(src_mac=src_mac,
                                        dst_mac=dst_mac,
                                        # ipv4 next header
                                        ether_type='0800')
        l3_l4_match = self.build_ip_match(proto=socket.IPPROTO_UDP,
                                          src_ip=src_ip,
                                          dst_ip=dst_ip,
                                          src_port=sport,
                                          dst_port=dport)
        return l2_match + l3_l4_match

    def test_iacl_large(self):
        """ Large input ACL test

        Scenario: input ACL matching Ethernet + IP + UDP + payload
            - Queue an IPv4/UDP stream with a crafted payload on pg0,
              addressed towards pg1.
            - Create one classify table whose mask spans the Ethernet,
              IP and UDP fields plus a payload word, add the matching
              session and attach the table to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        # Original author's note on the offsets:
        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
        # + 4b because the build_ip_ma*() helpers do not match against
        # the UDP length & checksum fields.
        payload_mask_spec = VarMask(offset=40, spec='ffff')
        payload_match_spec = VarMatch(offset=40, value=0x1234, length=2)

        payload_msk = self.build_payload_mask([payload_mask_spec])
        payload_match = self.build_payload_match([payload_match_spec])

        sport = 13720
        dport = 9080

        # Original author's note:
        # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
        packet_ex = bytes.fromhex(('0' * 36) + '1234')
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport),
            packet_ex)
        self.pg0.add_stream(stream)

        table_name = 'large_in'
        self.create_classify_table(
            table_name,
            self._eth_ip_udp_mask() + payload_msk,
            data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_name)

        self.create_classify_session(
            table_index,
            self._eth_ip_udp_match(src_mac=self.pg0.remote_mac,
                                   dst_mac=self.pg0.local_mac,
                                   src_ip=self.pg0.remote_ip4,
                                   dst_ip=self.pg1.remote_ip4,
                                   sport=sport,
                                   dport=dport) +
            payload_match)

        self.input_acl_set_interface(self.pg0, table_index)
        # Remember which table is attached (presumably consumed by the
        # base-class tear-down; verify in template_classifier).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_large(self):
        """ Large output ACL test

        Scenario: output ACL matching Ethernet + IP + UDP + payload
            - Queue an IPv4/UDP stream with a crafted payload on pg1,
              addressed towards pg0.
            - Create one classify table whose mask spans the Ethernet,
              IP and UDP fields plus a payload word, add the matching
              session and attach the table to pg0 as output ACL.
            - Run the packet generator; the whole stream must be captured
              on pg0 and nothing on pg1, pg2 or pg3.
        """
        # Original author's note on the offsets:
        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
        # + 4b because the build_ip_ma*() helpers do not match against
        # the UDP length & checksum fields.
        payload_mask_spec = VarMask(offset=40, spec='ffff')
        payload_match_spec = VarMatch(offset=40, value=0x1234, length=2)

        payload_msk = self.build_payload_mask([payload_mask_spec])
        payload_match = self.build_payload_match([payload_match_spec])

        sport = 13720
        dport = 9080

        # Original author's note:
        # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
        packet_ex = bytes.fromhex(('0' * 36) + '1234')
        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport),
            packet_ex)
        self.pg1.add_stream(stream)

        table_name = 'large_out'
        self.create_classify_table(
            table_name,
            self._eth_ip_udp_mask() + payload_msk,
            data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_name)

        # On egress the frame is matched with pg0's local MAC as source
        # and pg0's remote MAC as destination.
        self.create_classify_session(
            table_index,
            self._eth_ip_udp_match(src_mac=self.pg0.local_mac,
                                   dst_mac=self.pg0.remote_mac,
                                   src_ip=self.pg1.remote_ip4,
                                   dst_ip=self.pg0.remote_ip4,
                                   sport=sport,
                                   dport=dport) +
            payload_match)

        self.output_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_nested(self):
        """ Nested input ACL test

        Scenario: chained input ACL tables on Ethernet + IP + UDP fields
            - Queue an IPv4/UDP stream on pg0, addressed towards pg1.
            - Create the sub-table, then a first table without sessions
              that points at the sub-table via next_table_index.
            - Add the matching session to the sub-table and attach the
              first table to pg0.
            - Run the packet generator; the whole stream must be captured
              on pg1 and nothing on pg0, pg2 or pg3.
        """
        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport))

        self.pg0.add_stream(stream)

        subtable_name = 'subtable_in'
        self.create_classify_table(
            subtable_name,
            self._eth_ip_udp_mask(),
            data_offset=-14)
        subtable_index = self.acl_tbl_idx.get(subtable_name)

        # NOTE(review): unlike 'nested_out' in test_oacl_nested, this
        # table is created without data_offset=-14 -- confirm whether
        # that is intentional.
        table_name = 'nested_in'
        self.create_classify_table(
            table_name,
            self._eth_ip_udp_mask(),
            next_table_index=subtable_index)

        # The session goes into the sub-table only; the first table is
        # left without entries.
        self.create_classify_session(
            subtable_index,
            self._eth_ip_udp_match(src_mac=self.pg0.remote_mac,
                                   dst_mac=self.pg0.local_mac,
                                   src_ip=self.pg0.remote_ip4,
                                   dst_ip=self.pg1.remote_ip4,
                                   sport=sport,
                                   dport=dport))

        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_name))
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, received)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_nested(self):
        """ Nested output ACL test

        Scenario: chained output ACL tables on Ethernet + IP + UDP fields
            - Queue an IPv4/UDP stream on pg1, addressed towards pg0.
            - Create the sub-table, then a first table without sessions
              that points at the sub-table via next_table_index.
            - Add the matching session to the sub-table and attach the
              first table to pg0 as output ACL.
            - Run the packet generator; the whole stream must be captured
              on pg0 and nothing on pg1, pg2 or pg3.
        """
        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport))
        self.pg1.add_stream(stream)

        subtable_name = 'subtable_out'
        self.create_classify_table(
            subtable_name,
            self._eth_ip_udp_mask(),
            data_offset=-14)
        subtable_index = self.acl_tbl_idx.get(subtable_name)

        table_name = 'nested_out'
        self.create_classify_table(
            table_name,
            self._eth_ip_udp_mask(),
            next_table_index=subtable_index,
            data_offset=-14)

        # The session goes into the sub-table only; the first table is
        # left without entries.
        self.create_classify_session(
            subtable_index,
            self._eth_ip_udp_match(src_mac=self.pg0.local_mac,
                                   dst_mac=self.pg0.remote_mac,
                                   src_ip=self.pg1.remote_ip4,
                                   dst_ip=self.pg0.remote_ip4,
                                   sport=sport,
                                   dport=dport))

        self.output_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_name))
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, received)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
783
784
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the inherited class-level setup."""
        super(TestClassifierPBR, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the inherited class-level teardown."""
        super(TestClassifierPBR, cls).tearDownClass()

    def test_acl_pbr(self):
        """ IP PBR test

        Scenario: PBR selected by source IP
            - Build an IPv4 stream for the pg0 -> pg3 direction.
            - Create a classifier table masking the source IP and add a
              session for pg0's remote address that carries the PBR
              option and the PBR VRF id.
            - Check that the VRF exists and add to it a /24 route for
              pg3's local address via pg3's remote address.
            - Set the table as input ACL on pg0, send, and verify the
              capture on pg3; pg0, pg1 and pg2 must capture nothing.
            - Remove the route and the session, then check that the VRF
              no longer exists.
        """

        # PBR testing with source IP
        stream = self.create_stream(self.pg0, self.pg3,
                                    self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        table_key = 'pbr'
        src_ip_mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(table_key, src_ip_mask)

        pbr_opt = 1
        # this will create the VRF/table in which we will insert the route
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_opt, self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))

        next_hop = VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)
        pbr_route = VppIpRoute(self, self.pg3.local_ip4, 24, [next_hop],
                               table_id=self.pbr_vrfid)
        pbr_route.add_vpp_config()

        self.input_acl_set_interface(self.pg0,
                                     self.acl_tbl_idx.get(table_key))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg3.get_capture(len(stream))
        self.verify_capture(self.pg3, captured)
        # NOTE(review): the trailing 0 presumably means "remove", i.e. the
        # input ACL is detached from pg0 here - confirm against
        # input_acl_set_interface.
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key), 0)
        for idle_if in (self.pg0, self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

        # remove the classify session and the route
        pbr_route.remove_vpp_config()
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_opt, self.pbr_vrfid, is_add=0)

        # and the table should be gone.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
845
# Script entry point: when this module is executed directly, run its test
# cases through unittest using VppTestRunner as the runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)