2 """ Classifier-based L2 ACL Test Case HLD:
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, TCP, UDP, ICMP
14 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
15 from scapy.layers.inet6 import IPv6ExtHdrFragment
16 from framework import VppTestCase, VppTestRunner
17 from util import Host, ppp
20 class TestClassifyAcl(VppTestCase):
21 """ Classifier-based L2 input and output ACL Test Case """
37 proto = [[6, 17], [1, 58]]
38 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
50 udp_sport_to = udp_sport_from + 5
51 udp_dport_from = 20000
52 udp_dport_to = udp_dport_from + 5000
54 tcp_sport_to = tcp_sport_from + 5
55 tcp_dport_from = 40000
56 tcp_dport_to = tcp_dport_from + 5000
59 udp_sport_to_2 = udp_sport_from_2 + 5
60 udp_dport_from_2 = 30000
61 udp_dport_to_2 = udp_dport_from_2 + 5000
62 tcp_sport_from_2 = 130
63 tcp_sport_to_2 = tcp_sport_from_2 + 5
64 tcp_dport_from_2 = 20000
65 tcp_dport_to_2 = tcp_dport_from_2 + 5000
67 icmp4_type = 8 # echo request
69 icmp6_type = 128 # echo request
85 Perform standard class setup (defined by class method setUpClass in
86 class VppTestCase) before running the test case, set test case related
87 variables and configure VPP.
89 super(TestClassifyAcl, cls).setUpClass()
92 # Create 2 pg interfaces
93 cls.create_pg_interfaces(range(2))
95 # Packet flows mapping pg0 -> pg1, pg2 etc.
97 cls.flows[cls.pg0] = [cls.pg1]
100 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
102 # Create BD with unknown unicast flooding enabled (uu_flood=1) and,
103 # presumably, MAC learning enabled too; put interfaces to this BD
104 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
106 for pg_if in cls.pg_interfaces:
107 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
110 # Set up all interfaces
111 for i in cls.pg_interfaces:
114 # Mapping between packet-generator index and lists of test hosts
115 cls.hosts_by_pg_idx = dict()
116 for pg_if in cls.pg_interfaces:
117 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
119 # Create list of deleted hosts
120 cls.deleted_hosts_by_pg_idx = dict()
121 for pg_if in cls.pg_interfaces:
122 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
124 # warm-up the mac address tables
128 super(TestClassifyAcl, cls).tearDownClass()
132 super(TestClassifyAcl, self).setUp()
134 self.acl_tbl_idx = {}
135 self.reset_packet_infos()
139 Show various debug prints after each test.
141 super(TestClassifyAcl, self).tearDown()
142 if not self.vpp_dead:
143 self.logger.info(self.vapi.ppcli("show inacl type l2"))
144 self.logger.info(self.vapi.ppcli("show outacl type l2"))
145 self.logger.info(self.vapi.ppcli("show classify tables verbose"))
146 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
def build_ip_mask(proto='', src_ip='', dst_ip='',
                  src_port='', dst_port=''):
    """Build IP ACL mask data with hexstring format

    Every field is left-padded with '0' to its fixed width, the fields are
    concatenated and trailing zero digits are dropped.  The result always
    contains an even number of hex digits so that it can be converted with
    binascii.unhexlify().

    :param str proto: protocol number <0-ff>
    :param str src_ip: source ip address <0-ffffffff>
    :param str dst_ip: destination ip address <0-ffffffff>
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :return: mask as a hex string; empty when no field is set.
    """
    mask = '{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip('0') can cut the last byte in half ('...f0' -> '...f'); put the
    # low nibble back so the string still describes whole bytes.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_ip_match(proto='', src_ip='', dst_ip='',
                   src_port='', dst_port=''):
    """Build IP ACL match data with hexstring format

    Dotted-quad addresses are converted to 8 hex digits, every field is
    left-padded with '0' to its fixed width, the fields are concatenated and
    trailing zero digits are dropped.  The result always contains an even
    number of hex digits so that it can be converted with
    binascii.unhexlify().

    :param str proto: protocol number with valid option "<0-ff>"
    :param str src_ip: source ip address with format of "x.x.x.x"
    :param str dst_ip: destination ip address with format of "x.x.x.x"
    :param str src_port: source port number <0-ffff>
    :param str dst_port: destination port number <0-ffff>
    :return: match value as a hex string; empty when no field is set.
    """
    def _ip_to_hex(address):
        # bytes.encode('hex') only exists on Python 2; formatting the
        # individual octets works on both Python 2 and Python 3.
        packed = bytearray(socket.inet_aton(address))
        return ''.join('%02x' % octet for octet in packed)

    # An empty string means "field not used" and must stay empty.
    if src_ip:
        src_ip = _ip_to_hex(src_ip)
    if dst_ip:
        dst_ip = _ip_to_hex(dst_ip)

    match = '{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
        proto, src_ip, dst_ip, src_port, dst_port).rstrip('0')
    # rstrip('0') can cut the last byte in half ('...10' -> '...1'); put the
    # low nibble back so the string still describes whole bytes.
    if len(match) % 2:
        match += '0'
    return match
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format

    Every field is left-padded with '0' to its fixed width, the fields are
    concatenated and trailing zero digits are dropped.  The result always
    contains an even number of hex digits so that it can be converted with
    binascii.unhexlify().

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :return: mask as a hex string; empty when no field is set.
    """
    mask = '{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
                                         ether_type).rstrip('0')
    # rstrip('0') can cut the last byte in half ('...f0' -> '...f'); put the
    # low nibble back so the string still describes whole bytes.
    if len(mask) % 2:
        mask += '0'
    return mask
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format

    The ':' separators are removed from the MAC addresses, every field is
    left-padded with '0' to its fixed width, the fields are concatenated and
    trailing zero digits are dropped.  The result always contains an even
    number of hex digits so that it can be converted with
    binascii.unhexlify().

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :return: match value as a hex string; empty when no field is set.
    """
    # str.replace() is a no-op on the empty default, so no guard is needed.
    dst_digits = dst_mac.replace(':', '')
    src_digits = src_mac.replace(':', '')

    match = '{:0>12}{:0>12}{:0>4}'.format(dst_digits, src_digits,
                                          ether_type).rstrip('0')
    # A MAC ending in a zero digit (e.g. '...:10') would otherwise lose its
    # low nibble and produce an odd-length, non-decodable hex string.
    if len(match) % 2:
        match += '0'
    return match
211 def create_classify_table(self, key, mask, data_offset=0, is_add=1):
212 """Create Classify Table
214 :param str key: key for classify table (ex, ACL name).
215 :param str mask: mask value for interested traffic.
216 :param int data_offset: value passed as current_data_offset.
217 :param int is_add: option to configure classify table.
218 - create(1) or delete(0)
220 r = self.vapi.classify_add_del_table(
222 binascii.unhexlify(mask),
223 match_n_vectors=(len(mask) - 1) // 32 + 1,
226 current_data_offset=data_offset)
227 self.assertIsNotNone(r, msg='No response msg for add_del_table')
228 self.acl_tbl_idx[key] = r.new_table_index
230 def create_classify_session(self, intf, table_index, match,
231 hit_next_index=0xffffffff, is_add=1):
232 """Create Classify Session
234 :param VppInterface intf: Interface to apply classify session.
235 :param int table_index: table index to identify classify table.
236 :param str match: matched value for interested traffic.
237 :param int hit_next_index: value passed as the session's hit_next_index
238 (the permit tests use the default, the deny tests pass 0).
239 :param int is_add: option to configure classify session.
240 - create(1) or delete(0)
242 r = self.vapi.classify_add_del_session(
245 binascii.unhexlify(match),
246 hit_next_index=hit_next_index)
247 self.assertIsNotNone(r, msg='No response msg for add_del_session')
249 def input_acl_set_interface(self, intf, table_index, is_add=1):
250 """Configure Input ACL interface
252 :param VppInterface intf: Interface to apply Input ACL feature.
253 :param int table_index: table index to identify classify table.
254 :param int is_add: option to configure classify session.
255 - enable(1) or disable(0)
257 r = self.vapi.input_acl_set_interface(
260 l2_table_index=table_index)
261 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
263 def output_acl_set_interface(self, intf, table_index, is_add=1):
264 """Configure Output ACL interface
266 :param VppInterface intf: Interface to apply Output ACL feature.
267 :param int table_index: table index to identify classify table.
268 :param int is_add: option to configure classify session.
269 - enable(1) or disable(0)
271 r = self.vapi.output_acl_set_interface(
274 l2_table_index=table_index)
275 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
277 def create_hosts(self, count, start=0):
279 Create required number of host MAC addresses and distribute them among
280 interfaces. Create host IPv4 address for every host MAC address.
282 :param int count: Number of hosts to create MAC/IPv4 addresses for.
283 :param int start: Number to start numbering from.
285 n_int = len(self.pg_interfaces)
286 macs_per_if = count / n_int
288 for pg_if in self.pg_interfaces:
290 start_nr = macs_per_if * i + start
291 end_nr = count + start if i == (n_int - 1) \
292 else macs_per_if * (i + 1) + start
293 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
294 for j in range(start_nr, end_nr):
296 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
297 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
298 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
301 def create_upper_layer(self, packet_index, proto, ports=0):
302 p = self.proto_map[proto]
305 return UDP(sport=random.randint(self.udp_sport_from,
307 dport=random.randint(self.udp_dport_from,
310 return UDP(sport=ports, dport=ports)
313 return TCP(sport=random.randint(self.tcp_sport_from,
315 dport=random.randint(self.tcp_dport_from,
318 return TCP(sport=ports, dport=ports)
321 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
322 proto=-1, ports=0, fragments=False,
323 pkt_raw=True, etype=-1):
325 Create input packet stream for defined interface using hosts or
328 :param object src_if: Interface to create packet stream for.
329 :param list packet_sizes: List of required packet sizes.
330 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
331 :return: Stream of packets.
334 if self.flows.__contains__(src_if):
335 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
336 for dst_if in self.flows[src_if]:
337 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
338 n_int = len(dst_hosts) * len(src_hosts)
339 for i in range(0, n_int):
340 dst_host = dst_hosts[i / len(src_hosts)]
341 src_host = src_hosts[i % len(src_hosts)]
342 pkt_info = self.create_packet_info(src_if, dst_if)
348 pkt_info.ip = random.choice([0, 1])
350 pkt_info.proto = random.choice(self.proto[self.IP])
352 pkt_info.proto = proto
353 payload = self.info_to_payload(pkt_info)
354 p = Ether(dst=dst_host.mac, src=src_host.mac)
356 p = Ether(dst=dst_host.mac,
360 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
362 p /= IPv6ExtHdrFragment(offset=64, m=1)
365 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
368 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
369 if traffic_type == self.ICMP:
371 p /= ICMPv6EchoRequest(type=self.icmp6_type,
372 code=self.icmp6_code)
374 p /= ICMP(type=self.icmp4_type,
375 code=self.icmp4_code)
377 p /= self.create_upper_layer(i, pkt_info.proto, ports)
380 pkt_info.data = p.copy()
382 size = random.choice(packet_sizes)
383 self.extend_packet(p, size)
387 def verify_capture(self, pg_if, capture,
388 traffic_type=0, ip_type=0, etype=-1):
390 Verify captured input packet stream for defined interface.
392 :param object pg_if: Interface to verify captured packet stream for.
393 :param list capture: Captured packet stream.
394 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
397 for i in self.pg_interfaces:
398 last_info[i.sw_if_index] = None
399 dst_sw_if_index = pg_if.sw_if_index
400 for packet in capture:
402 if packet[Ether].type != etype:
403 self.logger.error(ppp("Unexpected ethertype in packet:",
408 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
409 if traffic_type == self.ICMP and ip_type == self.IPV6:
410 payload_info = self.payload_to_info(
411 packet[ICMPv6EchoRequest].data)
412 payload = packet[ICMPv6EchoRequest]
414 payload_info = self.payload_to_info(str(packet[Raw]))
415 payload = packet[self.proto_map[payload_info.proto]]
417 self.logger.error(ppp("Unexpected or invalid packet "
418 "(outside network):", packet))
422 self.assertEqual(payload_info.ip, ip_type)
423 if traffic_type == self.ICMP:
425 if payload_info.ip == 0:
426 self.assertEqual(payload.type, self.icmp4_type)
427 self.assertEqual(payload.code, self.icmp4_code)
429 self.assertEqual(payload.type, self.icmp6_type)
430 self.assertEqual(payload.code, self.icmp6_code)
432 self.logger.error(ppp("Unexpected or invalid packet "
433 "(outside network):", packet))
437 ip_version = IPv6 if payload_info.ip == 1 else IP
439 ip = packet[ip_version]
440 packet_index = payload_info.index
442 self.assertEqual(payload_info.dst, dst_sw_if_index)
443 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
444 (pg_if.name, payload_info.src,
446 next_info = self.get_next_packet_info_for_interface2(
447 payload_info.src, dst_sw_if_index,
448 last_info[payload_info.src])
449 last_info[payload_info.src] = next_info
450 self.assertTrue(next_info is not None)
451 self.assertEqual(packet_index, next_info.index)
452 saved_packet = next_info.data
453 # Check standard fields
454 self.assertEqual(ip.src, saved_packet[ip_version].src)
455 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
456 p = self.proto_map[payload_info.proto]
459 self.assertEqual(tcp.sport, saved_packet[
461 self.assertEqual(tcp.dport, saved_packet[
465 self.assertEqual(udp.sport, saved_packet[
467 self.assertEqual(udp.dport, saved_packet[
470 self.logger.error(ppp("Unexpected or invalid packet:",
473 for i in self.pg_interfaces:
474 remaining_packet = self.get_next_packet_info_for_interface2(
475 i, dst_sw_if_index, last_info[i.sw_if_index])
477 remaining_packet is None,
478 "Port %u: Packet expected from source %u didn't arrive" %
479 (dst_sw_if_index, i.sw_if_index))
481 def run_traffic_no_check(self):
483 # Create incoming packet streams for packet-generator interfaces
484 for i in self.pg_interfaces:
485 if self.flows.__contains__(i):
486 pkts = self.create_stream(i, self.pg_if_packet_sizes)
490 # Enable packet capture and start packet sending
491 self.pg_enable_capture(self.pg_interfaces)
494 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
495 frags=False, pkt_raw=True, etype=-1):
497 # Create incoming packet streams for packet-generator interfaces
499 for i in self.pg_interfaces:
500 if self.flows.__contains__(i):
501 pkts = self.create_stream(i, self.pg_if_packet_sizes,
502 traffic_type, ip_type, proto, ports,
503 frags, pkt_raw, etype)
506 pkts_cnt += len(pkts)
508 # Enable packet capture and start packet sending
509 self.pg_enable_capture(self.pg_interfaces)
513 # Verify outgoing packet streams per packet-generator interface
514 for src_if in self.pg_interfaces:
515 if self.flows.__contains__(src_if):
516 for dst_if in self.flows[src_if]:
517 capture = dst_if.get_capture(pkts_cnt)
518 self.logger.info("Verifying capture on interface %s" %
520 self.verify_capture(dst_if, capture,
521 traffic_type, ip_type, etype)
523 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
524 ports=0, frags=False, etype=-1):
526 self.reset_packet_infos()
527 for i in self.pg_interfaces:
528 if self.flows.__contains__(i):
529 pkts = self.create_stream(i, self.pg_if_packet_sizes,
530 traffic_type, ip_type, proto, ports,
535 # Enable packet capture and start packet sending
536 self.pg_enable_capture(self.pg_interfaces)
540 # Verify outgoing packet streams per packet-generator interface
541 for src_if in self.pg_interfaces:
542 if self.flows.__contains__(src_if):
543 for dst_if in self.flows[src_if]:
544 self.logger.info("Verifying capture on interface %s" %
546 capture = dst_if.get_capture(0)
547 self.assertEqual(len(capture), 0)
def build_classify_table(self, hit_next_index=0xffffffff):
    """Create the 'ip' classify table plus one session per pg0 host.

    The table mask selects the complete source MAC field; a session matching
    the MAC address of every host registered for pg0 is then added.

    :param int hit_next_index: forwarded unchanged to every created session.
    """
    # Basic ACL testing with source MAC
    src_mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
    self.create_classify_table('ip', src_mac_mask)

    pg0_hosts = self.hosts_by_pg_idx[self.pg0.sw_if_index]
    for pg0_host in pg0_hosts:
        interface = self.pg0
        table_index = self.acl_tbl_idx.get('ip')
        match_value = self.build_mac_match(src_mac=pg0_host.mac)
        self.create_classify_session(interface, table_index, match_value,
                                     hit_next_index=hit_next_index)
559 def test_0000_warmup_test(self):
560 """ Learn the MAC addresses
563 self.run_traffic_no_check()
def test_0010_inacl_permit(self):
    """ Input L2 ACL test - permit

    Test scenario for basic L2 ACL with source MAC
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create ACL with source MAC address.
        - Send and verify received packets on pg1 interface.
    """
    self.build_classify_table()
    table_index = self.acl_tbl_idx.get('ip')
    self.input_acl_set_interface(self.pg0, table_index)
    try:
        self.run_verify_test(self.IP, self.IPV4, -1)
    finally:
        # Detach the ACL even when verification fails; the interfaces are
        # shared by all test cases of this class.
        self.input_acl_set_interface(self.pg0, table_index, 0)
