import socket
import unittest

from framework import VppTestCase, VppTestRunner

from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP

from template_classifier import TestClassifier
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import INVALID_INDEX

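# Tests are split into separate test case classes because of an issue
# reported in a ticket.
# NOTE(review): the ticket reference is truncated in this copy of the file;
# upstream appears to cite VPP-1336 - confirm.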
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierIP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP, cls).tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source IP address.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with source IP
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'ip_src'  # restored
        self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is active; presumably the base class uses this
        # for cleanup - TODO confirm.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Test scenario for basic IP ACL with destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with destination IP address.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with destination IP
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'ip_dst'  # restored
        self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Test scenario for basic IP ACL with source and destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source and destination IP addresses.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with source and destination IP
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'ip'  # restored
        self.create_classify_table(
            key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4,
                                dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")


class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them (port numbers in particular)
    # against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierUDP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierUDP, cls).tearDownClass()

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Test scenario for basic protocol ACL with UDP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with UDP protocol
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'proto_udp'  # restored
        self.create_classify_table(key, self.build_ip_mask(proto='ff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_UDP))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is active; presumably the base class uses this
        # for cleanup - TODO confirm.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Test scenario for basic protocol ACL with UDP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with UDP and sport
        sport = 38  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_udp_sport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', src_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Test scenario for basic protocol ACL with UDP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with UDP and dport
        dport = 427  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=1234, dport=dport))
        self.pg0.add_stream(pkts)

        key = 'proto_udp_dport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Test scenario for basic protocol ACL with UDP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with UDP and sport and dport
        sport = 13720  # restored
        dport = 9080  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport))
        self.pg0.add_stream(pkts)

        key = 'proto_udp_ports'
        self.create_classify_table(
            key,  # restored
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
                                dst_port=dport))  # restored
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")


class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them (port numbers in particular)
    # against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierTCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierTCP, cls).tearDownClass()

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP protocol
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp'  # restored
        self.create_classify_table(key, self.build_ip_mask(proto='ff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is active; presumably the base class uses this
        # for cleanup - TODO confirm.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and sport
        sport = 38  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=5678))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_sport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', src_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and dport
        dport = 427  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=dport))
        self.pg0.add_stream(pkts)

        # Use a dport-specific key: this test previously reused
        # 'proto_tcp_sport', colliding with the source-port test's entry in
        # acl_tbl_idx.
        key = 'proto_tcp_dport'
        self.create_classify_table(
            key, self.build_ip_mask(proto='ff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """

        # Basic iACL testing with TCP and sport and dport
        sport = 13720  # restored
        dport = 9080  # restored
        pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=dport))
        self.pg0.add_stream(pkts)

        key = 'proto_tcp_ports'
        self.create_classify_table(
            key,  # restored
            self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
                                dst_port=dport))  # restored
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg1 and nowhere else.
        pkts = self.pg1.get_capture(len(pkts))
        self.verify_capture(self.pg1, pkts, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")


class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierIPOut, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIPOut, cls).tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv4 stream for pg1 -> pg0 interface.
            - Create ACL with source IP address.
            - Send and verify received packets on pg0 interface.
        """

        # Basic oACL testing with source IP
        pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(pkts)

        key = 'ip_out'  # restored
        self.create_classify_table(
            key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg1.remote_ip4))
        # The ACL is attached on the output side of pg0, the egress interface.
        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is active; presumably the base class uses this
        # for cleanup - TODO confirm.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg0 and nowhere else.
        pkts = self.pg0.get_capture(len(pkts))
        self.verify_capture(self.pg0, pkts)
        self.pg1.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")


class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierMAC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierMAC, cls).tearDownClass()

    def test_acl_mac(self):
        """ MAC ACL test

        Test scenario for basic MAC ACL with source MAC
            - Create IPv4 stream for pg0 -> pg2 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg2 interface.
        """
        # NOTE(review): the docstring's first line was missing from the
        # reviewed copy and is restored above - confirm.

        # Basic iACL testing with source MAC
        pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'mac'  # restored
        # data_offset=-14 presumably rewinds the match window over the
        # 14-byte Ethernet header so the MAC fields are visible - TODO confirm.
        self.create_classify_table(
            key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_mac_match(src_mac=self.pg0.remote_mac))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # Record which table is active; presumably the base class uses this
        # for cleanup - TODO confirm.
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg2 and nowhere else.
        pkts = self.pg2.get_capture(len(pkts))
        self.verify_capture(self.pg2, pkts)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
        self.pg3.assert_nothing_captured(remark="packets forwarded")


class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    # NOTE(review): lines tagged "restored" were absent from the reviewed
    # copy of this file (short lines were lost) and have been reconstructed
    # from the surrounding code; confirm them against upstream.

    @classmethod
    def setUpClass(cls):  # restored
        super(TestClassifierPBR, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierPBR, cls).tearDownClass()

    def test_acl_pbr(self):
        """ PBR test

        Test scenario for PBR with source IP
            - Create IPv4 stream for pg0 -> pg3 interface.
            - Configure PBR fib entry for packet forwarding.
            - Send and verify received packets on pg3 interface.
        """
        # NOTE(review): the docstring's first line was missing from the
        # reviewed copy and is restored above - confirm.

        # PBR testing with source IP
        pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'pbr'  # restored
        self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
        pbr_option = 1  # restored
        # this will create the VRF/table in which we will insert the route
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_option, self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))
        r = VppIpRoute(self, self.pg3.local_ip4, 24,
                       [VppRoutePath(self.pg3.remote_ip4,
                                     INVALID_INDEX)],  # restored
                       table_id=self.pbr_vrfid)
        r.add_vpp_config()  # restored

        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()  # restored

        # Every sent packet must arrive on pg3 and nowhere else.
        pkts = self.pg3.get_capture(len(pkts))
        self.verify_capture(self.pg3, pkts)
        # Detach the table from pg0 here (trailing 0 presumably means
        # "disable" - TODO confirm); this test does not set acl_active_table.
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

        # remove the classify session and the route
        r.remove_vpp_config()
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_option, self.pbr_vrfid, is_add=0)

        # and the table should be gone.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))


if __name__ == '__main__':
    # Run this module's test cases through the VPP-specific test runner.
    unittest.main(testRunner=VppTestRunner)