7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from template_classifier import TestClassifier
16 class TestClassifierIP6(TestClassifier):
17 """Classifier IP6 Test Case"""
21 super(TestClassifierIP6, cls).setUpClass()
22 cls.af = socket.AF_INET6
25 def tearDownClass(cls):
26 super(TestClassifierIP6, cls).tearDownClass()
28 def test_iacl_src_ip(self):
29 """Source IP6 iACL test
31 Test scenario for basic IP ACL with source IP
32 - Create IPv6 stream for pg0 -> pg1 interface.
33 - Create iACL with source IP address.
34 - Send and verify received packets on pg1 interface.
37 # Basic iACL testing with source IP
38 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
39 self.pg0.add_stream(pkts)
42 self.create_classify_table(
43 key, self.build_ip6_mask(src_ip="ffffffffffffffffffffffffffffffff")
45 self.create_classify_session(
46 self.acl_tbl_idx.get(key), self.build_ip6_match(src_ip=self.pg0.remote_ip6)
48 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
49 self.acl_active_table = key
51 self.pg_enable_capture(self.pg_interfaces)
54 pkts = self.pg1.get_capture(len(pkts))
55 self.verify_capture(self.pg1, pkts)
56 self.pg0.assert_nothing_captured(remark="packets forwarded")
57 self.pg2.assert_nothing_captured(remark="packets forwarded")
59 def test_iacl_dst_ip(self):
60 """Destination IP6 iACL test
62 Test scenario for basic IP ACL with destination IP
63 - Create IPv6 stream for pg0 -> pg1 interface.
64 - Create iACL with destination IP address.
65 - Send and verify received packets on pg1 interface.
68 # Basic iACL testing with destination IP
69 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
70 self.pg0.add_stream(pkts)
73 self.create_classify_table(
74 key, self.build_ip6_mask(dst_ip="ffffffffffffffffffffffffffffffff")
76 self.create_classify_session(
77 self.acl_tbl_idx.get(key), self.build_ip6_match(dst_ip=self.pg1.remote_ip6)
79 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
80 self.acl_active_table = key
82 self.pg_enable_capture(self.pg_interfaces)
85 pkts = self.pg1.get_capture(len(pkts))
86 self.verify_capture(self.pg1, pkts)
87 self.pg0.assert_nothing_captured(remark="packets forwarded")
88 self.pg2.assert_nothing_captured(remark="packets forwarded")
90 def test_iacl_src_dst_ip(self):
91 """Source and destination IP6 iACL test
93 Test scenario for basic IP ACL with source and destination IP
94 - Create IPv6 stream for pg0 -> pg1 interface.
95 - Create iACL with source and destination IP addresses.
96 - Send and verify received packets on pg1 interface.
99 # Basic iACL testing with source and destination IP
100 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
101 self.pg0.add_stream(pkts)
104 self.create_classify_table(
107 src_ip="ffffffffffffffffffffffffffffffff",
108 dst_ip="ffffffffffffffffffffffffffffffff",
111 self.create_classify_session(
112 self.acl_tbl_idx.get(key),
113 self.build_ip6_match(
114 src_ip=self.pg0.remote_ip6, dst_ip=self.pg1.remote_ip6
117 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
118 self.acl_active_table = key
120 self.pg_enable_capture(self.pg_interfaces)
123 pkts = self.pg1.get_capture(len(pkts))
124 self.verify_capture(self.pg1, pkts)
125 self.pg0.assert_nothing_captured(remark="packets forwarded")
126 self.pg2.assert_nothing_captured(remark="packets forwarded")
129 # Tests split to different test case classes because of issue reported in
131 class TestClassifierIP6UDP(TestClassifier):
132 """Classifier IP6 UDP proto Test Case"""
136 super(TestClassifierIP6UDP, cls).setUpClass()
137 cls.af = socket.AF_INET6
139 def test_iacl_proto_udp(self):
140 """IP6 UDP protocol iACL test
142 Test scenario for basic protocol ACL with UDP protocol
143 - Create IPv6 stream for pg0 -> pg1 interface.
144 - Create iACL with UDP IP protocol.
145 - Send and verify received packets on pg1 interface.
148 # Basic iACL testing with UDP protocol
149 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
150 self.pg0.add_stream(pkts)
153 self.create_classify_table(key, self.build_ip6_mask(nh="ff"))
154 self.create_classify_session(
155 self.acl_tbl_idx.get(key), self.build_ip6_match(nh=socket.IPPROTO_UDP)
157 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
158 self.acl_active_table = key
160 self.pg_enable_capture(self.pg_interfaces)
163 pkts = self.pg1.get_capture(len(pkts))
164 self.verify_capture(self.pg1, pkts)
165 self.pg0.assert_nothing_captured(remark="packets forwarded")
166 self.pg2.assert_nothing_captured(remark="packets forwarded")
168 def test_iacl_proto_udp_sport(self):
169 """IP6 UDP source port iACL test
171 Test scenario for basic protocol ACL with UDP and sport
172 - Create IPv6 stream for pg0 -> pg1 interface.
173 - Create iACL with UDP IP protocol and defined sport.
174 - Send and verify received packets on pg1 interface.
177 # Basic iACL testing with UDP and sport
179 pkts = self.create_stream(
180 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
182 self.pg0.add_stream(pkts)
185 self.create_classify_table(key, self.build_ip6_mask(nh="ff", src_port="ffff"))
186 self.create_classify_session(
187 self.acl_tbl_idx.get(key),
188 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport),
190 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
191 self.acl_active_table = key
193 self.pg_enable_capture(self.pg_interfaces)
196 pkts = self.pg1.get_capture(len(pkts))
197 self.verify_capture(self.pg1, pkts)
198 self.pg0.assert_nothing_captured(remark="packets forwarded")
199 self.pg2.assert_nothing_captured(remark="packets forwarded")
201 def test_iacl_proto_udp_dport(self):
202 """IP6 UDP destination port iACL test
204 Test scenario for basic protocol ACL with UDP and dport
205 - Create IPv6 stream for pg0 -> pg1 interface.
206 - Create iACL with UDP IP protocol and defined dport.
207 - Send and verify received packets on pg1 interface.
210 # Basic iACL testing with UDP and dport
212 pkts = self.create_stream(
213 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
215 self.pg0.add_stream(pkts)
218 self.create_classify_table(key, self.build_ip6_mask(nh="ff", dst_port="ffff"))
219 self.create_classify_session(
220 self.acl_tbl_idx.get(key),
221 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport),
223 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
224 self.acl_active_table = key
226 self.pg_enable_capture(self.pg_interfaces)
229 pkts = self.pg1.get_capture(len(pkts))
230 self.verify_capture(self.pg1, pkts)
231 self.pg0.assert_nothing_captured(remark="packets forwarded")
232 self.pg2.assert_nothing_captured(remark="packets forwarded")
234 def test_iacl_proto_udp_sport_dport(self):
235 """IP6 UDP source and destination ports iACL test
237 Test scenario for basic protocol ACL with UDP and sport and dport
238 - Create IPv6 stream for pg0 -> pg1 interface.
239 - Create iACL with UDP IP protocol and defined sport and dport.
240 - Send and verify received packets on pg1 interface.
243 # Basic iACL testing with UDP and sport and dport
246 pkts = self.create_stream(
247 self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
249 self.pg0.add_stream(pkts)
252 self.create_classify_table(
253 key, self.build_ip6_mask(nh="ff", src_port="ffff", dst_port="ffff")
255 self.create_classify_session(
256 self.acl_tbl_idx.get(key),
257 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport, dst_port=dport),
259 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
260 self.acl_active_table = key
262 self.pg_enable_capture(self.pg_interfaces)
265 pkts = self.pg1.get_capture(len(pkts))
266 self.verify_capture(self.pg1, pkts)
267 self.pg0.assert_nothing_captured(remark="packets forwarded")
268 self.pg2.assert_nothing_captured(remark="packets forwarded")
271 class TestClassifierIP6TCP(TestClassifier):
272 """Classifier IP6 TCP proto Test Case"""
276 super(TestClassifierIP6TCP, cls).setUpClass()
277 cls.af = socket.AF_INET6
279 def test_iacl_proto_tcp(self):
280 """IP6 TCP protocol iACL test
282 Test scenario for basic protocol ACL with TCP protocol
283 - Create IPv6 stream for pg0 -> pg1 interface.
284 - Create iACL with TCP IP protocol.
285 - Send and verify received packets on pg1 interface.
288 # Basic iACL testing with TCP protocol
289 pkts = self.create_stream(
290 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
292 self.pg0.add_stream(pkts)
295 self.create_classify_table(key, self.build_ip6_mask(nh="ff"))
296 self.create_classify_session(
297 self.acl_tbl_idx.get(key), self.build_ip6_match(nh=socket.IPPROTO_TCP)
299 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
300 self.acl_active_table = key
302 self.pg_enable_capture(self.pg_interfaces)
305 pkts = self.pg1.get_capture(len(pkts))
306 self.verify_capture(self.pg1, pkts, TCP)
307 self.pg0.assert_nothing_captured(remark="packets forwarded")
308 self.pg2.assert_nothing_captured(remark="packets forwarded")
310 def test_iacl_proto_tcp_sport(self):
311 """IP6 TCP source port iACL test
313 Test scenario for basic protocol ACL with TCP and sport
314 - Create IPv6 stream for pg0 -> pg1 interface.
315 - Create iACL with TCP IP protocol and defined sport.
316 - Send and verify received packets on pg1 interface.
319 # Basic iACL testing with TCP and sport
321 pkts = self.create_stream(
322 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
324 self.pg0.add_stream(pkts)
327 self.create_classify_table(key, self.build_ip6_mask(nh="ff", src_port="ffff"))
328 self.create_classify_session(
329 self.acl_tbl_idx.get(key),
330 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport),
332 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
333 self.acl_active_table = key
335 self.pg_enable_capture(self.pg_interfaces)
338 pkts = self.pg1.get_capture(len(pkts))
339 self.verify_capture(self.pg1, pkts, TCP)
340 self.pg0.assert_nothing_captured(remark="packets forwarded")
341 self.pg2.assert_nothing_captured(remark="packets forwarded")
343 def test_iacl_proto_tcp_dport(self):
344 """IP6 TCP destination port iACL test
346 Test scenario for basic protocol ACL with TCP and dport
347 - Create IPv6 stream for pg0 -> pg1 interface.
348 - Create iACL with TCP IP protocol and defined dport.
349 - Send and verify received packets on pg1 interface.
352 # Basic iACL testing with TCP and dport
354 pkts = self.create_stream(
355 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
357 self.pg0.add_stream(pkts)
360 self.create_classify_table(key, self.build_ip6_mask(nh="ff", dst_port="ffff"))
361 self.create_classify_session(
362 self.acl_tbl_idx.get(key),
363 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport),
365 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
366 self.acl_active_table = key
368 self.pg_enable_capture(self.pg_interfaces)
371 pkts = self.pg1.get_capture(len(pkts))
372 self.verify_capture(self.pg1, pkts, TCP)
373 self.pg0.assert_nothing_captured(remark="packets forwarded")
374 self.pg2.assert_nothing_captured(remark="packets forwarded")
376 def test_iacl_proto_tcp_sport_dport(self):
377 """IP6 TCP source and destination ports iACL test
379 Test scenario for basic protocol ACL with TCP and sport and dport
380 - Create IPv6 stream for pg0 -> pg1 interface.
381 - Create iACL with TCP IP protocol and defined sport and dport.
382 - Send and verify received packets on pg1 interface.
385 # Basic iACL testing with TCP and sport and dport
388 pkts = self.create_stream(
389 self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
391 self.pg0.add_stream(pkts)
394 self.create_classify_table(
395 key, self.build_ip6_mask(nh="ff", src_port="ffff", dst_port="ffff")
397 self.create_classify_session(
398 self.acl_tbl_idx.get(key),
399 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport, dst_port=dport),
401 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
402 self.acl_active_table = key
404 self.pg_enable_capture(self.pg_interfaces)
407 pkts = self.pg1.get_capture(len(pkts))
408 self.verify_capture(self.pg1, pkts, TCP)
409 self.pg0.assert_nothing_captured(remark="packets forwarded")
410 self.pg2.assert_nothing_captured(remark="packets forwarded")
413 class TestClassifierIP6Out(TestClassifier):
414 """Classifier output IP6 Test Case"""
418 super(TestClassifierIP6Out, cls).setUpClass()
419 cls.af = socket.AF_INET6
421 def test_acl_ip_out(self):
422 """Output IP6 ACL test
424 Test scenario for basic IP ACL with source IP
425 - Create IPv6 stream for pg1 -> pg0 interface.
426 - Create ACL with source IP address.
427 - Send and verify received packets on pg0 interface.
430 # Basic oACL testing with source IP
431 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
432 self.pg1.add_stream(pkts)
435 self.create_classify_table(
437 self.build_ip6_mask(src_ip="ffffffffffffffffffffffffffffffff"),
440 self.create_classify_session(
441 self.acl_tbl_idx.get(key), self.build_ip6_match(src_ip=self.pg1.remote_ip6)
443 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
444 self.acl_active_table = key
446 self.pg_enable_capture(self.pg_interfaces)
449 pkts = self.pg0.get_capture(len(pkts))
450 self.verify_capture(self.pg0, pkts)
451 self.pg1.assert_nothing_captured(remark="packets forwarded")
452 self.pg2.assert_nothing_captured(remark="packets forwarded")
455 class TestClassifierIP6MAC(TestClassifier):
456 """Classifier IP6 MAC Test Case"""
460 super(TestClassifierIP6MAC, cls).setUpClass()
461 cls.af = socket.AF_INET6
463 def test_acl_mac(self):
466 Test scenario for basic MAC ACL with source MAC
467 - Create IPv6 stream for pg0 -> pg2 interface.
468 - Create ACL with source MAC address.
469 - Send and verify received packets on pg2 interface.
472 # Basic iACL testing with source MAC
473 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
474 self.pg0.add_stream(pkts)
477 self.create_classify_table(
478 key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
480 self.create_classify_session(
481 self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
483 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
484 self.acl_active_table = key
486 self.pg_enable_capture(self.pg_interfaces)
489 pkts = self.pg2.get_capture(len(pkts))
490 self.verify_capture(self.pg2, pkts)
491 self.pg0.assert_nothing_captured(remark="packets forwarded")
492 self.pg1.assert_nothing_captured(remark="packets forwarded")
# Script entry point: when this module is executed directly, run its test cases through unittest using VppTestRunner as the runner.
495 if __name__ == "__main__":
496 unittest.main(testRunner=VppTestRunner)