def test_0015_inacl_deny(self):
    """ Input L2 ACL test - deny

    Test scenario for basic L2 ACL with source MAC
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create ACL with source MAC address.
        - Send and verify no received packets on pg1 interface.
    """
    self.build_classify_table(hit_next_index=0)
    table_index = self.acl_tbl_idx.get('ip')
    self.input_acl_set_interface(self.pg0, table_index)
    try:
        self.run_verify_negat_test(self.IP, self.IPV4, -1)
    finally:
        # Detach the ACL even when verification fails; the interfaces are
        # shared by all test cases of this class.
        self.input_acl_set_interface(self.pg0, table_index, 0)
def test_0020_outacl_permit(self):
    """ Output L2 ACL test - permit

    Test scenario for basic L2 ACL with source MAC
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create ACL with source MAC address.
        - Send and verify received packets on pg1 interface.
    """
    self.build_classify_table()
    table_index = self.acl_tbl_idx.get('ip')
    self.output_acl_set_interface(self.pg1, table_index)
    try:
        self.run_verify_test(self.IP, self.IPV4, -1)
    finally:
        # Detach the ACL even when verification fails; the interfaces are
        # shared by all test cases of this class.
        self.output_acl_set_interface(self.pg1, table_index, 0)
def test_0025_outacl_deny(self):
    """ Output L2 ACL test - deny

    Test scenario for basic L2 ACL with source MAC
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create ACL with source MAC address.
        - Send and verify no received packets on pg1 interface.
    """
    self.build_classify_table(hit_next_index=0)
    table_index = self.acl_tbl_idx.get('ip')
    self.output_acl_set_interface(self.pg1, table_index)
    try:
        self.run_verify_negat_test(self.IP, self.IPV4, -1)
    finally:
        # Detach the ACL even when verification fails; the interfaces are
        # shared by all test cases of this class.
        self.output_acl_set_interface(self.pg1, table_index, 0)
