7 from framework import VppTestCase, VppTestRunner
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
15 class TestClassifier(VppTestCase):
16 """ Classifier Test Case """
21 Perform standard class setup (defined by class method setUpClass in
22 class VppTestCase) before running the test case, set test case related
23 variables and configure VPP.
25 super(TestClassifier, cls).setUpClass()
26 cls.acl_active_table = ''
30 Perform test setup before test case.
33 - create 4 pg interfaces
34 - untagged pg0/pg1/pg2 interface
35 pg0 -------> pg1 (IP ACL)
41 - put it into UP state
43 - resolve neighbor address using ARP
45 :ivar list interfaces: pg interfaces.
46 :ivar list pg_if_packet_sizes: packet sizes in test.
47 :ivar dict acl_tbl_idx: ACL table index.
48 :ivar int pbr_vrfid: VRF id for PBR test.
50 self.reset_packet_infos()
51 super(TestClassifier, self).setUp()
53 # create 4 pg interfaces
54 self.create_pg_interfaces(range(4))
56 # packet sizes to test
57 self.pg_if_packet_sizes = [64, 9018]
59 self.interfaces = list(self.pg_interfaces)
65 # setup all interfaces
66 for intf in self.interfaces:
72 """Run standard test teardown and acl related log."""
74 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
75 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
76 self.logger.info(self.vapi.cli("show classify table verbose"))
77 self.logger.info(self.vapi.cli("show ip fib"))
78 if self.acl_active_table == 'ip_out':
79 self.output_acl_set_interface(
80 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
81 self.acl_active_table = ''
82 elif self.acl_active_table != '':
83 self.input_acl_set_interface(
84 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85 self.acl_active_table = ''
86 for intf in self.interfaces:
90 super(TestClassifier, self).tearDown()
92 def config_pbr_fib_entry(self, intf, is_add=1):
93 """Configure fib entry to route traffic toward PBR VRF table
95 :param VppInterface intf: destination interface to be routed for PBR.
99 self.vapi.ip_add_del_route(intf.local_ip4n,
102 table_id=self.pbr_vrfid,
105 def create_stream(self, src_if, dst_if, packet_sizes,
106 proto_l=UDP(sport=1234, dport=5678)):
107 """Create input packet stream for defined interfaces.
109 :param VppInterface src_if: Source Interface for packet stream.
110 :param VppInterface dst_if: Destination Interface for packet stream.
111 :param list packet_sizes: packet size to test.
112 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
116 for size in packet_sizes:
117 info = self.create_packet_info(src_if, dst_if)
118 payload = self.info_to_payload(info)
119 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
120 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
124 self.extend_packet(p, size)
128 def verify_capture(self, dst_if, capture, proto_l=UDP):
129 """Verify captured input packet stream for defined interface.
131 :param VppInterface dst_if: Interface to verify captured packet stream.
132 :param list capture: Captured packet stream.
133 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
135 self.logger.info("Verifying capture on interface %s" % dst_if.name)
137 for i in self.interfaces:
138 last_info[i.sw_if_index] = None
139 dst_sw_if_index = dst_if.sw_if_index
140 for packet in capture:
142 ip_received = packet[IP]
143 proto_received = packet[proto_l]
144 payload_info = self.payload_to_info(str(packet[Raw]))
145 packet_index = payload_info.index
146 self.assertEqual(payload_info.dst, dst_sw_if_index)
148 "Got packet on port %s: src=%u (id=%u)" %
149 (dst_if.name, payload_info.src, packet_index))
150 next_info = self.get_next_packet_info_for_interface2(
151 payload_info.src, dst_sw_if_index,
152 last_info[payload_info.src])
153 last_info[payload_info.src] = next_info
154 self.assertTrue(next_info is not None)
155 self.assertEqual(packet_index, next_info.index)
156 saved_packet = next_info.data
157 ip_saved = saved_packet[IP]
158 proto_saved = saved_packet[proto_l]
159 # Check standard fields
160 self.assertEqual(ip_received.src, ip_saved.src)
161 self.assertEqual(ip_received.dst, ip_saved.dst)
162 self.assertEqual(proto_received.sport, proto_saved.sport)
163 self.assertEqual(proto_received.dport, proto_saved.dport)
165 self.logger.error(ppp("Unexpected or invalid packet:", packet))
167 for i in self.interfaces:
168 remaining_packet = self.get_next_packet_info_for_interface2(
169 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
170 self.assertTrue(remaining_packet is None,
171 "Interface %s: Packet expected from interface %s "
172 "didn't arrive" % (dst_if.name, i.name))
174 def verify_vrf(self, vrf_id):
176 Check if the FIB table / VRF ID is configured.
178 :param int vrf_id: The FIB table / VRF ID to be verified.
179 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
181 ip_fib_dump = self.vapi.ip_fib_dump()
183 for ip_fib_details in ip_fib_dump:
184 if ip_fib_details[2] == vrf_id:
187 self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
190 self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format.

    Each field is right-aligned and zero-padded into a fixed-width hex
    column (20, 12, 8, 4 and 4 digits respectively); trailing all-zero
    bytes are then dropped from the concatenated string.

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :returns: mask as a hex string with an even number of digits, or ''
        when every field is empty or zero.
    """
    mask = '{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip() removes single hex digits, so a value such as 'fff0' would
    # lose the low nibble of its last byte and leave an odd-length string
    # that binascii.unhexlify() rejects. Put that nibble back so the result
    # is always byte-aligned.
    return mask + '0' * (len(mask) % 2)
def build_ip_match(proto=0, src_ip='', dst_ip='',
                   src_port=0, dst_port=0):
    """Build IP ACL match data with hexstring format.

    The numeric fields are rendered as hex, the dotted-quad addresses are
    converted to their 8-digit hex form, and everything is laid out in
    fixed-width, zero-padded columns (20, 12, 8, 4 and 4 digits
    respectively). Trailing all-zero bytes are dropped from the result.

    :param int proto: protocol number with valid option "x"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param int src_port: source port number "x"
    :param int dst_port: destination port number "x"
    :returns: match value as a hex string with an even number of digits,
        or '' when every field is empty or zero.
    """
    # Only convert addresses that were actually supplied: inet_aton()
    # rejects the empty-string default. hexlify() returns bytes on
    # Python 3, which cannot be used with a '{:0>12}' format spec, so
    # decode it to text first.
    if src_ip:
        src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
    if dst_ip:
        dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')

    match = '{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
        hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
        hex(dst_port)[2:]).rstrip('0')
    # rstrip() removes single hex digits, so e.g. port 16 ('0010') or an
    # address ending in .16 would lose the low nibble of the last byte and
    # yield an odd-length string that binascii.unhexlify() rejects.
    # Restore that nibble so the result is always byte-aligned.
    return match + '0' * (len(match) % 2)
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format.

    The fields are concatenated in the order destination MAC, source MAC,
    ethernet type, each right-aligned and zero-padded to 12, 12 and 4 hex
    digits. Trailing all-zero bytes are dropped from the result.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :returns: mask as a hex string with an even number of digits, or ''
        when every field is empty or zero.
    """
    mask = '{:0>12}{:0>12}{:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip() removes single hex digits; re-append the low nibble of the
    # last byte when it was a zero so the string stays byte-aligned and
    # remains valid input for binascii.unhexlify().
    return mask + '0' * (len(mask) % 2)
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format.

    Colons are removed from both MAC addresses, then the fields are
    concatenated in the order destination MAC, source MAC, ethernet type,
    each right-aligned and zero-padded to 12, 12 and 4 hex digits.
    Trailing all-zero bytes are dropped from the result.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: match value as a hex string with an even number of digits,
        or '' when every field is empty or zero.
    """
    # str.replace() is a no-op on the empty-string defaults, so no guard
    # is needed before stripping the separators.
    dst_hex = dst_mac.replace(':', '')
    src_hex = src_mac.replace(':', '')

    match = '{:0>12}{:0>12}{:0>4}'.format(
        dst_hex, src_hex, ether_type).rstrip('0')
    # rstrip() removes single hex digits, so a MAC ending in e.g. ':10'
    # would lose the low nibble of its last byte and produce an odd-length
    # string that binascii.unhexlify() rejects. Restore that nibble.
    return match + '0' * (len(match) % 2)
256 def create_classify_table(self, key, mask, data_offset=0):
257 """Create Classify Table
259 :param str key: key for classify table (ex, ACL name).
260 :param str mask: mask value for interested traffic.
261 :param int data_offset:
263 r = self.vapi.classify_add_del_table(
265 mask=binascii.unhexlify(mask),
266 match_n_vectors=(len(mask) - 1) // 32 + 1,
269 current_data_offset=data_offset)
270 self.assertIsNotNone(r, 'No response msg for add_del_table')
271 self.acl_tbl_idx[key] = r.new_table_index
273 def create_classify_session(self, table_index, match, pbr_option=0,
275 """Create Classify Session
277 :param int table_index: table index to identify classify table.
278 :param str match: matched value for interested traffic.
279 :param int pbr_option: enable/disable PBR feature.
280 :param int vrfid: VRF id.
281 :param int is_add: option to configure classify session.
282 - create(1) or delete(0)
284 r = self.vapi.classify_add_del_session(
287 binascii.unhexlify(match),
291 self.assertIsNotNone(r, 'No response msg for add_del_session')
293 def input_acl_set_interface(self, intf, table_index, is_add=1):
294 """Configure Input ACL interface
296 :param VppInterface intf: Interface to apply Input ACL feature.
297 :param int table_index: table index to identify classify table.
298 :param int is_add: option to configure classify session.
299 - enable(1) or disable(0)
301 r = self.vapi.input_acl_set_interface(
304 ip4_table_index=table_index)
305 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
307 def output_acl_set_interface(self, intf, table_index, is_add=1):
308 """Configure Output ACL interface
310 :param VppInterface intf: Interface to apply Output ACL feature.
311 :param int table_index: table index to identify classify table.
312 :param int is_add: option to configure classify session.
313 - enable(1) or disable(0)
315 r = self.vapi.output_acl_set_interface(
318 ip4_table_index=table_index)
319 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
322 # Tests split to different test case classes because of issue reported in
324 class TestClassifierIP(TestClassifier):
325 """ Classifier IP Test Case """
327 def test_iacl_src_ip(self):
328 """ Source IP iACL test
330 Test scenario for basic IP ACL with source IP
331 - Create IPv4 stream for pg0 -> pg1 interface.
332 - Create iACL with source IP address.
333 - Send and verify received packets on pg1 interface.
336 # Basic iACL testing with source IP
337 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
338 self.pg0.add_stream(pkts)
341 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
342 self.create_classify_session(
343 self.acl_tbl_idx.get(key),
344 self.build_ip_match(src_ip=self.pg0.remote_ip4))
345 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
346 self.acl_active_table = key
348 self.pg_enable_capture(self.pg_interfaces)
351 pkts = self.pg1.get_capture(len(pkts))
352 self.verify_capture(self.pg1, pkts)
353 self.pg0.assert_nothing_captured(remark="packets forwarded")
354 self.pg2.assert_nothing_captured(remark="packets forwarded")
355 self.pg3.assert_nothing_captured(remark="packets forwarded")
357 def test_iacl_dst_ip(self):
358 """ Destination IP iACL test
360 Test scenario for basic IP ACL with destination IP
361 - Create IPv4 stream for pg0 -> pg1 interface.
362 - Create iACL with destination IP address.
363 - Send and verify received packets on pg1 interface.
366 # Basic iACL testing with destination IP
367 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
368 self.pg0.add_stream(pkts)
371 self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
372 self.create_classify_session(
373 self.acl_tbl_idx.get(key),
374 self.build_ip_match(dst_ip=self.pg1.remote_ip4))
375 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
376 self.acl_active_table = key
378 self.pg_enable_capture(self.pg_interfaces)
381 pkts = self.pg1.get_capture(len(pkts))
382 self.verify_capture(self.pg1, pkts)
383 self.pg0.assert_nothing_captured(remark="packets forwarded")
384 self.pg2.assert_nothing_captured(remark="packets forwarded")
385 self.pg3.assert_nothing_captured(remark="packets forwarded")
387 def test_iacl_src_dst_ip(self):
388 """ Source and destination IP iACL test
390 Test scenario for basic IP ACL with source and destination IP
391 - Create IPv4 stream for pg0 -> pg1 interface.
392 - Create iACL with source and destination IP addresses.
393 - Send and verify received packets on pg1 interface.
396 # Basic iACL testing with source and destination IP
397 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
398 self.pg0.add_stream(pkts)
401 self.create_classify_table(
402 key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
403 self.create_classify_session(
404 self.acl_tbl_idx.get(key),
405 self.build_ip_match(src_ip=self.pg0.remote_ip4,
406 dst_ip=self.pg1.remote_ip4))
407 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
408 self.acl_active_table = key
410 self.pg_enable_capture(self.pg_interfaces)
413 pkts = self.pg1.get_capture(len(pkts))
414 self.verify_capture(self.pg1, pkts)
415 self.pg0.assert_nothing_captured(remark="packets forwarded")
416 self.pg2.assert_nothing_captured(remark="packets forwarded")
417 self.pg3.assert_nothing_captured(remark="packets forwarded")
420 class TestClassifierUDP(TestClassifier):
421 """ Classifier UDP proto Test Case """
423 def test_iacl_proto_udp(self):
424 """ UDP protocol iACL test
426 Test scenario for basic protocol ACL with UDP protocol
427 - Create IPv4 stream for pg0 -> pg1 interface.
428 - Create iACL with UDP IP protocol.
429 - Send and verify received packets on pg1 interface.
432 # Basic iACL testing with UDP protocol
433 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
434 self.pg0.add_stream(pkts)
437 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
438 self.create_classify_session(
439 self.acl_tbl_idx.get(key),
440 self.build_ip_match(proto=socket.IPPROTO_UDP))
441 self.input_acl_set_interface(
442 self.pg0, self.acl_tbl_idx.get(key))
443 self.acl_active_table = key
445 self.pg_enable_capture(self.pg_interfaces)
448 pkts = self.pg1.get_capture(len(pkts))
449 self.verify_capture(self.pg1, pkts)
450 self.pg0.assert_nothing_captured(remark="packets forwarded")
451 self.pg2.assert_nothing_captured(remark="packets forwarded")
452 self.pg3.assert_nothing_captured(remark="packets forwarded")
454 def test_iacl_proto_udp_sport(self):
455 """ UDP source port iACL test
457 Test scenario for basic protocol ACL with UDP and sport
458 - Create IPv4 stream for pg0 -> pg1 interface.
459 - Create iACL with UDP IP protocol and defined sport.
460 - Send and verify received packets on pg1 interface.
463 # Basic iACL testing with UDP and sport
465 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
466 UDP(sport=sport, dport=5678))
467 self.pg0.add_stream(pkts)
469 key = 'proto_udp_sport'
470 self.create_classify_table(
471 key, self.build_ip_mask(proto='ff', src_port='ffff'))
472 self.create_classify_session(
473 self.acl_tbl_idx.get(key),
474 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
475 self.input_acl_set_interface(
476 self.pg0, self.acl_tbl_idx.get(key))
477 self.acl_active_table = key
479 self.pg_enable_capture(self.pg_interfaces)
482 pkts = self.pg1.get_capture(len(pkts))
483 self.verify_capture(self.pg1, pkts)
484 self.pg0.assert_nothing_captured(remark="packets forwarded")
485 self.pg2.assert_nothing_captured(remark="packets forwarded")
486 self.pg3.assert_nothing_captured(remark="packets forwarded")
488 def test_iacl_proto_udp_dport(self):
489 """ UDP destination port iACL test
491 Test scenario for basic protocol ACL with UDP and dport
492 - Create IPv4 stream for pg0 -> pg1 interface.
493 - Create iACL with UDP IP protocol and defined dport.
494 - Send and verify received packets on pg1 interface.
497 # Basic iACL testing with UDP and dport
499 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
500 UDP(sport=1234, dport=dport))
501 self.pg0.add_stream(pkts)
503 key = 'proto_udp_dport'
504 self.create_classify_table(
505 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
506 self.create_classify_session(
507 self.acl_tbl_idx.get(key),
508 self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
509 self.input_acl_set_interface(
510 self.pg0, self.acl_tbl_idx.get(key))
511 self.acl_active_table = key
513 self.pg_enable_capture(self.pg_interfaces)
516 pkts = self.pg1.get_capture(len(pkts))
517 self.verify_capture(self.pg1, pkts)
518 self.pg0.assert_nothing_captured(remark="packets forwarded")
519 self.pg2.assert_nothing_captured(remark="packets forwarded")
520 self.pg3.assert_nothing_captured(remark="packets forwarded")
522 def test_iacl_proto_udp_sport_dport(self):
523 """ UDP source and destination ports iACL test
525 Test scenario for basic protocol ACL with UDP and sport and dport
526 - Create IPv4 stream for pg0 -> pg1 interface.
527 - Create iACL with UDP IP protocol and defined sport and dport.
528 - Send and verify received packets on pg1 interface.
531 # Basic iACL testing with UDP and sport and dport
534 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
535 UDP(sport=sport, dport=dport))
536 self.pg0.add_stream(pkts)
538 key = 'proto_udp_ports'
539 self.create_classify_table(
541 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
542 self.create_classify_session(
543 self.acl_tbl_idx.get(key),
544 self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
546 self.input_acl_set_interface(
547 self.pg0, self.acl_tbl_idx.get(key))
548 self.acl_active_table = key
550 self.pg_enable_capture(self.pg_interfaces)
553 pkts = self.pg1.get_capture(len(pkts))
554 self.verify_capture(self.pg1, pkts)
555 self.pg0.assert_nothing_captured(remark="packets forwarded")
556 self.pg2.assert_nothing_captured(remark="packets forwarded")
557 self.pg3.assert_nothing_captured(remark="packets forwarded")
560 class TestClassifierTCP(TestClassifier):
561 """ Classifier TCP proto Test Case """
563 def test_iacl_proto_tcp(self):
564 """ TCP protocol iACL test
566 Test scenario for basic protocol ACL with TCP protocol
567 - Create IPv4 stream for pg0 -> pg1 interface.
568 - Create iACL with TCP IP protocol.
569 - Send and verify received packets on pg1 interface.
572 # Basic iACL testing with TCP protocol
573 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
574 TCP(sport=1234, dport=5678))
575 self.pg0.add_stream(pkts)
578 self.create_classify_table(key, self.build_ip_mask(proto='ff'))
579 self.create_classify_session(
580 self.acl_tbl_idx.get(key),
581 self.build_ip_match(proto=socket.IPPROTO_TCP))
582 self.input_acl_set_interface(
583 self.pg0, self.acl_tbl_idx.get(key))
584 self.acl_active_table = key
586 self.pg_enable_capture(self.pg_interfaces)
589 pkts = self.pg1.get_capture(len(pkts))
590 self.verify_capture(self.pg1, pkts, TCP)
591 self.pg0.assert_nothing_captured(remark="packets forwarded")
592 self.pg2.assert_nothing_captured(remark="packets forwarded")
593 self.pg3.assert_nothing_captured(remark="packets forwarded")
595 def test_iacl_proto_tcp_sport(self):
596 """ TCP source port iACL test
598 Test scenario for basic protocol ACL with TCP and sport
599 - Create IPv4 stream for pg0 -> pg1 interface.
600 - Create iACL with TCP IP protocol and defined sport.
601 - Send and verify received packets on pg1 interface.
604 # Basic iACL testing with TCP and sport
606 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
607 TCP(sport=sport, dport=5678))
608 self.pg0.add_stream(pkts)
610 key = 'proto_tcp_sport'
611 self.create_classify_table(
612 key, self.build_ip_mask(proto='ff', src_port='ffff'))
613 self.create_classify_session(
614 self.acl_tbl_idx.get(key),
615 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
616 self.input_acl_set_interface(
617 self.pg0, self.acl_tbl_idx.get(key))
618 self.acl_active_table = key
620 self.pg_enable_capture(self.pg_interfaces)
623 pkts = self.pg1.get_capture(len(pkts))
624 self.verify_capture(self.pg1, pkts, TCP)
625 self.pg0.assert_nothing_captured(remark="packets forwarded")
626 self.pg2.assert_nothing_captured(remark="packets forwarded")
627 self.pg3.assert_nothing_captured(remark="packets forwarded")
629 def test_iacl_proto_tcp_dport(self):
630 """ TCP destination port iACL test
632 Test scenario for basic protocol ACL with TCP and dport
633 - Create IPv4 stream for pg0 -> pg1 interface.
634 - Create iACL with TCP IP protocol and defined dport.
635 - Send and verify received packets on pg1 interface.
638 # Basic iACL testing with TCP and dport
640 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
641 TCP(sport=1234, dport=dport))
642 self.pg0.add_stream(pkts)
644 key = 'proto_tcp_sport'
645 self.create_classify_table(
646 key, self.build_ip_mask(proto='ff', dst_port='ffff'))
647 self.create_classify_session(
648 self.acl_tbl_idx.get(key),
649 self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
650 self.input_acl_set_interface(
651 self.pg0, self.acl_tbl_idx.get(key))
652 self.acl_active_table = key
654 self.pg_enable_capture(self.pg_interfaces)
657 pkts = self.pg1.get_capture(len(pkts))
658 self.verify_capture(self.pg1, pkts, TCP)
659 self.pg0.assert_nothing_captured(remark="packets forwarded")
660 self.pg2.assert_nothing_captured(remark="packets forwarded")
661 self.pg3.assert_nothing_captured(remark="packets forwarded")
663 def test_iacl_proto_tcp_sport_dport(self):
664 """ TCP source and destination ports iACL test
666 Test scenario for basic protocol ACL with TCP and sport and dport
667 - Create IPv4 stream for pg0 -> pg1 interface.
668 - Create iACL with TCP IP protocol and defined sport and dport.
669 - Send and verify received packets on pg1 interface.
672 # Basic iACL testing with TCP and sport and dport
675 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
676 TCP(sport=sport, dport=dport))
677 self.pg0.add_stream(pkts)
679 key = 'proto_tcp_ports'
680 self.create_classify_table(
682 self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
683 self.create_classify_session(
684 self.acl_tbl_idx.get(key),
685 self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
687 self.input_acl_set_interface(
688 self.pg0, self.acl_tbl_idx.get(key))
689 self.acl_active_table = key
691 self.pg_enable_capture(self.pg_interfaces)
694 pkts = self.pg1.get_capture(len(pkts))
695 self.verify_capture(self.pg1, pkts, TCP)
696 self.pg0.assert_nothing_captured(remark="packets forwarded")
697 self.pg2.assert_nothing_captured(remark="packets forwarded")
698 self.pg3.assert_nothing_captured(remark="packets forwarded")
701 class TestClassifierIPOut(TestClassifier):
702 """ Classifier output IP Test Case """
704 def test_acl_ip_out(self):
705 """ Output IP ACL test
707 Test scenario for basic IP ACL with source IP
708 - Create IPv4 stream for pg1 -> pg0 interface.
709 - Create ACL with source IP address.
710 - Send and verify received packets on pg0 interface.
713 # Basic oACL testing with source IP
714 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
715 self.pg1.add_stream(pkts)
718 self.create_classify_table(
719 key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
720 self.create_classify_session(
721 self.acl_tbl_idx.get(key),
722 self.build_ip_match(src_ip=self.pg1.remote_ip4))
723 self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
724 self.acl_active_table = key
726 self.pg_enable_capture(self.pg_interfaces)
729 pkts = self.pg0.get_capture(len(pkts))
730 self.verify_capture(self.pg0, pkts)
731 self.pg1.assert_nothing_captured(remark="packets forwarded")
732 self.pg2.assert_nothing_captured(remark="packets forwarded")
733 self.pg3.assert_nothing_captured(remark="packets forwarded")
736 class TestClassifierMAC(TestClassifier):
737 """ Classifier MAC Test Case """
739 def test_acl_mac(self):
742 Test scenario for basic MAC ACL with source MAC
743 - Create IPv4 stream for pg0 -> pg2 interface.
744 - Create ACL with source MAC address.
745 - Send and verify received packets on pg2 interface.
748 # Basic iACL testing with source MAC
749 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
750 self.pg0.add_stream(pkts)
753 self.create_classify_table(
754 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
755 self.create_classify_session(
756 self.acl_tbl_idx.get(key),
757 self.build_mac_match(src_mac=self.pg0.remote_mac))
758 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
759 self.acl_active_table = key
761 self.pg_enable_capture(self.pg_interfaces)
764 pkts = self.pg2.get_capture(len(pkts))
765 self.verify_capture(self.pg2, pkts)
766 self.pg0.assert_nothing_captured(remark="packets forwarded")
767 self.pg1.assert_nothing_captured(remark="packets forwarded")
768 self.pg3.assert_nothing_captured(remark="packets forwarded")
771 class TestClassifierPBR(TestClassifier):
772 """ Classifier PBR Test Case """
774 def test_acl_pbr(self):
777 Test scenario for PBR with source IP
778 - Create IPv4 stream for pg0 -> pg3 interface.
779 - Configure PBR fib entry for packet forwarding.
780 - Send and verify received packets on pg3 interface.
783 # PBR testing with source IP
784 pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
785 self.pg0.add_stream(pkts)
788 self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
790 # this will create the VRF/table in which we will insert the route
791 self.create_classify_session(
792 self.acl_tbl_idx.get(key),
793 self.build_ip_match(src_ip=self.pg0.remote_ip4),
794 pbr_option, self.pbr_vrfid)
795 self.assertTrue(self.verify_vrf(self.pbr_vrfid))
796 self.config_pbr_fib_entry(self.pg3)
797 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
799 self.pg_enable_capture(self.pg_interfaces)
802 pkts = self.pg3.get_capture(len(pkts))
803 self.verify_capture(self.pg3, pkts)
804 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
805 self.pg0.assert_nothing_captured(remark="packets forwarded")
806 self.pg1.assert_nothing_captured(remark="packets forwarded")
807 self.pg2.assert_nothing_captured(remark="packets forwarded")
809 # remove the classify session and the route
810 self.config_pbr_fib_entry(self.pg3, is_add=0)
811 self.create_classify_session(
812 self.acl_tbl_idx.get(key),
813 self.build_ip_match(src_ip=self.pg0.remote_ip4),
814 pbr_option, self.pbr_vrfid, is_add=0)
816 # and the table should be gone.
817 self.assertFalse(self.verify_vrf(self.pbr_vrfid))
819 if __name__ == '__main__':
820 unittest.main(testRunner=VppTestRunner)