udp: fix csum computation when offload disabled
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5 import binascii
6
7 from asfframework import VppTestRunner
8
9 from scapy.layers.inet6 import UDP, TCP
10 from template_classifier import TestClassifier
11
12
class TestClassifierIP6(TestClassifier):
    """Classifier IP6 Test Case"""

    # All-ones hex mask applied to the IPv6 source/destination address fields.
    _FULL_ADDR_MASK = "ffffffffffffffffffffffffffffffff"

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then select IPv6 as the address family."""
        super().setUpClass()
        cls.af = socket.AF_INET6

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the base class."""
        super().tearDownClass()

    def _check_input_acl(self, key, mask_fields, match_fields):
        """Run one pg0 -> pg1 input ACL scenario and verify the capture.

        Shared body of the address-based tests in this class: it queues a
        stream on pg0, creates a classify table and session from the given
        fields, attaches the table to pg0 as an input ACL, then checks that
        every packet shows up on pg1 and nothing on pg0 or pg2.

        :param key: name the classify table is created under and then
            looked up by in ``self.acl_tbl_idx``.
        :param mask_fields: keyword arguments passed to ``build_ip6_mask``.
        :param match_fields: keyword arguments passed to ``build_ip6_match``.
        """
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip6_mask(**mask_fields))
        # The table index is read only after the table has been created.
        self.create_classify_session(
            self.acl_tbl_idx.get(key), self.build_ip6_match(**match_fields)
        )
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # NOTE(review): presumably read by the base class when it removes
        # the ACL during cleanup -- confirm in template_classifier.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_ip(self):
        """Source IP6 iACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with source IP address.
            - Send and verify received packets on pg1 interface.
        """
        self._check_input_acl(
            "ip6_src",
            {"src_ip": self._FULL_ADDR_MASK},
            {"src_ip": self.pg0.remote_ip6},
        )

    def test_iacl_dst_ip(self):
        """Destination IP6 iACL test

        Test scenario for basic IP ACL with destination IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with destination IP address.
            - Send and verify received packets on pg1 interface.
        """
        self._check_input_acl(
            "ip6_dst",
            {"dst_ip": self._FULL_ADDR_MASK},
            {"dst_ip": self.pg1.remote_ip6},
        )

    def test_iacl_src_dst_ip(self):
        """Source and destination IP6 iACL test

        Test scenario for basic IP ACL with source and destination IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with source and destination IP addresses.
            - Send and verify received packets on pg1 interface.
        """
        self._check_input_acl(
            "ip6",
            {"src_ip": self._FULL_ADDR_MASK, "dst_ip": self._FULL_ADDR_MASK},
            {"src_ip": self.pg0.remote_ip6, "dst_ip": self.pg1.remote_ip6},
        )
