2 """ Classifier-based L2 ACL Test Case HLD:
11 from scapy.packet import Raw
12 from scapy.data import ETH_P_IP
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, TCP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
16 from scapy.layers.inet6 import IPv6ExtHdrFragment
17 from framework import VppTestCase, VppTestRunner
18 from util import Host, ppp
21 class TestClassifyAcl(VppTestCase):
22 """ Classifier-based L2 input and output ACL Test Case """
38 proto = [[6, 17], [1, 58]]
39 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
51 udp_sport_to = udp_sport_from + 5
52 udp_dport_from = 20000
53 udp_dport_to = udp_dport_from + 5000
55 tcp_sport_to = tcp_sport_from + 5
56 tcp_dport_from = 40000
57 tcp_dport_to = tcp_dport_from + 5000
60 udp_sport_to_2 = udp_sport_from_2 + 5
61 udp_dport_from_2 = 30000
62 udp_dport_to_2 = udp_dport_from_2 + 5000
63 tcp_sport_from_2 = 130
64 tcp_sport_to_2 = tcp_sport_from_2 + 5
65 tcp_dport_from_2 = 20000
66 tcp_dport_to_2 = tcp_dport_from_2 + 5000
68 icmp4_type = 8 # echo request
70 icmp6_type = 128 # echo request
86 Perform standard class setup (defined by class method setUpClass in
87 class VppTestCase) before running the test case, set test case related
88 variables and configure VPP.
90 super(TestClassifyAcl, cls).setUpClass()
93 # Create 2 pg interfaces
94 cls.create_pg_interfaces(range(2))
96 # Packet flows mapping pg0 -> pg1, pg2 etc.
98 cls.flows[cls.pg0] = [cls.pg1]
101 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
103 # Create BD with MAC learning and unknown unicast flooding disabled
104 # and put interfaces to this BD
105 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
107 for pg_if in cls.pg_interfaces:
108 cls.vapi.sw_interface_set_l2_bridge(
109 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
111 # Set up all interfaces
112 for i in cls.pg_interfaces:
115 # Mapping between packet-generator index and lists of test hosts
116 cls.hosts_by_pg_idx = dict()
117 for pg_if in cls.pg_interfaces:
118 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
120 # Create list of deleted hosts
121 cls.deleted_hosts_by_pg_idx = dict()
122 for pg_if in cls.pg_interfaces:
123 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
125 # warm-up the mac address tables
128 # Holder of the active classify table key
129 cls.acl_active_table = ''
132 super(TestClassifyAcl, cls).tearDownClass()
136 def tearDownClass(cls):
137 super(TestClassifyAcl, cls).tearDownClass()
140 super(TestClassifyAcl, self).setUp()
142 self.acl_tbl_idx = {}
143 self.reset_packet_infos()
147 Show various debug prints after each test.
149 if not self.vpp_dead:
150 self.logger.info(self.vapi.ppcli("show inacl type l2"))
151 self.logger.info(self.vapi.ppcli("show outacl type l2"))
152 self.logger.info(self.vapi.ppcli("show classify tables verbose"))
153 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
155 if self.acl_active_table == 'mac_inout':
156 self.output_acl_set_interface(
157 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
158 self.input_acl_set_interface(
159 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
160 self.acl_active_table = ''
161 elif self.acl_active_table == 'mac_out':
162 self.output_acl_set_interface(
163 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
164 self.acl_active_table = ''
165 elif self.acl_active_table == 'mac_in':
166 self.input_acl_set_interface(
167 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
168 self.acl_active_table = ''
170 super(TestClassifyAcl, self).tearDown()
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format

    Each field is left-padded with '0' to a fixed width (12 hex digits per
    MAC address, 4 for the ethernet type). The fields are concatenated in
    the order destination MAC, source MAC, ethernet type, and trailing
    zeros are then dropped.

    :param str dst_mac: destination MAC address mask <0-ffffffffffff>
    :param str src_mac: source MAC address mask <0-ffffffffffff>
    :param str ether_type: ethernet type mask <0-ffff>
    :returns: hex string with an even number of digits ('' when every
        field is empty or all zeros).
    """
    hexstr = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # rstrip('0') removes single hex digits, so a value such as
    # 'fffffffffff0' would lose half a byte. Put that digit back so the
    # result always describes whole bytes and can be unhexlified.
    if len(hexstr) % 2:
        hexstr += '0'
    return hexstr
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format

    Colons are removed from both MAC addresses, each field is left-padded
    with '0' to a fixed width (12 hex digits per MAC address, 4 for the
    ethernet type). The fields are concatenated in the order destination
    MAC, source MAC, ethernet type, and trailing zeros are then dropped.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :returns: hex string with an even number of digits ('' when every
        field is empty or all zeros).
    """
    # A falsy address (empty string, None) is treated as "not specified".
    dst_hex = (dst_mac or '').replace(':', '')
    src_hex = (src_mac or '').replace(':', '')

    hexstr = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_hex, src_hex, ether_type).rstrip('0')
    # rstrip('0') removes single hex digits, so an address ending in e.g.
    # ':10' would lose half a byte. Put that digit back so the result
    # always describes whole bytes and can be unhexlified.
    if len(hexstr) % 2:
        hexstr += '0'
    return hexstr
200 def create_classify_table(self, key, mask, data_offset=0, is_add=1):
201 """Create Classify Table
203 :param str key: key for classify table (ex, ACL name).
204 :param str mask: mask value for interested traffic.
:param int data_offset: offset passed to the API as current_data_offset.
206 :param int is_add: option to configure classify table.
207 - create(1) or delete(0)
209 r = self.vapi.classify_add_del_table(
211 binascii.unhexlify(mask),
212 match_n_vectors=(len(mask) - 1) // 32 + 1,
215 current_data_offset=data_offset)
216 self.assertIsNotNone(r, 'No response msg for add_del_table')
217 self.acl_tbl_idx[key] = r.new_table_index
219 def create_classify_session(self, intf, table_index, match,
220 hit_next_index=0xffffffff, is_add=1):
221 """Create Classify Session
223 :param VppInterface intf: Interface to apply classify session.
224 :param int table_index: table index to identify classify table.
225 :param str match: matched value for interested traffic.
:param int hit_next_index: next index for packets hitting the session
(default 0xffffffff; the deny tests pass 0).
228 :param int is_add: option to configure classify session.
229 - create(1) or delete(0)
231 r = self.vapi.classify_add_del_session(
234 binascii.unhexlify(match),
235 hit_next_index=hit_next_index)
236 self.assertIsNotNone(r, 'No response msg for add_del_session')
238 def input_acl_set_interface(self, intf, table_index, is_add=1):
239 """Configure Input ACL interface
241 :param VppInterface intf: Interface to apply Input ACL feature.
242 :param int table_index: table index to identify classify table.
:param int is_add: option to configure the Input ACL feature.
244 - enable(1) or disable(0)
246 r = self.vapi.input_acl_set_interface(
249 l2_table_index=table_index)
250 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
252 def output_acl_set_interface(self, intf, table_index, is_add=1):
253 """Configure Output ACL interface
255 :param VppInterface intf: Interface to apply Output ACL feature.
256 :param int table_index: table index to identify classify table.
:param int is_add: option to configure the Output ACL feature.
258 - enable(1) or disable(0)
260 r = self.vapi.output_acl_set_interface(
263 l2_table_index=table_index)
264 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
266 def create_hosts(self, count, start=0):
268 Create required number of host MAC addresses and distribute them among
interfaces. Create host IPv4 and IPv6 addresses for every host MAC address.
271 :param int count: Number of hosts to create MAC/IPv4 addresses for.
272 :param int start: Number to start numbering from.
274 n_int = len(self.pg_interfaces)
275 macs_per_if = count / n_int
277 for pg_if in self.pg_interfaces:
279 start_nr = macs_per_if * i + start
280 end_nr = count + start if i == (n_int - 1) \
281 else macs_per_if * (i + 1) + start
282 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
283 for j in range(start_nr, end_nr):
285 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
286 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
287 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
290 def create_upper_layer(self, packet_index, proto, ports=0):
291 p = self.proto_map[proto]
294 return UDP(sport=random.randint(self.udp_sport_from,
296 dport=random.randint(self.udp_dport_from,
299 return UDP(sport=ports, dport=ports)
302 return TCP(sport=random.randint(self.tcp_sport_from,
304 dport=random.randint(self.tcp_dport_from,
307 return TCP(sport=ports, dport=ports)
310 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
311 proto=-1, ports=0, fragments=False,
312 pkt_raw=True, etype=-1):
314 Create input packet stream for defined interface using hosts or
317 :param object src_if: Interface to create packet stream for.
318 :param list packet_sizes: List of required packet sizes.
319 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
320 :return: Stream of packets.
323 if self.flows.__contains__(src_if):
324 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
325 for dst_if in self.flows[src_if]:
326 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
327 n_int = len(dst_hosts) * len(src_hosts)
328 for i in range(0, n_int):
329 dst_host = dst_hosts[i / len(src_hosts)]
330 src_host = src_hosts[i % len(src_hosts)]
331 pkt_info = self.create_packet_info(src_if, dst_if)
337 pkt_info.ip = random.choice([0, 1])
339 pkt_info.proto = random.choice(self.proto[self.IP])
341 pkt_info.proto = proto
342 payload = self.info_to_payload(pkt_info)
343 p = Ether(dst=dst_host.mac, src=src_host.mac)
345 p = Ether(dst=dst_host.mac,
349 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
351 p /= IPv6ExtHdrFragment(offset=64, m=1)
354 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
357 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
358 if traffic_type == self.ICMP:
360 p /= ICMPv6EchoRequest(type=self.icmp6_type,
361 code=self.icmp6_code)
363 p /= ICMP(type=self.icmp4_type,
364 code=self.icmp4_code)
366 p /= self.create_upper_layer(i, pkt_info.proto, ports)
369 pkt_info.data = p.copy()
371 size = random.choice(packet_sizes)
372 self.extend_packet(p, size)
376 def verify_capture(self, pg_if, capture,
377 traffic_type=0, ip_type=0, etype=-1):
379 Verify captured input packet stream for defined interface.
381 :param object pg_if: Interface to verify captured packet stream for.
382 :param list capture: Captured packet stream.
383 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
386 for i in self.pg_interfaces:
387 last_info[i.sw_if_index] = None
388 dst_sw_if_index = pg_if.sw_if_index
389 for packet in capture:
391 if packet[Ether].type != etype:
392 self.logger.error(ppp("Unexpected ethertype in packet:",
397 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
398 if traffic_type == self.ICMP and ip_type == self.IPV6:
399 payload_info = self.payload_to_info(
400 packet[ICMPv6EchoRequest].data)
401 payload = packet[ICMPv6EchoRequest]
403 payload_info = self.payload_to_info(packet[Raw])
404 payload = packet[self.proto_map[payload_info.proto]]
406 self.logger.error(ppp("Unexpected or invalid packet "
407 "(outside network):", packet))
411 self.assertEqual(payload_info.ip, ip_type)
412 if traffic_type == self.ICMP:
414 if payload_info.ip == 0:
415 self.assertEqual(payload.type, self.icmp4_type)
416 self.assertEqual(payload.code, self.icmp4_code)
418 self.assertEqual(payload.type, self.icmp6_type)
419 self.assertEqual(payload.code, self.icmp6_code)
421 self.logger.error(ppp("Unexpected or invalid packet "
422 "(outside network):", packet))
426 ip_version = IPv6 if payload_info.ip == 1 else IP
428 ip = packet[ip_version]
429 packet_index = payload_info.index
431 self.assertEqual(payload_info.dst, dst_sw_if_index)
432 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
433 (pg_if.name, payload_info.src,
435 next_info = self.get_next_packet_info_for_interface2(
436 payload_info.src, dst_sw_if_index,
437 last_info[payload_info.src])
438 last_info[payload_info.src] = next_info
439 self.assertTrue(next_info is not None)
440 self.assertEqual(packet_index, next_info.index)
441 saved_packet = next_info.data
442 # Check standard fields
443 self.assertEqual(ip.src, saved_packet[ip_version].src)
444 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
445 p = self.proto_map[payload_info.proto]
448 self.assertEqual(tcp.sport, saved_packet[
450 self.assertEqual(tcp.dport, saved_packet[
454 self.assertEqual(udp.sport, saved_packet[
456 self.assertEqual(udp.dport, saved_packet[
459 self.logger.error(ppp("Unexpected or invalid packet:",
462 for i in self.pg_interfaces:
463 remaining_packet = self.get_next_packet_info_for_interface2(
464 i, dst_sw_if_index, last_info[i.sw_if_index])
466 remaining_packet is None,
467 "Port %u: Packet expected from source %u didn't arrive" %
468 (dst_sw_if_index, i.sw_if_index))
470 def run_traffic_no_check(self):
472 # Create incoming packet streams for packet-generator interfaces
473 for i in self.pg_interfaces:
474 if self.flows.__contains__(i):
475 pkts = self.create_stream(i, self.pg_if_packet_sizes)
479 # Enable packet capture and start packet sending
480 self.pg_enable_capture(self.pg_interfaces)
483 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
484 frags=False, pkt_raw=True, etype=-1):
486 # Create incoming packet streams for packet-generator interfaces
488 for i in self.pg_interfaces:
489 if self.flows.__contains__(i):
490 pkts = self.create_stream(i, self.pg_if_packet_sizes,
491 traffic_type, ip_type, proto, ports,
492 frags, pkt_raw, etype)
495 pkts_cnt += len(pkts)
# Enable packet capture and start packet sending
498 self.pg_enable_capture(self.pg_interfaces)
502 # Verify outgoing packet streams per packet-generator interface
503 for src_if in self.pg_interfaces:
504 if self.flows.__contains__(src_if):
505 for dst_if in self.flows[src_if]:
506 capture = dst_if.get_capture(pkts_cnt)
507 self.logger.info("Verifying capture on interface %s" %
509 self.verify_capture(dst_if, capture,
510 traffic_type, ip_type, etype)
512 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
513 ports=0, frags=False, etype=-1):
515 self.reset_packet_infos()
516 for i in self.pg_interfaces:
517 if self.flows.__contains__(i):
518 pkts = self.create_stream(i, self.pg_if_packet_sizes,
519 traffic_type, ip_type, proto, ports,
524 # Enable packet capture and start packet sending
525 self.pg_enable_capture(self.pg_interfaces)
529 # Verify outgoing packet streams per packet-generator interface
530 for src_if in self.pg_interfaces:
531 if self.flows.__contains__(src_if):
532 for dst_if in self.flows[src_if]:
533 self.logger.info("Verifying capture on interface %s" %
535 capture = dst_if.get_capture(0)
536 self.assertEqual(len(capture), 0)
538 def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
539 etype='', key='mac', hit_next_index=0xffffffff):
541 a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
542 ether_type=ether_type)
543 self.create_classify_table(key, a_mask)
544 for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
545 s_mac = host.mac if src_mac else ''
547 for dst_if in self.flows[self.pg0]:
548 for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
549 self.create_classify_session(
550 self.pg0, self.acl_tbl_idx.get(key),
551 self.build_mac_match(src_mac=s_mac,
552 dst_mac=dst_host.mac,
554 hit_next_index=hit_next_index)
556 self.create_classify_session(
557 self.pg0, self.acl_tbl_idx.get(key),
558 self.build_mac_match(src_mac=s_mac, dst_mac='',
560 hit_next_index=hit_next_index)
562 def test_0000_warmup_test(self):
563 """ Learn the MAC addresses
566 self.run_traffic_no_check()
568 def test_0010_inacl_permit_src_mac(self):
569 """ Input L2 ACL test - permit source MAC
Test scenario for basic L2 ACL with source MAC
572 - Create IPv4 stream for pg0 -> pg1 interface.
573 - Create ACL with source MAC address.
574 - Send and verify received packets on pg1 interface.
577 self.build_classify_table(src_mac='ffffffffffff', key=key)
578 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
579 self.acl_active_table = key
580 self.run_verify_test(self.IP, self.IPV4, -1)
582 def test_0011_inacl_permit_dst_mac(self):
583 """ Input L2 ACL test - permit destination MAC
Test scenario for basic L2 ACL with destination MAC
586 - Create IPv4 stream for pg0 -> pg1 interface.
587 - Create ACL with destination MAC address.
588 - Send and verify received packets on pg1 interface.
591 self.build_classify_table(dst_mac='ffffffffffff', key=key)
592 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
593 self.acl_active_table = key
594 self.run_verify_test(self.IP, self.IPV4, -1)
596 def test_0012_inacl_permit_src_dst_mac(self):
597 """ Input L2 ACL test - permit source and destination MAC
Test scenario for basic L2 ACL with source and destination MAC
600 - Create IPv4 stream for pg0 -> pg1 interface.
601 - Create ACL with source and destination MAC addresses.
602 - Send and verify received packets on pg1 interface.
605 self.build_classify_table(
606 src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key)
607 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
608 self.acl_active_table = key
609 self.run_verify_test(self.IP, self.IPV4, -1)
611 def test_0013_inacl_permit_ether_type(self):
612 """ Input L2 ACL test - permit ether_type
Test scenario for basic L2 ACL with ether type
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with ether type.
617 - Send and verify received packets on pg1 interface.
620 self.build_classify_table(
621 ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key)
622 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
623 self.acl_active_table = key
624 self.run_verify_test(self.IP, self.IPV4, -1)
626 def test_0015_inacl_deny(self):
627 """ Input L2 ACL test - deny
Test scenario for basic L2 ACL with source MAC
630 - Create IPv4 stream for pg0 -> pg1 interface.
632 - Create ACL with source MAC address.
633 - Send and verify no received packets on pg1 interface.
636 self.build_classify_table(
637 src_mac='ffffffffffff', hit_next_index=0, key=key)
638 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
639 self.acl_active_table = key
640 self.run_verify_negat_test(self.IP, self.IPV4, -1)
642 def test_0020_outacl_permit(self):
643 """ Output L2 ACL test - permit
Test scenario for basic L2 ACL with source MAC
646 - Create IPv4 stream for pg0 -> pg1 interface.
647 - Create ACL with source MAC address.
648 - Send and verify received packets on pg1 interface.
651 self.build_classify_table(src_mac='ffffffffffff', key=key)
652 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
653 self.acl_active_table = key
654 self.run_verify_test(self.IP, self.IPV4, -1)
656 def test_0025_outacl_deny(self):
657 """ Output L2 ACL test - deny
Test scenario for basic L2 ACL with source MAC
660 - Create IPv4 stream for pg0 -> pg1 interface.
661 - Create ACL with source MAC address.
662 - Send and verify no received packets on pg1 interface.
665 self.build_classify_table(
666 src_mac='ffffffffffff', hit_next_index=0, key=key)
667 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
668 self.acl_active_table = key
669 self.run_verify_negat_test(self.IP, self.IPV4, -1)
671 def test_0030_inoutacl_permit(self):
672 """ Input+Output L2 ACL test - permit
Test scenario for basic L2 ACL with source MAC
675 - Create IPv4 stream for pg0 -> pg1 interface.
676 - Create ACLs with source MAC address.
677 - Send and verify received packets on pg1 interface.
680 self.build_classify_table(src_mac='ffffffffffff', key=key)
681 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
682 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
683 self.acl_active_table = key
684 self.run_verify_test(self.IP, self.IPV4, -1)
686 if __name__ == '__main__':
687 unittest.main(testRunner=VppTestRunner)