classify: API cleanup
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16
17
18 # Tests split to different test case classes because of issue reported in
19 # ticket VPP-1336
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super().tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg1.
            - Create table 'ip_src' with a full source-address mask and
              add a session matching pg0's remote IPv4 address.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'ip_src'
        tx_if, rx_if = self.pg0, self.pg1

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=tx_if.remote_ip4))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg1.
            - Create table 'ip_dst' with a full destination-address mask
              and add a session matching pg1's remote IPv4 address.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'ip_dst'
        tx_if, rx_if = self.pg0, self.pg1

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(dst_ip=rx_if.remote_ip4))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg1.
            - Create table 'ip' masking both the source and destination
              addresses and add a session matching pg0's and pg1's
              remote IPv4 addresses.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'ip'
        tx_if, rx_if = self.pg0, self.pg1

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name,
            self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=tx_if.remote_ip4,
                                dst_ip=rx_if.remote_ip4))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
122
123
class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super().tearDownClass()

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg1.
            - Create table 'proto_udp' masking the IP protocol field and
              add a session matching IPPROTO_UDP.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'proto_udp'
        tx_if, rx_if = self.pg0, self.pg1

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(proto=socket.IPPROTO_UDP))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Scenario:
            - Queue an IPv4/UDP stream (source port 38) on pg0 addressed
              towards pg1.
            - Create table 'proto_udp_sport' masking protocol and source
              port and add a session matching UDP with that source port.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'proto_udp_sport'
        tx_if, rx_if = self.pg0, self.pg1
        sport = 38

        l4_header = UDP(sport=sport, dport=5678)
        stream = self.create_stream(
            tx_if, rx_if, self.pg_if_packet_sizes, l4_header)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', src_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Scenario:
            - Queue an IPv4/UDP stream (destination port 427) on pg0
              addressed towards pg1.
            - Create table 'proto_udp_dport' masking protocol and
              destination port and add a session matching UDP with that
              destination port.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'proto_udp_dport'
        tx_if, rx_if = self.pg0, self.pg1
        dport = 427

        l4_header = UDP(sport=1234, dport=dport)
        stream = self.create_stream(
            tx_if, rx_if, self.pg_if_packet_sizes, l4_header)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(proto='ff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Scenario:
            - Queue an IPv4/UDP stream (source port 13720, destination
              port 9080) on pg0 addressed towards pg1.
            - Create table 'proto_udp_ports' masking protocol and both
              ports and add a session matching UDP with both ports.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg1 and nothing on pg0/pg2/pg3.
        """
        table_name = 'proto_udp_ports'
        tx_if, rx_if = self.pg0, self.pg1
        sport, dport = 13720, 9080

        l4_header = UDP(sport=sport, dport=dport)
        stream = self.create_stream(
            tx_if, rx_if, self.pg_if_packet_sizes, l4_header)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index,
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
270
271
class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super(TestClassifierTCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super(TestClassifierTCP, cls).tearDownClass()

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv4/TCP stream for pg0 -> pg1 interface.
            - Create iACL table 'proto_tcp' masking the IP protocol and
              a session matching IPPROTO_TCP.
            - Send and verify received packets on pg1 interface; no
              other interface may capture anything.
        """

        # Basic iACL testing with TCP protocol
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp'
        self.create_classify_table(key, self.build_ip_mask(proto='ff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv4/TCP stream (source port 38) for pg0 -> pg1.
            - Create iACL table 'proto_tcp_sport' masking protocol and
              source port, with a session matching TCP and that sport.
            - Send and verify received packets on pg1 interface; no
              other interface may capture anything.
        """

        # Basic iACL testing with TCP and sport
        sport = 38
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_sport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', src_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv4/TCP stream (destination port 427) for
              pg0 -> pg1.
            - Create iACL table 'proto_tcp_dport' masking protocol and
              destination port, with a session matching TCP and that
              dport.
            - Send and verify received packets on pg1 interface; no
              other interface may capture anything.
        """

        # Basic iACL testing with TCP and dport
        dport = 427
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=dport))
        self.pg0.add_stream(pkts)

        # Use a key unique to this test; it previously reused the sport
        # test's 'proto_tcp_sport' key through a copy/paste slip.
        key = 'proto_tcp_dport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv4/TCP stream (source port 13720, destination
              port 9080) for pg0 -> pg1.
            - Create iACL table 'proto_tcp_ports' masking protocol and
              both ports, with a session matching TCP and both ports.
            - Send and verify received packets on pg1 interface; no
              other interface may capture anything.
        """

        # Basic iACL testing with TCP and sport and dport
        sport = 13720
        dport = 9080
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=dport))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_ports'
        self.create_classify_table(
            key,
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
                                dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")
419
420
class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super().tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Scenario:
            - Queue an IPv4 stream on pg1 addressed towards pg0.
            - Create table 'ip_out' (data_offset=0) with a full
              source-address mask and add a session matching pg1's
              remote IPv4 address.
            - Attach the table to pg0 as output ACL, send the stream and
              expect every packet on pg0 and nothing on pg1/pg2/pg3.
        """
        table_name = 'ip_out'
        tx_if, rx_if = self.pg1, self.pg0

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        src_mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(table_name, src_mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=tx_if.remote_ip4))
        # The output ACL is bound to the egress interface (pg0).
        self.output_acl_set_interface(rx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
462
463
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super().tearDownClass()

    def test_acl_mac(self):
        """ MAC ACL test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg2.
            - Create table 'mac' with a full source-MAC mask and add a
              session matching pg0's remote MAC address.
            - Attach the table to pg0 as input ACL, send the stream and
              expect every packet on pg2 and nothing on pg0/pg1/pg3.
        """
        table_name = 'mac'
        tx_if, rx_if = self.pg0, self.pg2

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # NOTE(review): data_offset=-14 presumably rewinds over the
        # 14-byte Ethernet header so the MAC fields can be matched --
        # confirm against template_classifier.
        src_mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(
            table_name, src_mac_mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_name)
        self.create_classify_session(
            table_index, self.build_mac_match(src_mac=tx_if.remote_mac))
        self.input_acl_set_interface(tx_if, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for idle_if in (self.pg0, self.pg1, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
505
506
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the TestClassifier template."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the TestClassifier template."""
        super().tearDownClass()

    def test_acl_pbr(self):
        """ IP PBR test

        Scenario:
            - Queue an IPv4 stream on pg0 addressed towards pg3.
            - Create table 'pbr' with a full source-address mask and add
              a PBR session (matching pg0's remote IPv4 address) that
              targets VRF self.pbr_vrfid; check that the VRF exists.
            - Add a route for pg3's /24 in that VRF, attach the table to
              pg0 as input ACL, send the stream and expect every packet
              on pg3 and nothing on pg0/pg1/pg2.
            - Remove the route and the session and check that the VRF
              is gone again.
        """
        table_name = 'pbr'
        tx_if, rx_if = self.pg0, self.pg3
        pbr_option = 1

        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        self.create_classify_table(
            table_name, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_name)

        # Adding the session creates the VRF/table that the route below
        # is inserted into.
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=tx_if.remote_ip4),
            pbr_option, self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))

        next_hop = VppRoutePath(rx_if.remote_ip4, INVALID_INDEX)
        route = VppIpRoute(self, rx_if.local_ip4, 24, [next_hop],
                           table_id=self.pbr_vrfid)
        route.add_vpp_config()

        self.input_acl_set_interface(tx_if, table_index)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        # Detach the ACL from pg0 here; this test never sets
        # acl_active_table.
        self.input_acl_set_interface(tx_if, table_index, 0)
        for idle_if in (self.pg0, self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

        # Remove the route first, then the classify session.
        route.remove_vpp_config()
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=tx_if.remote_ip4),
            pbr_option, self.pbr_vrfid, is_add=0)

        # With the session deleted the VRF/table should be gone.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
567
# When executed directly as a script, run this module's test cases through
# unittest using the VPP-specific test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)