def test_0030_inoutacl_permit(self):
    """ Input+Output L2 ACL test - permit

    Test scenario for basic L2 ACL with source MAC
        - Create IPv4 stream for pg0 -> pg1 interface.
        - Create ACLs with source MAC address.
        - Send and verify received packets on pg1 interface.
    """
    self.build_classify_table()
    table_index = self.acl_tbl_idx.get('ip')
    self.output_acl_set_interface(self.pg1, table_index)
    try:
        self.input_acl_set_interface(self.pg0, table_index)
        try:
            self.run_verify_test(self.IP, self.IPV4, -1)
        finally:
            # Same order as before (output first), but the input ACL is
            # detached even if detaching the output ACL fails.
            try:
                self.output_acl_set_interface(self.pg1, table_index, 0)
            finally:
                self.input_acl_set_interface(self.pg0, table_index, 0)
    except Exception:
        # Attaching the input ACL failed before the inner cleanup could
        # run, or cleanup itself failed: make a best effort to leave pg1
        # without the output ACL, then report the original error.
        try:
            self.output_acl_set_interface(self.pg1, table_index, 0)
        except Exception:
            pass
        raise
if __name__ == '__main__':
    # Executed as a script: run the test cases with the VPP test runner.
    unittest.main(testRunner=VppTestRunner)