tests: replace pycodestyle with black
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5 import binascii
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet6 import IPv6, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier
14
15
16 class TestClassifierIP6(TestClassifier):
17     """Classifier IP6 Test Case"""
18
19     @classmethod
20     def setUpClass(cls):
21         super(TestClassifierIP6, cls).setUpClass()
22         cls.af = socket.AF_INET6
23
24     @classmethod
25     def tearDownClass(cls):
26         super(TestClassifierIP6, cls).tearDownClass()
27
28     def test_iacl_src_ip(self):
29         """Source IP6 iACL test
30
31         Test scenario for basic IP ACL with source IP
32             - Create IPv6 stream for pg0 -> pg1 interface.
33             - Create iACL with source IP address.
34             - Send and verify received packets on pg1 interface.
35         """
36
37         # Basic iACL testing with source IP
38         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
39         self.pg0.add_stream(pkts)
40
41         key = "ip6_src"
42         self.create_classify_table(
43             key, self.build_ip6_mask(src_ip="ffffffffffffffffffffffffffffffff")
44         )
45         self.create_classify_session(
46             self.acl_tbl_idx.get(key), self.build_ip6_match(src_ip=self.pg0.remote_ip6)
47         )
48         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
49         self.acl_active_table = key
50
51         self.pg_enable_capture(self.pg_interfaces)
52         self.pg_start()
53
54         pkts = self.pg1.get_capture(len(pkts))
55         self.verify_capture(self.pg1, pkts)
56         self.pg0.assert_nothing_captured(remark="packets forwarded")
57         self.pg2.assert_nothing_captured(remark="packets forwarded")
58
59     def test_iacl_dst_ip(self):
60         """Destination IP6 iACL test
61
62         Test scenario for basic IP ACL with destination IP
63             - Create IPv6 stream for pg0 -> pg1 interface.
64             - Create iACL with destination IP address.
65             - Send and verify received packets on pg1 interface.
66         """
67
68         # Basic iACL testing with destination IP
69         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
70         self.pg0.add_stream(pkts)
71
72         key = "ip6_dst"
73         self.create_classify_table(
74             key, self.build_ip6_mask(dst_ip="ffffffffffffffffffffffffffffffff")
75         )
76         self.create_classify_session(
77             self.acl_tbl_idx.get(key), self.build_ip6_match(dst_ip=self.pg1.remote_ip6)
78         )
79         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
80         self.acl_active_table = key
81
82         self.pg_enable_capture(self.pg_interfaces)
83         self.pg_start()
84
85         pkts = self.pg1.get_capture(len(pkts))
86         self.verify_capture(self.pg1, pkts)
87         self.pg0.assert_nothing_captured(remark="packets forwarded")
88         self.pg2.assert_nothing_captured(remark="packets forwarded")
89
90     def test_iacl_src_dst_ip(self):
91         """Source and destination IP6 iACL test
92
93         Test scenario for basic IP ACL with source and destination IP
94             - Create IPv4 stream for pg0 -> pg1 interface.
95             - Create iACL with source and destination IP addresses.
96             - Send and verify received packets on pg1 interface.
97         """
98
99         # Basic iACL testing with source and destination IP
100         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
101         self.pg0.add_stream(pkts)
102
103         key = "ip6"
104         self.create_classify_table(
105             key,
106             self.build_ip6_mask(
107                 src_ip="ffffffffffffffffffffffffffffffff",
108                 dst_ip="ffffffffffffffffffffffffffffffff",
109             ),
110         )
111         self.create_classify_session(
112             self.acl_tbl_idx.get(key),
113             self.build_ip6_match(
114                 src_ip=self.pg0.remote_ip6, dst_ip=self.pg1.remote_ip6
115             ),
116         )
117         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
118         self.acl_active_table = key
119
120         self.pg_enable_capture(self.pg_interfaces)
121         self.pg_start()
122
123         pkts = self.pg1.get_capture(len(pkts))
124         self.verify_capture(self.pg1, pkts)
125         self.pg0.assert_nothing_captured(remark="packets forwarded")
126         self.pg2.assert_nothing_captured(remark="packets forwarded")
127
128
129 # Tests split to different test case classes because of issue reported in
130 # ticket VPP-1336
131 class TestClassifierIP6UDP(TestClassifier):
132     """Classifier IP6 UDP proto Test Case"""
133
134     @classmethod
135     def setUpClass(cls):
136         super(TestClassifierIP6UDP, cls).setUpClass()
137         cls.af = socket.AF_INET6
138
139     def test_iacl_proto_udp(self):
140         """IP6 UDP protocol iACL test
141
142         Test scenario for basic protocol ACL with UDP protocol
143             - Create IPv6 stream for pg0 -> pg1 interface.
144             - Create iACL with UDP IP protocol.
145             - Send and verify received packets on pg1 interface.
146         """
147
148         # Basic iACL testing with UDP protocol
149         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
150         self.pg0.add_stream(pkts)
151
152         key = "nh_udp"
153         self.create_classify_table(key, self.build_ip6_mask(nh="ff"))
154         self.create_classify_session(
155             self.acl_tbl_idx.get(key), self.build_ip6_match(nh=socket.IPPROTO_UDP)
156         )
157         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
158         self.acl_active_table = key
159
160         self.pg_enable_capture(self.pg_interfaces)
161         self.pg_start()
162
163         pkts = self.pg1.get_capture(len(pkts))
164         self.verify_capture(self.pg1, pkts)
165         self.pg0.assert_nothing_captured(remark="packets forwarded")
166         self.pg2.assert_nothing_captured(remark="packets forwarded")
167
168     def test_iacl_proto_udp_sport(self):
169         """IP6 UDP source port iACL test
170
171         Test scenario for basic protocol ACL with UDP and sport
172             - Create IPv6 stream for pg0 -> pg1 interface.
173             - Create iACL with UDP IP protocol and defined sport.
174             - Send and verify received packets on pg1 interface.
175         """
176
177         # Basic iACL testing with UDP and sport
178         sport = 38
179         pkts = self.create_stream(
180             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
181         )
182         self.pg0.add_stream(pkts)
183
184         key = "nh_udp_sport"
185         self.create_classify_table(key, self.build_ip6_mask(nh="ff", src_port="ffff"))
186         self.create_classify_session(
187             self.acl_tbl_idx.get(key),
188             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport),
189         )
190         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
191         self.acl_active_table = key
192
193         self.pg_enable_capture(self.pg_interfaces)
194         self.pg_start()
195
196         pkts = self.pg1.get_capture(len(pkts))
197         self.verify_capture(self.pg1, pkts)
198         self.pg0.assert_nothing_captured(remark="packets forwarded")
199         self.pg2.assert_nothing_captured(remark="packets forwarded")
200
201     def test_iacl_proto_udp_dport(self):
202         """IP6 UDP destination port iACL test
203
204         Test scenario for basic protocol ACL with UDP and dport
205             - Create IPv6 stream for pg0 -> pg1 interface.
206             - Create iACL with UDP IP protocol and defined dport.
207             - Send and verify received packets on pg1 interface.
208         """
209
210         # Basic iACL testing with UDP and dport
211         dport = 427
212         pkts = self.create_stream(
213             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
214         )
215         self.pg0.add_stream(pkts)
216
217         key = "nh_udp_dport"
218         self.create_classify_table(key, self.build_ip6_mask(nh="ff", dst_port="ffff"))
219         self.create_classify_session(
220             self.acl_tbl_idx.get(key),
221             self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport),
222         )
223         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
224         self.acl_active_table = key
225
226         self.pg_enable_capture(self.pg_interfaces)
227         self.pg_start()
228
229         pkts = self.pg1.get_capture(len(pkts))
230         self.verify_capture(self.pg1, pkts)
231         self.pg0.assert_nothing_captured(remark="packets forwarded")
232         self.pg2.assert_nothing_captured(remark="packets forwarded")
233
234     def test_iacl_proto_udp_sport_dport(self):
235         """IP6 UDP source and destination ports iACL test
236
237         Test scenario for basic protocol ACL with UDP and sport and dport
238             - Create IPv6 stream for pg0 -> pg1 interface.
239             - Create iACL with UDP IP protocol and defined sport and dport.
240             - Send and verify received packets on pg1 interface.
241         """
242
243         # Basic iACL testing with UDP and sport and dport
244         sport = 13720
245         dport = 9080
246         pkts = self.create_stream(
247             self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
248         )
249         self.pg0.add_stream(pkts)
250
251         key = "nh_udp_ports"
252         self.create_classify_table(
253             key, self.build_ip6_mask(nh="ff", src_port="ffff", dst_port="ffff")
254         )
255         self.create_classify_session(
256             self.acl_tbl_idx.get(key),
257             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport, dst_port=dport),
258         )
259         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
260         self.acl_active_table = key
261
262         self.pg_enable_capture(self.pg_interfaces)
263         self.pg_start()
264
265         pkts = self.pg1.get_capture(len(pkts))
266         self.verify_capture(self.pg1, pkts)
267         self.pg0.assert_nothing_captured(remark="packets forwarded")
268         self.pg2.assert_nothing_captured(remark="packets forwarded")
269
270
271 class TestClassifierIP6TCP(TestClassifier):
272     """Classifier IP6 TCP proto Test Case"""
273
274     @classmethod
275     def setUpClass(cls):
276         super(TestClassifierIP6TCP, cls).setUpClass()
277         cls.af = socket.AF_INET6
278
279     def test_iacl_proto_tcp(self):
280         """IP6 TCP protocol iACL test
281
282         Test scenario for basic protocol ACL with TCP protocol
283             - Create IPv6 stream for pg0 -> pg1 interface.
284             - Create iACL with TCP IP protocol.
285             - Send and verify received packets on pg1 interface.
286         """
287
288         # Basic iACL testing with TCP protocol
289         pkts = self.create_stream(
290             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
291         )
292         self.pg0.add_stream(pkts)
293
294         key = "nh_tcp"
295         self.create_classify_table(key, self.build_ip6_mask(nh="ff"))
296         self.create_classify_session(
297             self.acl_tbl_idx.get(key), self.build_ip6_match(nh=socket.IPPROTO_TCP)
298         )
299         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
300         self.acl_active_table = key
301
302         self.pg_enable_capture(self.pg_interfaces)
303         self.pg_start()
304
305         pkts = self.pg1.get_capture(len(pkts))
306         self.verify_capture(self.pg1, pkts, TCP)
307         self.pg0.assert_nothing_captured(remark="packets forwarded")
308         self.pg2.assert_nothing_captured(remark="packets forwarded")
309
310     def test_iacl_proto_tcp_sport(self):
311         """IP6 TCP source port iACL test
312
313         Test scenario for basic protocol ACL with TCP and sport
314             - Create IPv6 stream for pg0 -> pg1 interface.
315             - Create iACL with TCP IP protocol and defined sport.
316             - Send and verify received packets on pg1 interface.
317         """
318
319         # Basic iACL testing with TCP and sport
320         sport = 38
321         pkts = self.create_stream(
322             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
323         )
324         self.pg0.add_stream(pkts)
325
326         key = "nh_tcp_sport"
327         self.create_classify_table(key, self.build_ip6_mask(nh="ff", src_port="ffff"))
328         self.create_classify_session(
329             self.acl_tbl_idx.get(key),
330             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport),
331         )
332         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
333         self.acl_active_table = key
334
335         self.pg_enable_capture(self.pg_interfaces)
336         self.pg_start()
337
338         pkts = self.pg1.get_capture(len(pkts))
339         self.verify_capture(self.pg1, pkts, TCP)
340         self.pg0.assert_nothing_captured(remark="packets forwarded")
341         self.pg2.assert_nothing_captured(remark="packets forwarded")
342
343     def test_iacl_proto_tcp_dport(self):
344         """IP6 TCP destination port iACL test
345
346         Test scenario for basic protocol ACL with TCP and dport
347             - Create IPv6 stream for pg0 -> pg1 interface.
348             - Create iACL with TCP IP protocol and defined dport.
349             - Send and verify received packets on pg1 interface.
350         """
351
352         # Basic iACL testing with TCP and dport
353         dport = 427
354         pkts = self.create_stream(
355             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
356         )
357         self.pg0.add_stream(pkts)
358
359         key = "nh_tcp_dport"
360         self.create_classify_table(key, self.build_ip6_mask(nh="ff", dst_port="ffff"))
361         self.create_classify_session(
362             self.acl_tbl_idx.get(key),
363             self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport),
364         )
365         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
366         self.acl_active_table = key
367
368         self.pg_enable_capture(self.pg_interfaces)
369         self.pg_start()
370
371         pkts = self.pg1.get_capture(len(pkts))
372         self.verify_capture(self.pg1, pkts, TCP)
373         self.pg0.assert_nothing_captured(remark="packets forwarded")
374         self.pg2.assert_nothing_captured(remark="packets forwarded")
375
376     def test_iacl_proto_tcp_sport_dport(self):
377         """IP6 TCP source and destination ports iACL test
378
379         Test scenario for basic protocol ACL with TCP and sport and dport
380             - Create IPv6 stream for pg0 -> pg1 interface.
381             - Create iACL with TCP IP protocol and defined sport and dport.
382             - Send and verify received packets on pg1 interface.
383         """
384
385         # Basic iACL testing with TCP and sport and dport
386         sport = 13720
387         dport = 9080
388         pkts = self.create_stream(
389             self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
390         )
391         self.pg0.add_stream(pkts)
392
393         key = "nh_tcp_ports"
394         self.create_classify_table(
395             key, self.build_ip6_mask(nh="ff", src_port="ffff", dst_port="ffff")
396         )
397         self.create_classify_session(
398             self.acl_tbl_idx.get(key),
399             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport, dst_port=dport),
400         )
401         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
402         self.acl_active_table = key
403
404         self.pg_enable_capture(self.pg_interfaces)
405         self.pg_start()
406
407         pkts = self.pg1.get_capture(len(pkts))
408         self.verify_capture(self.pg1, pkts, TCP)
409         self.pg0.assert_nothing_captured(remark="packets forwarded")
410         self.pg2.assert_nothing_captured(remark="packets forwarded")
411
412
413 class TestClassifierIP6Out(TestClassifier):
414     """Classifier output IP6 Test Case"""
415
416     @classmethod
417     def setUpClass(cls):
418         super(TestClassifierIP6Out, cls).setUpClass()
419         cls.af = socket.AF_INET6
420
421     def test_acl_ip_out(self):
422         """Output IP6 ACL test
423
424         Test scenario for basic IP ACL with source IP
425             - Create IPv6 stream for pg1 -> pg0 interface.
426             - Create ACL with source IP address.
427             - Send and verify received packets on pg0 interface.
428         """
429
430         # Basic oACL testing with source IP
431         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
432         self.pg1.add_stream(pkts)
433
434         key = "ip6_out"
435         self.create_classify_table(
436             key,
437             self.build_ip6_mask(src_ip="ffffffffffffffffffffffffffffffff"),
438             data_offset=0,
439         )
440         self.create_classify_session(
441             self.acl_tbl_idx.get(key), self.build_ip6_match(src_ip=self.pg1.remote_ip6)
442         )
443         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
444         self.acl_active_table = key
445
446         self.pg_enable_capture(self.pg_interfaces)
447         self.pg_start()
448
449         pkts = self.pg0.get_capture(len(pkts))
450         self.verify_capture(self.pg0, pkts)
451         self.pg1.assert_nothing_captured(remark="packets forwarded")
452         self.pg2.assert_nothing_captured(remark="packets forwarded")
453
454
455 class TestClassifierIP6MAC(TestClassifier):
456     """Classifier IP6 MAC Test Case"""
457
458     @classmethod
459     def setUpClass(cls):
460         super(TestClassifierIP6MAC, cls).setUpClass()
461         cls.af = socket.AF_INET6
462
463     def test_acl_mac(self):
464         """IP6 MAC iACL test
465
466         Test scenario for basic MAC ACL with source MAC
467             - Create IPv6 stream for pg0 -> pg2 interface.
468             - Create ACL with source MAC address.
469             - Send and verify received packets on pg2 interface.
470         """
471
472         # Basic iACL testing with source MAC
473         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
474         self.pg0.add_stream(pkts)
475
476         key = "mac"
477         self.create_classify_table(
478             key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
479         )
480         self.create_classify_session(
481             self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
482         )
483         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
484         self.acl_active_table = key
485
486         self.pg_enable_capture(self.pg_interfaces)
487         self.pg_start()
488
489         pkts = self.pg2.get_capture(len(pkts))
490         self.verify_capture(self.pg2, pkts)
491         self.pg0.assert_nothing_captured(remark="packets forwarded")
492         self.pg1.assert_nothing_captured(remark="packets forwarded")
493
494
495 if __name__ == "__main__":
496     unittest.main(testRunner=VppTestRunner)