vlib: fix code of getting numa node with specific cpu_id
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5 import binascii
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet6 import IPv6, UDP, TCP
12 from util import ppp
13 from template_classifier import TestClassifier
14
15
class TestClassifierIP6(TestClassifier):
    """ Classifier IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the parent class setup, then select the IPv6 family. """
        super().setUpClass()
        cls.af = socket.AF_INET6

    @classmethod
    def tearDownClass(cls):
        """ Delegate class teardown to the parent test case. """
        super().tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP6 iACL test

        Input ACL keyed on the source IPv6 address
            - Queue an IPv6 stream on pg0 that is destined for pg1.
            - Install a classify table masking the whole source address
              and a session matching pg0's remote IPv6 address.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        sent = self.create_stream(ingress, egress, self.pg_if_packet_sizes)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'ip6_src'
        all_ones = 'ffffffffffffffffffffffffffffffff'
        mask = self.build_ip6_mask(src_ip=all_ones)
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=ingress.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP6 iACL test

        Input ACL keyed on the destination IPv6 address
            - Queue an IPv6 stream on pg0 that is destined for pg1.
            - Install a classify table masking the whole destination
              address and a session matching pg1's remote IPv6 address.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        sent = self.create_stream(ingress, egress, self.pg_if_packet_sizes)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'ip6_dst'
        all_ones = 'ffffffffffffffffffffffffffffffff'
        mask = self.build_ip6_mask(dst_ip=all_ones)
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(dst_ip=egress.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP6 iACL test

        Input ACL keyed on both source and destination IPv6 addresses
            - Queue an IPv6 stream on pg0 that is destined for pg1.
            - Install a classify table masking both whole addresses and
              a session matching pg0's and pg1's remote IPv6 addresses.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        sent = self.create_stream(ingress, egress, self.pg_if_packet_sizes)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'ip6'
        all_ones = 'ffffffffffffffffffffffffffffffff'
        mask = self.build_ip6_mask(src_ip=all_ones, dst_ip=all_ones)
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=ingress.remote_ip6,
                                     dst_ip=egress.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")
122
123
124 # Tests split to different test case classes because of issue reported in
125 # ticket VPP-1336
class TestClassifierIP6UDP(TestClassifier):
    """ Classifier IP6 UDP proto Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the parent class setup, then select the IPv6 family. """
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_iacl_proto_udp(self):
        """ IP6 UDP protocol iACL test

        Input ACL keyed on the IPv6 next-header field (UDP)
            - Queue an IPv6 stream on pg0 that is destined for pg1.
            - Install a classify table masking the next-header byte and
              a session matching IPPROTO_UDP.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        sent = self.create_stream(ingress, egress, self.pg_if_packet_sizes)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_udp'
        mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ IP6 UDP source port iACL test

        Input ACL keyed on UDP next-header plus source port
            - Queue an IPv6/UDP stream on pg0 destined for pg1, using a
              fixed source port.
            - Install a classify table masking next-header and source
              port, and a session matching UDP with that source port.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        src_port = 38
        l4_header = UDP(sport=src_port, dport=5678)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_udp_sport'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                     src_port=src_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ IP6 UDP destination port iACL test

        Input ACL keyed on UDP next-header plus destination port
            - Queue an IPv6/UDP stream on pg0 destined for pg1, using a
              fixed destination port.
            - Install a classify table masking next-header and
              destination port, and a session matching UDP with that
              destination port.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        dst_port = 427
        l4_header = UDP(sport=1234, dport=dst_port)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_udp_dport'
        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ IP6 UDP source and destination ports iACL test

        Input ACL keyed on UDP next-header plus both ports
            - Queue an IPv6/UDP stream on pg0 destined for pg1, using
              fixed source and destination ports.
            - Install a classify table masking next-header and both
              ports, and a session matching UDP with those ports.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        src_port = 13720
        dst_port = 9080
        l4_header = UDP(sport=src_port, dport=dst_port)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_udp_ports'
        mask = self.build_ip6_mask(
            nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(
            nh=socket.IPPROTO_UDP, src_port=src_port, dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")
264
265
class TestClassifierIP6TCP(TestClassifier):
    """ Classifier IP6 TCP proto Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the parent class setup, then select the IPv6 family. """
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_iacl_proto_tcp(self):
        """ IP6 TCP protocol iACL test

        Input ACL keyed on the IPv6 next-header field (TCP)
            - Queue an IPv6/TCP stream on pg0 that is destined for pg1.
            - Install a classify table masking the next-header byte and
              a session matching IPPROTO_TCP.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        l4_header = TCP(sport=1234, dport=5678)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_tcp'
        mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured, TCP)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ IP6 TCP source port iACL test

        Input ACL keyed on TCP next-header plus source port
            - Queue an IPv6/TCP stream on pg0 destined for pg1, using a
              fixed source port.
            - Install a classify table masking next-header and source
              port, and a session matching TCP with that source port.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        src_port = 38
        l4_header = TCP(sport=src_port, dport=5678)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_tcp_sport'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                     src_port=src_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured, TCP)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ IP6 TCP destination port iACL test

        Input ACL keyed on TCP next-header plus destination port
            - Queue an IPv6/TCP stream on pg0 destined for pg1, using a
              fixed destination port.
            - Install a classify table masking next-header and
              destination port, and a session matching TCP with that
              destination port.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        dst_port = 427
        l4_header = TCP(sport=1234, dport=dst_port)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_tcp_dport'
        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured, TCP)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ IP6 TCP source and destination ports iACL test

        Input ACL keyed on TCP next-header plus both ports
            - Queue an IPv6/TCP stream on pg0 destined for pg1, using
              fixed source and destination ports.
            - Install a classify table masking next-header and both
              ports, and a session matching TCP with those ports.
            - Expect the full stream on pg1 and nothing on pg0 or pg2.
        """

        ingress, egress = self.pg0, self.pg1

        # Traffic under test
        src_port = 13720
        dst_port = 9080
        l4_header = TCP(sport=src_port, dport=dst_port)
        sent = self.create_stream(
            ingress, egress, self.pg_if_packet_sizes, l4_header)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'nh_tcp_ports'
        mask = self.build_ip6_mask(
            nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(
            nh=socket.IPPROTO_TCP, src_port=src_port, dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured, TCP)
        for idle in (ingress, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")
406
407
class TestClassifierIP6Out(TestClassifier):
    """ Classifier output IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the parent class setup, then select the IPv6 family. """
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_acl_ip_out(self):
        """ Output IP6 ACL test

        Output ACL keyed on the source IPv6 address
            - Queue an IPv6 stream on pg1 that is destined for pg0.
            - Install a classify table (data offset 0) masking the whole
              source address, a session matching pg1's remote IPv6
              address, and attach it as output ACL on pg0.
            - Expect the full stream on pg0 and nothing on pg1 or pg2.
        """

        source, sink = self.pg1, self.pg0

        # Traffic under test
        sent = self.create_stream(source, sink, self.pg_if_packet_sizes)
        source.add_stream(sent)

        # Classify table + session, attached as output ACL on the sink
        table = 'ip6_out'
        all_ones = 'ffffffffffffffffffffffffffffffff'
        mask = self.build_ip6_mask(src_ip=all_ones)
        self.create_classify_table(table, mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=source.remote_ip6)
        self.create_classify_session(table_index, match)
        self.output_acl_set_interface(sink, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the sink, and only there
        captured = sink.get_capture(len(sent))
        self.verify_capture(sink, captured)
        for idle in (source, self.pg2):
            idle.assert_nothing_captured(remark="packets forwarded")
448
449
class TestClassifierIP6MAC(TestClassifier):
    """ Classifier IP6 MAC Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the parent class setup, then select the IPv6 family. """
        super().setUpClass()
        cls.af = socket.AF_INET6

    def test_acl_mac(self):
        """ IP6 MAC iACL test

        Input ACL keyed on the source MAC address
            - Queue an IPv6 stream on pg0 that is destined for pg2.
            - Install a classify table (data offset -14) masking the
              whole source MAC and a session matching pg0's remote MAC.
            - Expect the full stream on pg2 and nothing on pg0 or pg1.
        """

        ingress, egress = self.pg0, self.pg2

        # Traffic under test
        sent = self.create_stream(ingress, egress, self.pg_if_packet_sizes)
        ingress.add_stream(sent)

        # Classify table + session, attached as input ACL on the ingress
        table = 'mac'
        mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(table, mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_mac_match(src_mac=ingress.remote_mac)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(ingress, self.acl_tbl_idx.get(table))
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on the egress, and only there
        captured = egress.get_capture(len(sent))
        self.verify_capture(egress, captured)
        for idle in (ingress, self.pg1):
            idle.assert_nothing_captured(remark="packets forwarded")
487
488
# When executed as a script, hand the tests in this module to unittest,
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)