124
125
126 # Tests split to different test case classes because of issue reported in
127 # ticket VPP-1336
class TestClassifierIP6UDP(TestClassifier):
    """Classifier IP6 UDP proto Test Case"""

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then select IPv6 as the address family."""
        super().setUpClass()
        cls.af = socket.AF_INET6

    def _check_udp_iacl(self, key, mask_fields, match_fields, l4=None):
        """Run one pg0 -> pg1 input ACL scenario and verify the capture.

        Shared body of the UDP tests in this class: it queues a stream on
        pg0, creates a classify table and session from the given fields,
        attaches the table to pg0 as an input ACL, then checks that every
        packet shows up on pg1 and nothing on pg0 or pg2.

        :param key: name the classify table is created under and then
            looked up by in ``self.acl_tbl_idx``.
        :param mask_fields: keyword arguments passed to ``build_ip6_mask``.
        :param match_fields: keyword arguments passed to ``build_ip6_match``.
        :param l4: optional transport layer handed to ``create_stream``;
            when omitted, ``create_stream`` is called without it so its own
            default applies.
        """
        stream_args = [self.pg0, self.pg1, self.pg_if_packet_sizes]
        # Identity check on purpose: only a missing layer means "use default".
        if l4 is not None:
            stream_args.append(l4)
        stream = self.create_stream(*stream_args)
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip6_mask(**mask_fields))
        # The table index is read only after the table has been created.
        self.create_classify_session(
            self.acl_tbl_idx.get(key), self.build_ip6_match(**match_fields)
        )
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # NOTE(review): presumably read by the base class when it removes
        # the ACL during cleanup -- confirm in template_classifier.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp(self):
        """IP6 UDP protocol iACL test

        Test scenario for basic protocol ACL with UDP protocol
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        self._check_udp_iacl(
            "nh_udp",
            {"nh": "ff"},
            {"nh": socket.IPPROTO_UDP},
        )

    def test_iacl_proto_udp_sport(self):
        """IP6 UDP source port iACL test

        Test scenario for basic protocol ACL with UDP and sport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        self._check_udp_iacl(
            "nh_udp_sport",
            {"nh": "ff", "src_port": "ffff"},
            {"nh": socket.IPPROTO_UDP, "src_port": sport},
            l4=UDP(sport=sport, dport=5678),
        )

    def test_iacl_proto_udp_dport(self):
        """IP6 UDP destination port iACL test

        Test scenario for basic protocol ACL with UDP and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        self._check_udp_iacl(
            "nh_udp_dport",
            {"nh": "ff", "dst_port": "ffff"},
            {"nh": socket.IPPROTO_UDP, "dst_port": dport},
            l4=UDP(sport=1234, dport=dport),
        )

    def test_iacl_proto_udp_sport_dport(self):
        """IP6 UDP source and destination ports iACL test

        Test scenario for basic protocol ACL with UDP and sport and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 13720
        dport = 9080
        self._check_udp_iacl(
            "nh_udp_ports",
            {"nh": "ff", "src_port": "ffff", "dst_port": "ffff"},
            {"nh": socket.IPPROTO_UDP, "src_port": sport, "dst_port": dport},
            l4=UDP(sport=sport, dport=dport),
        )
266
267
class TestClassifierIP6TCP(TestClassifier):
    """Classifier IP6 TCP proto Test Case"""

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then select IPv6 as the address family."""
        super().setUpClass()
        cls.af = socket.AF_INET6

    def _check_tcp_iacl(self, key, l4, mask_fields, match_fields):
        """Run one pg0 -> pg1 TCP input ACL scenario and verify the capture.

        Shared body of the TCP tests in this class: it queues a TCP stream
        on pg0, creates a classify table and session from the given fields,
        attaches the table to pg0 as an input ACL, then checks that every
        packet shows up on pg1 and nothing on pg0 or pg2.

        :param key: name the classify table is created under and then
            looked up by in ``self.acl_tbl_idx``.
        :param l4: TCP layer handed to ``create_stream``.
        :param mask_fields: keyword arguments passed to ``build_ip6_mask``.
        :param match_fields: keyword arguments passed to ``build_ip6_match``.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes, l4
        )
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip6_mask(**mask_fields))
        # The table index is read only after the table has been created.
        self.create_classify_session(
            self.acl_tbl_idx.get(key), self.build_ip6_match(**match_fields)
        )
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # NOTE(review): presumably read by the base class when it removes
        # the ACL during cleanup -- confirm in template_classifier.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        # The TCP layer class is passed explicitly to the verification step.
        self.verify_capture(self.pg1, captured, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp(self):
        """IP6 TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        self._check_tcp_iacl(
            "nh_tcp",
            TCP(sport=1234, dport=5678),
            {"nh": "ff"},
            {"nh": socket.IPPROTO_TCP},
        )

    def test_iacl_proto_tcp_sport(self):
        """IP6 TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        self._check_tcp_iacl(
            "nh_tcp_sport",
            TCP(sport=sport, dport=5678),
            {"nh": "ff", "src_port": "ffff"},
            {"nh": socket.IPPROTO_TCP, "src_port": sport},
        )

    def test_iacl_proto_tcp_dport(self):
        """IP6 TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        self._check_tcp_iacl(
            "nh_tcp_dport",
            TCP(sport=1234, dport=dport),
            {"nh": "ff", "dst_port": "ffff"},
            {"nh": socket.IPPROTO_TCP, "dst_port": dport},
        )

    def test_iacl_proto_tcp_sport_dport(self):
        """IP6 TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 13720
        dport = 9080
        self._check_tcp_iacl(
            "nh_tcp_ports",
            TCP(sport=sport, dport=dport),
            {"nh": "ff", "src_port": "ffff", "dst_port": "ffff"},
            {"nh": socket.IPPROTO_TCP, "src_port": sport, "dst_port": dport},
        )
408
409
class TestClassifierIP6Out(TestClassifier):
    """Classifier output IP6 Test Case"""

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then select IPv6 as the address family."""
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_acl_ip_out(self):
        """Output IP6 ACL test

        Output ACL scenario keyed on the source IP address
            - Build an IPv6 stream travelling pg1 -> pg0.
            - Attach a source-address classify table to pg0 as output ACL.
            - Send, then verify that the packets arrive on pg0 only.
        """
        table_name = "ip6_out"
        sender, receiver = self.pg1, self.pg0

        # Queue the traffic on the sending interface.
        stream = self.create_stream(sender, receiver, self.pg_if_packet_sizes)
        sender.add_stream(stream)

        # Classify table masking the whole source address.
        src_mask = self.build_ip6_mask(src_ip="ffffffffffffffffffffffffffffffff")
        self.create_classify_table(table_name, src_mask, data_offset=0)

        # Session matching the sender's remote address, applied on output.
        self.create_classify_session(
            self.acl_tbl_idx.get(table_name),
            self.build_ip6_match(src_ip=sender.remote_ip6),
        )
        self.output_acl_set_interface(receiver, self.acl_tbl_idx.get(table_name))
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Every packet must reach the receiver and no other interface.
        received = receiver.get_capture(len(stream))
        self.verify_capture(receiver, received)
        sender.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
450
451
class TestClassifierIP6MAC(TestClassifier):
    """Classifier IP6 MAC Test Case"""

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then select IPv6 as the address family."""
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_acl_mac(self):
        """IP6 MAC iACL test

        Input ACL scenario keyed on the source MAC address
            - Build an IPv6 stream travelling pg0 -> pg2.
            - Attach a source-MAC classify table to pg0 as input ACL.
            - Send, then verify that the packets arrive on pg2 only.
        """
        table_name = "mac"
        sender, receiver = self.pg0, self.pg2

        # Queue the traffic on the sending interface.
        stream = self.create_stream(sender, receiver, self.pg_if_packet_sizes)
        sender.add_stream(stream)

        # NOTE(review): data_offset=-14 presumably moves the match window
        # back over the Ethernet header so the MAC fields are covered --
        # confirm against create_classify_table.
        src_mac_mask = self.build_mac_mask(src_mac="ffffffffffff")
        self.create_classify_table(table_name, src_mac_mask, data_offset=-14)

        # Session matching the sender's remote MAC, applied on input.
        self.create_classify_session(
            self.acl_tbl_idx.get(table_name),
            self.build_mac_match(src_mac=sender.remote_mac),
        )
        self.input_acl_set_interface(sender, self.acl_tbl_idx.get(table_name))
        self.acl_active_table = table_name

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Every packet must reach the receiver and no other interface.
        received = receiver.get_capture(len(stream))
        self.verify_capture(receiver, received)
        sender.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
490
491
# When executed as a script, run this module's tests through unittest using
# the VppTestRunner imported from asfframework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)