2 """ Classifier-based L2 ACL Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.data import ETH_P_IP
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from asfframework import VppTestRunner
15 from util import Host, ppp
16 from template_classifier import TestClassifier
19 class TestClassifyAcl(TestClassifier):
20 """Classifier-based L2 input and output ACL Test Case"""
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestClassifyAcl, cls).setUpClass()
92 # Create 2 pg interfaces
93 cls.create_pg_interfaces(range(2))
95 # Packet flows mapping: pg0 -> pg1 (only two pg interfaces are created)
97 cls.flows[cls.pg0] = [cls.pg1]
100 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
102 # Create BD with MAC learning, forwarding, flooding and unknown unicast
103 # flooding enabled, and put interfaces to this BD
104 cls.vapi.bridge_domain_add_del_v2(
105 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
107 for pg_if in cls.pg_interfaces:
108 cls.vapi.sw_interface_set_l2_bridge(
109 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
112 # Set up all interfaces
113 for i in cls.pg_interfaces:
116 # Mapping between packet-generator index and lists of test hosts
117 cls.hosts_by_pg_idx = dict()
118 for pg_if in cls.pg_interfaces:
119 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
121 # Create list of deleted hosts
122 cls.deleted_hosts_by_pg_idx = dict()
123 for pg_if in cls.pg_interfaces:
124 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
126 # warm-up the mac address tables
129 # Holder of the active classify table key
130 cls.acl_active_table = ""
133 super(TestClassifyAcl, cls).tearDownClass()
137 def tearDownClass(cls):
138 super(TestClassifyAcl, cls).tearDownClass()
141 super(TestClassifyAcl, self).setUp()
142 self.acl_tbl_idx = {}
146 Show various debug prints after each test.
148 if not self.vpp_dead:
149 if self.acl_active_table == "mac_inout":
150 self.output_acl_set_interface(
151 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
153 self.input_acl_set_interface(
154 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
156 self.acl_active_table = ""
157 elif self.acl_active_table == "mac_out":
158 self.output_acl_set_interface(
159 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
161 self.acl_active_table = ""
162 elif self.acl_active_table == "mac_in":
163 self.input_acl_set_interface(
164 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
166 self.acl_active_table = ""
168 super(TestClassifyAcl, self).tearDown()
170 def create_classify_session(
171 self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
173 """Create Classify Session
175 :param VppInterface intf: Interface to apply classify session.
176 :param int table_index: table index to identify classify table.
177 :param str match: matched value for interested traffic.
178 :param int is_add: option to configure classify session.
179 - create(1) or delete(0)
181 mask_match, mask_match_len = self._resolve_mask_match(match)
182 r = self.vapi.classify_add_del_session(
184 table_index=table_index,
186 match_len=mask_match_len,
187 hit_next_index=hit_next_index,
189 self.assertIsNotNone(r, "No response msg for add_del_session")
191 def create_hosts(self, count, start=0):
193 Create required number of host MAC addresses and distribute them among
194 interfaces. Create host IPv4 and IPv6 addresses for every host MAC address.
196 :param int count: Number of hosts to create MAC/IPv4/IPv6 addresses for.
197 :param int start: Number to start numbering from.
199 n_int = len(self.pg_interfaces)
200 macs_per_if = count // n_int
202 for pg_if in self.pg_interfaces:
204 start_nr = macs_per_if * i + start
206 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
208 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
209 for j in range(start_nr, end_nr):
211 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
212 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
213 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
217 def create_upper_layer(self, packet_index, proto, ports=0):
218 p = self.proto_map[proto]
222 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
223 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
226 return UDP(sport=ports, dport=ports)
230 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
231 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
234 return TCP(sport=ports, dport=ports)
250 Create input packet stream for defined interface using hosts or
253 :param object src_if: Interface to create packet stream for.
254 :param list packet_sizes: List of required packet sizes.
255 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
256 :return: Stream of packets.
259 if self.flows.__contains__(src_if):
260 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
261 for dst_if in self.flows[src_if]:
262 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
263 n_int = len(dst_hosts) * len(src_hosts)
264 for i in range(0, n_int):
265 dst_host = dst_hosts[i // len(src_hosts)]
266 src_host = src_hosts[i % len(src_hosts)]
267 pkt_info = self.create_packet_info(src_if, dst_if)
273 pkt_info.ip = random.choice([0, 1])
275 pkt_info.proto = random.choice(self.proto[self.IP])
277 pkt_info.proto = proto
278 payload = self.info_to_payload(pkt_info)
279 p = Ether(dst=dst_host.mac, src=src_host.mac)
281 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
283 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
285 p /= IPv6ExtHdrFragment(offset=64, m=1)
289 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
292 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
293 if traffic_type == self.ICMP:
295 p /= ICMPv6EchoRequest(
296 type=self.icmp6_type, code=self.icmp6_code
299 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
301 p /= self.create_upper_layer(i, pkt_info.proto, ports)
304 pkt_info.data = p.copy()
306 size = random.choice(packet_sizes)
307 self.extend_packet(p, size)
311 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
313 Verify captured input packet stream for defined interface.
315 :param object pg_if: Interface to verify captured packet stream for.
316 :param list capture: Captured packet stream.
317 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
320 for i in self.pg_interfaces:
321 last_info[i.sw_if_index] = None
322 dst_sw_if_index = pg_if.sw_if_index
323 for packet in capture:
325 if packet[Ether].type != etype:
326 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
330 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
331 if traffic_type == self.ICMP and ip_type == self.IPV6:
332 payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
333 payload = packet[ICMPv6EchoRequest]
335 payload_info = self.payload_to_info(packet[Raw])
336 payload = packet[self.proto_map[payload_info.proto]]
339 ppp("Unexpected or invalid packet (outside network):", packet)
344 self.assertEqual(payload_info.ip, ip_type)
345 if traffic_type == self.ICMP:
347 if payload_info.ip == 0:
348 self.assertEqual(payload.type, self.icmp4_type)
349 self.assertEqual(payload.code, self.icmp4_code)
351 self.assertEqual(payload.type, self.icmp6_type)
352 self.assertEqual(payload.code, self.icmp6_code)
355 ppp("Unexpected or invalid packet (outside network):", packet)
360 ip_version = IPv6 if payload_info.ip == 1 else IP
362 ip = packet[ip_version]
363 packet_index = payload_info.index
365 self.assertEqual(payload_info.dst, dst_sw_if_index)
367 "Got packet on port %s: src=%u (id=%u)"
368 % (pg_if.name, payload_info.src, packet_index)
370 next_info = self.get_next_packet_info_for_interface2(
371 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
373 last_info[payload_info.src] = next_info
374 self.assertTrue(next_info is not None)
375 self.assertEqual(packet_index, next_info.index)
376 saved_packet = next_info.data
377 # Check standard fields
378 self.assertEqual(ip.src, saved_packet[ip_version].src)
379 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
380 p = self.proto_map[payload_info.proto]
383 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
384 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
387 self.assertEqual(udp.sport, saved_packet[UDP].sport)
388 self.assertEqual(udp.dport, saved_packet[UDP].dport)
390 self.logger.error(ppp("Unexpected or invalid packet:", packet))
392 for i in self.pg_interfaces:
393 remaining_packet = self.get_next_packet_info_for_interface2(
394 i, dst_sw_if_index, last_info[i.sw_if_index]
397 remaining_packet is None,
398 "Port %u: Packet expected from source %u didn't arrive"
399 % (dst_sw_if_index, i.sw_if_index),
402 def run_traffic_no_check(self):
404 # Create incoming packet streams for packet-generator interfaces
405 for i in self.pg_interfaces:
406 if self.flows.__contains__(i):
407 pkts = self.create_stream(i, self.pg_if_packet_sizes)
411 # Enable packet capture and start packet sending
412 self.pg_enable_capture(self.pg_interfaces)
426 # Create incoming packet streams for packet-generator interfaces
428 for i in self.pg_interfaces:
429 if self.flows.__contains__(i):
430 pkts = self.create_stream(
432 self.pg_if_packet_sizes,
443 pkts_cnt += len(pkts)
445 # Enable packet capture and start packet sending
446 self.pg_enable_capture(self.pg_interfaces)
450 # Verify outgoing packet streams per packet-generator interface
451 for src_if in self.pg_interfaces:
452 if self.flows.__contains__(src_if):
453 for dst_if in self.flows[src_if]:
454 capture = dst_if.get_capture(pkts_cnt)
455 self.logger.info("Verifying capture on interface %s" % dst_if.name)
456 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
458 def run_verify_negat_test(
459 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
462 self.reset_packet_infos()
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(
467 self.pg_if_packet_sizes,
479 # Enable packet capture and start packet sending
480 self.pg_enable_capture(self.pg_interfaces)
484 # Verify outgoing packet streams per packet-generator interface
485 for src_if in self.pg_interfaces:
486 if self.flows.__contains__(src_if):
487 for dst_if in self.flows[src_if]:
488 self.logger.info("Verifying capture on interface %s" % dst_if.name)
489 capture = dst_if.get_capture(0)
490 self.assertEqual(len(capture), 0)
492 def build_classify_table(
499 hit_next_index=0xFFFFFFFF,
502 a_mask = self.build_mac_mask(
503 src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
505 self.create_classify_table(key, a_mask)
506 for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
507 s_mac = host.mac if src_mac else ""
509 for dst_if in self.flows[self.pg0]:
510 for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
511 self.create_classify_session(
513 self.acl_tbl_idx.get(key),
514 self.build_mac_match(
515 src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
517 hit_next_index=hit_next_index,
520 self.create_classify_session(
522 self.acl_tbl_idx.get(key),
523 self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
524 hit_next_index=hit_next_index,
527 def test_0000_warmup_test(self):
528 """Learn the MAC addresses"""
530 self.run_traffic_no_check()
532 def test_0010_inacl_permit_src_mac(self):
533 """Input L2 ACL test - permit source MAC
535 Test scenario for basic L2 ACL matching on source MAC
536 - Create IPv4 stream for pg0 -> pg1 interface.
537 - Create ACL with source MAC address.
538 - Send and verify received packets on pg1 interface.
541 self.build_classify_table(src_mac="ffffffffffff", key=key)
542 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
543 self.acl_active_table = key
544 self.run_verify_test(self.IP, self.IPV4, -1)
546 def test_0011_inacl_permit_dst_mac(self):
547 """Input L2 ACL test - permit destination MAC
549 Test scenario for basic L2 ACL matching on destination MAC
550 - Create IPv4 stream for pg0 -> pg1 interface.
551 - Create ACL with destination MAC address.
552 - Send and verify received packets on pg1 interface.
555 self.build_classify_table(dst_mac="ffffffffffff", key=key)
556 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
557 self.acl_active_table = key
558 self.run_verify_test(self.IP, self.IPV4, -1)
560 def test_0012_inacl_permit_src_dst_mac(self):
561 """Input L2 ACL test - permit source and destination MAC
563 Test scenario for basic L2 ACL matching on source and destination MAC
564 - Create IPv4 stream for pg0 -> pg1 interface.
565 - Create ACL with source and destination MAC addresses.
566 - Send and verify received packets on pg1 interface.
569 self.build_classify_table(
570 src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
572 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
573 self.acl_active_table = key
574 self.run_verify_test(self.IP, self.IPV4, -1)
576 def test_0013_inacl_permit_ether_type(self):
577 """Input L2 ACL test - permit ether_type
579 Test scenario for basic L2 ACL matching on ether_type
580 - Create IPv4 stream for pg0 -> pg1 interface.
581 - Create ACL with ether_type.
582 - Send and verify received packets on pg1 interface.
585 self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
586 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
587 self.acl_active_table = key
588 self.run_verify_test(self.IP, self.IPV4, -1)
590 def test_0015_inacl_deny(self):
591 """Input L2 ACL test - deny
593 Test scenario for basic L2 deny ACL matching on source MAC
594 - Create IPv4 stream for pg0 -> pg1 interface.
596 - Create ACL with source MAC address.
597 - Send and verify no received packets on pg1 interface.
600 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
601 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
602 self.acl_active_table = key
603 self.run_verify_negat_test(self.IP, self.IPV4, -1)
605 def test_0020_outacl_permit(self):
606 """Output L2 ACL test - permit
608 Test scenario for basic L2 ACL matching on source MAC
609 - Create IPv4 stream for pg0 -> pg1 interface.
610 - Create ACL with source MAC address.
611 - Send and verify received packets on pg1 interface.
614 self.build_classify_table(src_mac="ffffffffffff", key=key)
615 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
616 self.acl_active_table = key
617 self.run_verify_test(self.IP, self.IPV4, -1)
619 def test_0025_outacl_deny(self):
620 """Output L2 ACL test - deny
622 Test scenario for basic L2 deny ACL matching on source MAC
623 - Create IPv4 stream for pg0 -> pg1 interface.
624 - Create ACL with source MAC address.
625 - Send and verify no received packets on pg1 interface.
628 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
629 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
630 self.acl_active_table = key
631 self.run_verify_negat_test(self.IP, self.IPV4, -1)
633 def test_0030_inoutacl_permit(self):
634 """Input+Output L2 ACL test - permit
636 Test scenario for basic L2 ACLs matching on source MAC
637 - Create IPv4 stream for pg0 -> pg1 interface.
638 - Create ACLs with source MAC address.
639 - Send and verify received packets on pg1 interface.
642 self.build_classify_table(src_mac="ffffffffffff", key=key)
643 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
644 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
645 self.acl_active_table = key
646 self.run_verify_test(self.IP, self.IPV4, -1)
649 if __name__ == "__main__":
650 unittest.main(testRunner=VppTestRunner)