2 """ Classifier-based L2 ACL Test Case HLD:
11 from scapy.packet import Raw
12 from scapy.data import ETH_P_IP
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, TCP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
16 from scapy.layers.inet6 import IPv6ExtHdrFragment
17 from framework import VppTestCase, VppTestRunner
18 from util import Host, ppp
19 from template_classifier import TestClassifier
22 class TestClassifyAcl(TestClassifier):
23 """Classifier-based L2 input and output ACL Test Case"""
39 proto = [[6, 17], [1, 58]]
40 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
52 udp_sport_to = udp_sport_from + 5
53 udp_dport_from = 20000
54 udp_dport_to = udp_dport_from + 5000
56 tcp_sport_to = tcp_sport_from + 5
57 tcp_dport_from = 40000
58 tcp_dport_to = tcp_dport_from + 5000
61 udp_sport_to_2 = udp_sport_from_2 + 5
62 udp_dport_from_2 = 30000
63 udp_dport_to_2 = udp_dport_from_2 + 5000
64 tcp_sport_from_2 = 130
65 tcp_sport_to_2 = tcp_sport_from_2 + 5
66 tcp_dport_from_2 = 20000
67 tcp_dport_to_2 = tcp_dport_from_2 + 5000
69 icmp4_type = 8 # echo request
71 icmp6_type = 128 # echo request
87 Perform standard class setup (defined by class method setUpClass in
88 class VppTestCase) before running the test case, set test case related
89 variables and configure VPP.
91 super(TestClassifyAcl, cls).setUpClass()
95 # Create 2 pg interfaces
96 cls.create_pg_interfaces(range(2))
98 # Packet flows mapping: pg0 -> pg1
100 cls.flows[cls.pg0] = [cls.pg1]
103 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
105 # Create BD with MAC learning and unknown unicast flooding enabled
106 # and put interfaces to this BD
107 cls.vapi.bridge_domain_add_del_v2(
108 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
110 for pg_if in cls.pg_interfaces:
111 cls.vapi.sw_interface_set_l2_bridge(
112 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
115 # Set up all interfaces
116 for i in cls.pg_interfaces:
119 # Mapping between packet-generator index and lists of test hosts
120 cls.hosts_by_pg_idx = dict()
121 for pg_if in cls.pg_interfaces:
122 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
124 # Create list of deleted hosts
125 cls.deleted_hosts_by_pg_idx = dict()
126 for pg_if in cls.pg_interfaces:
127 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
129 # warm-up the mac address tables
132 # Holder of the active classify table key
133 cls.acl_active_table = ""
136 super(TestClassifyAcl, cls).tearDownClass()
140 def tearDownClass(cls):
141 super(TestClassifyAcl, cls).tearDownClass()
144 super(TestClassifyAcl, self).setUp()
145 self.acl_tbl_idx = {}
149 Reset the input/output ACL binding of the active classify table after each test.
151 if not self.vpp_dead:
152 if self.acl_active_table == "mac_inout":
153 self.output_acl_set_interface(
154 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
156 self.input_acl_set_interface(
157 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
159 self.acl_active_table = ""
160 elif self.acl_active_table == "mac_out":
161 self.output_acl_set_interface(
162 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
164 self.acl_active_table = ""
165 elif self.acl_active_table == "mac_in":
166 self.input_acl_set_interface(
167 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
169 self.acl_active_table = ""
171 super(TestClassifyAcl, self).tearDown()
173 def create_classify_session(
174 self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
176 """Create Classify Session
178 :param VppInterface intf: Interface to apply classify session.
179 :param int table_index: table index to identify classify table.
180 :param str match: matched value for interested traffic.
181 :param int is_add: option to configure classify session.
182 - create(1) or delete(0)
184 mask_match, mask_match_len = self._resolve_mask_match(match)
185 r = self.vapi.classify_add_del_session(
187 table_index=table_index,
189 match_len=mask_match_len,
190 hit_next_index=hit_next_index,
192 self.assertIsNotNone(r, "No response msg for add_del_session")
194 def create_hosts(self, count, start=0):
196 Create required number of host MAC addresses and distribute them among
197 interfaces. Create host IPv4 address for every host MAC address.
199 :param int count: Number of hosts to create MAC/IPv4 addresses for.
200 :param int start: Number to start numbering from.
202 n_int = len(self.pg_interfaces)
203 macs_per_if = count // n_int
205 for pg_if in self.pg_interfaces:
207 start_nr = macs_per_if * i + start
209 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
211 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
212 for j in range(start_nr, end_nr):
214 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
215 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
216 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
220 def create_upper_layer(self, packet_index, proto, ports=0):
221 p = self.proto_map[proto]
225 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
226 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
229 return UDP(sport=ports, dport=ports)
233 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
234 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
237 return TCP(sport=ports, dport=ports)
253 Create input packet stream for defined interface using hosts or
256 :param object src_if: Interface to create packet stream for.
257 :param list packet_sizes: List of required packet sizes.
258 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
259 :return: Stream of packets.
262 if self.flows.__contains__(src_if):
263 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
264 for dst_if in self.flows[src_if]:
265 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
266 n_int = len(dst_hosts) * len(src_hosts)
267 for i in range(0, n_int):
268 dst_host = dst_hosts[i // len(src_hosts)]
269 src_host = src_hosts[i % len(src_hosts)]
270 pkt_info = self.create_packet_info(src_if, dst_if)
276 pkt_info.ip = random.choice([0, 1])
278 pkt_info.proto = random.choice(self.proto[self.IP])
280 pkt_info.proto = proto
281 payload = self.info_to_payload(pkt_info)
282 p = Ether(dst=dst_host.mac, src=src_host.mac)
284 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
286 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
288 p /= IPv6ExtHdrFragment(offset=64, m=1)
292 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
295 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
296 if traffic_type == self.ICMP:
298 p /= ICMPv6EchoRequest(
299 type=self.icmp6_type, code=self.icmp6_code
302 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
304 p /= self.create_upper_layer(i, pkt_info.proto, ports)
307 pkt_info.data = p.copy()
309 size = random.choice(packet_sizes)
310 self.extend_packet(p, size)
314 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
316 Verify captured input packet stream for defined interface.
318 :param object pg_if: Interface to verify captured packet stream for.
319 :param list capture: Captured packet stream.
320 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
323 for i in self.pg_interfaces:
324 last_info[i.sw_if_index] = None
325 dst_sw_if_index = pg_if.sw_if_index
326 for packet in capture:
328 if packet[Ether].type != etype:
329 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
333 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
334 if traffic_type == self.ICMP and ip_type == self.IPV6:
335 payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
336 payload = packet[ICMPv6EchoRequest]
338 payload_info = self.payload_to_info(packet[Raw])
339 payload = packet[self.proto_map[payload_info.proto]]
342 ppp("Unexpected or invalid packet (outside network):", packet)
347 self.assertEqual(payload_info.ip, ip_type)
348 if traffic_type == self.ICMP:
350 if payload_info.ip == 0:
351 self.assertEqual(payload.type, self.icmp4_type)
352 self.assertEqual(payload.code, self.icmp4_code)
354 self.assertEqual(payload.type, self.icmp6_type)
355 self.assertEqual(payload.code, self.icmp6_code)
358 ppp("Unexpected or invalid packet (outside network):", packet)
363 ip_version = IPv6 if payload_info.ip == 1 else IP
365 ip = packet[ip_version]
366 packet_index = payload_info.index
368 self.assertEqual(payload_info.dst, dst_sw_if_index)
370 "Got packet on port %s: src=%u (id=%u)"
371 % (pg_if.name, payload_info.src, packet_index)
373 next_info = self.get_next_packet_info_for_interface2(
374 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
376 last_info[payload_info.src] = next_info
377 self.assertTrue(next_info is not None)
378 self.assertEqual(packet_index, next_info.index)
379 saved_packet = next_info.data
380 # Check standard fields
381 self.assertEqual(ip.src, saved_packet[ip_version].src)
382 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
383 p = self.proto_map[payload_info.proto]
386 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
387 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
390 self.assertEqual(udp.sport, saved_packet[UDP].sport)
391 self.assertEqual(udp.dport, saved_packet[UDP].dport)
393 self.logger.error(ppp("Unexpected or invalid packet:", packet))
395 for i in self.pg_interfaces:
396 remaining_packet = self.get_next_packet_info_for_interface2(
397 i, dst_sw_if_index, last_info[i.sw_if_index]
400 remaining_packet is None,
401 "Port %u: Packet expected from source %u didn't arrive"
402 % (dst_sw_if_index, i.sw_if_index),
405 def run_traffic_no_check(self):
407 # Create incoming packet streams for packet-generator interfaces
408 for i in self.pg_interfaces:
409 if self.flows.__contains__(i):
410 pkts = self.create_stream(i, self.pg_if_packet_sizes)
414 # Enable packet capture and start packet sending
415 self.pg_enable_capture(self.pg_interfaces)
429 # Create incoming packet streams for packet-generator interfaces
431 for i in self.pg_interfaces:
432 if self.flows.__contains__(i):
433 pkts = self.create_stream(
435 self.pg_if_packet_sizes,
446 pkts_cnt += len(pkts)
448 # Enable packet capture and start packet sending
449 self.pg_enable_capture(self.pg_interfaces)
453 # Verify outgoing packet streams per packet-generator interface
454 for src_if in self.pg_interfaces:
455 if self.flows.__contains__(src_if):
456 for dst_if in self.flows[src_if]:
457 capture = dst_if.get_capture(pkts_cnt)
458 self.logger.info("Verifying capture on interface %s" % dst_if.name)
459 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
461 def run_verify_negat_test(
462 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
465 self.reset_packet_infos()
466 for i in self.pg_interfaces:
467 if self.flows.__contains__(i):
468 pkts = self.create_stream(
470 self.pg_if_packet_sizes,
482 # Enable packet capture and start packet sending
483 self.pg_enable_capture(self.pg_interfaces)
487 # Verify outgoing packet streams per packet-generator interface
488 for src_if in self.pg_interfaces:
489 if self.flows.__contains__(src_if):
490 for dst_if in self.flows[src_if]:
491 self.logger.info("Verifying capture on interface %s" % dst_if.name)
492 capture = dst_if.get_capture(0)
493 self.assertEqual(len(capture), 0)
495 def build_classify_table(
502 hit_next_index=0xFFFFFFFF,
505 a_mask = self.build_mac_mask(
506 src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
508 self.create_classify_table(key, a_mask)
509 for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
510 s_mac = host.mac if src_mac else ""
512 for dst_if in self.flows[self.pg0]:
513 for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
514 self.create_classify_session(
516 self.acl_tbl_idx.get(key),
517 self.build_mac_match(
518 src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
520 hit_next_index=hit_next_index,
523 self.create_classify_session(
525 self.acl_tbl_idx.get(key),
526 self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
527 hit_next_index=hit_next_index,
530 def test_0000_warmup_test(self):
531 """Learn the MAC addresses"""
533 self.run_traffic_no_check()
535 def test_0010_inacl_permit_src_mac(self):
536 """Input L2 ACL test - permit source MAC
538 Test scenario for basic L2 ACL with source MAC
539 - Create IPv4 stream for pg0 -> pg1 interface.
540 - Create ACL with source MAC address.
541 - Send and verify received packets on pg1 interface.
544 self.build_classify_table(src_mac="ffffffffffff", key=key)
545 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
546 self.acl_active_table = key
547 self.run_verify_test(self.IP, self.IPV4, -1)
549 def test_0011_inacl_permit_dst_mac(self):
550 """Input L2 ACL test - permit destination MAC
552 Test scenario for basic L2 ACL with destination MAC
553 - Create IPv4 stream for pg0 -> pg1 interface.
554 - Create ACL with destination MAC address.
555 - Send and verify received packets on pg1 interface.
558 self.build_classify_table(dst_mac="ffffffffffff", key=key)
559 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
560 self.acl_active_table = key
561 self.run_verify_test(self.IP, self.IPV4, -1)
563 def test_0012_inacl_permit_src_dst_mac(self):
564 """Input L2 ACL test - permit source and destination MAC
566 Test scenario for basic L2 ACL with source and destination MAC
567 - Create IPv4 stream for pg0 -> pg1 interface.
568 - Create ACL with source and destination MAC addresses.
569 - Send and verify received packets on pg1 interface.
572 self.build_classify_table(
573 src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
575 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
576 self.acl_active_table = key
577 self.run_verify_test(self.IP, self.IPV4, -1)
579 def test_0013_inacl_permit_ether_type(self):
580 """Input L2 ACL test - permit ether_type
582 Test scenario for basic L2 ACL with ether type
583 - Create IPv4 stream for pg0 -> pg1 interface.
584 - Create ACL with ether type.
585 - Send and verify received packets on pg1 interface.
588 self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
589 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
590 self.acl_active_table = key
591 self.run_verify_test(self.IP, self.IPV4, -1)
593 def test_0015_inacl_deny(self):
594 """Input L2 ACL test - deny
596 Test scenario for basic L2 ACL (deny) with source MAC
597 - Create IPv4 stream for pg0 -> pg1 interface.
599 - Create ACL with source MAC address.
600 - Send and verify no received packets on pg1 interface.
603 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
604 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
605 self.acl_active_table = key
606 self.run_verify_negat_test(self.IP, self.IPV4, -1)
608 def test_0020_outacl_permit(self):
609 """Output L2 ACL test - permit
611 Test scenario for basic output L2 ACL with source MAC
612 - Create IPv4 stream for pg0 -> pg1 interface.
613 - Create ACL with source MAC address.
614 - Send and verify received packets on pg1 interface.
617 self.build_classify_table(src_mac="ffffffffffff", key=key)
618 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
619 self.acl_active_table = key
620 self.run_verify_test(self.IP, self.IPV4, -1)
622 def test_0025_outacl_deny(self):
623 """Output L2 ACL test - deny
625 Test scenario for basic output L2 ACL (deny) with source MAC
626 - Create IPv4 stream for pg0 -> pg1 interface.
627 - Create ACL with source MAC address.
628 - Send and verify no received packets on pg1 interface.
631 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
632 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
633 self.acl_active_table = key
634 self.run_verify_negat_test(self.IP, self.IPV4, -1)
636 def test_0030_inoutacl_permit(self):
637 """Input+Output L2 ACL test - permit
639 Test scenario for basic input+output L2 ACL with source MAC
640 - Create IPv4 stream for pg0 -> pg1 interface.
641 - Create ACLs with source MAC address.
642 - Send and verify received packets on pg1 interface.
645 self.build_classify_table(src_mac="ffffffffffff", key=key)
646 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
647 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
648 self.acl_active_table = key
649 self.run_verify_test(self.IP, self.IPV4, -1)
652 if __name__ == "__main__":
653 unittest.main(testRunner=VppTestRunner)