2 """ Classifier-based L2 ACL Test Case HLD:
11 from scapy.packet import Raw
12 from scapy.data import ETH_P_IP
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, TCP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
16 from scapy.layers.inet6 import IPv6ExtHdrFragment
17 from framework import VppTestCase, VppTestRunner
18 from util import Host, ppp
19 from template_classifier import TestClassifier
22 class TestClassifyAcl(TestClassifier):
23 """Classifier-based L2 input and output ACL Test Case"""
39 proto = [[6, 17], [1, 58]]
40 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
52 udp_sport_to = udp_sport_from + 5
53 udp_dport_from = 20000
54 udp_dport_to = udp_dport_from + 5000
56 tcp_sport_to = tcp_sport_from + 5
57 tcp_dport_from = 40000
58 tcp_dport_to = tcp_dport_from + 5000
61 udp_sport_to_2 = udp_sport_from_2 + 5
62 udp_dport_from_2 = 30000
63 udp_dport_to_2 = udp_dport_from_2 + 5000
64 tcp_sport_from_2 = 130
65 tcp_sport_to_2 = tcp_sport_from_2 + 5
66 tcp_dport_from_2 = 20000
67 tcp_dport_to_2 = tcp_dport_from_2 + 5000
69 icmp4_type = 8 # echo request
71 icmp6_type = 128 # echo request
87 Perform standard class setup (defined by class method setUpClass in
88 class VppTestCase) before running the test case, set test case related
89 variables and configure VPP.
91 super(TestClassifyAcl, cls).setUpClass()
95 # Create 2 pg interfaces
96 cls.create_pg_interfaces(range(2))
98 # Packet flows mapping pg0 -> pg1, pg2 etc.
100 cls.flows[cls.pg0] = [cls.pg1]
103 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
105 # Create BD with MAC learning and unknown unicast flooding disabled
106 # and put interfaces to this BD
107 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1)
108 for pg_if in cls.pg_interfaces:
109 cls.vapi.sw_interface_set_l2_bridge(
110 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
113 # Set up all interfaces
114 for i in cls.pg_interfaces:
117 # Mapping between packet-generator index and lists of test hosts
118 cls.hosts_by_pg_idx = dict()
119 for pg_if in cls.pg_interfaces:
120 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
122 # Create list of deleted hosts
123 cls.deleted_hosts_by_pg_idx = dict()
124 for pg_if in cls.pg_interfaces:
125 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
127 # warm-up the mac address tables
130 # Holder of the active classify table key
131 cls.acl_active_table = ""
134 super(TestClassifyAcl, cls).tearDownClass()
138 def tearDownClass(cls):
139 super(TestClassifyAcl, cls).tearDownClass()
142 super(TestClassifyAcl, self).setUp()
143 self.acl_tbl_idx = {}
147 Show various debug prints after each test.
149 if not self.vpp_dead:
150 if self.acl_active_table == "mac_inout":
151 self.output_acl_set_interface(
152 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
154 self.input_acl_set_interface(
155 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
157 self.acl_active_table = ""
158 elif self.acl_active_table == "mac_out":
159 self.output_acl_set_interface(
160 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
162 self.acl_active_table = ""
163 elif self.acl_active_table == "mac_in":
164 self.input_acl_set_interface(
165 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
167 self.acl_active_table = ""
169 super(TestClassifyAcl, self).tearDown()
171 def create_classify_session(
172 self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
174 """Create Classify Session
176 :param VppInterface intf: Interface to apply classify session.
177 :param int table_index: table index to identify classify table.
178 :param str match: matched value for interested traffic.
179 :param int is_add: option to configure classify session.
180 - create(1) or delete(0)
182 mask_match, mask_match_len = self._resolve_mask_match(match)
183 r = self.vapi.classify_add_del_session(
185 table_index=table_index,
187 match_len=mask_match_len,
188 hit_next_index=hit_next_index,
190 self.assertIsNotNone(r, "No response msg for add_del_session")
192 def create_hosts(self, count, start=0):
194 Create required number of host MAC addresses and distribute them among
195 interfaces. Create host IPv4 address for every host MAC address.
197 :param int count: Number of hosts to create MAC/IPv4 addresses for.
198 :param int start: Number to start numbering from.
200 n_int = len(self.pg_interfaces)
201 macs_per_if = count // n_int
203 for pg_if in self.pg_interfaces:
205 start_nr = macs_per_if * i + start
207 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
209 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
210 for j in range(start_nr, end_nr):
212 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
213 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
214 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
218 def create_upper_layer(self, packet_index, proto, ports=0):
219 p = self.proto_map[proto]
223 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
224 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
227 return UDP(sport=ports, dport=ports)
231 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
232 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
235 return TCP(sport=ports, dport=ports)
251 Create input packet stream for defined interface using hosts or
254 :param object src_if: Interface to create packet stream for.
255 :param list packet_sizes: List of required packet sizes.
256 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
257 :return: Stream of packets.
260 if self.flows.__contains__(src_if):
261 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
262 for dst_if in self.flows[src_if]:
263 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
264 n_int = len(dst_hosts) * len(src_hosts)
265 for i in range(0, n_int):
266 dst_host = dst_hosts[i // len(src_hosts)]
267 src_host = src_hosts[i % len(src_hosts)]
268 pkt_info = self.create_packet_info(src_if, dst_if)
274 pkt_info.ip = random.choice([0, 1])
276 pkt_info.proto = random.choice(self.proto[self.IP])
278 pkt_info.proto = proto
279 payload = self.info_to_payload(pkt_info)
280 p = Ether(dst=dst_host.mac, src=src_host.mac)
282 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
284 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
286 p /= IPv6ExtHdrFragment(offset=64, m=1)
290 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
293 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
294 if traffic_type == self.ICMP:
296 p /= ICMPv6EchoRequest(
297 type=self.icmp6_type, code=self.icmp6_code
300 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
302 p /= self.create_upper_layer(i, pkt_info.proto, ports)
305 pkt_info.data = p.copy()
307 size = random.choice(packet_sizes)
308 self.extend_packet(p, size)
312 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
314 Verify captured input packet stream for defined interface.
316 :param object pg_if: Interface to verify captured packet stream for.
317 :param list capture: Captured packet stream.
318 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
321 for i in self.pg_interfaces:
322 last_info[i.sw_if_index] = None
323 dst_sw_if_index = pg_if.sw_if_index
324 for packet in capture:
326 if packet[Ether].type != etype:
327 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
331 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
332 if traffic_type == self.ICMP and ip_type == self.IPV6:
333 payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
334 payload = packet[ICMPv6EchoRequest]
336 payload_info = self.payload_to_info(packet[Raw])
337 payload = packet[self.proto_map[payload_info.proto]]
340 ppp("Unexpected or invalid packet (outside network):", packet)
345 self.assertEqual(payload_info.ip, ip_type)
346 if traffic_type == self.ICMP:
348 if payload_info.ip == 0:
349 self.assertEqual(payload.type, self.icmp4_type)
350 self.assertEqual(payload.code, self.icmp4_code)
352 self.assertEqual(payload.type, self.icmp6_type)
353 self.assertEqual(payload.code, self.icmp6_code)
356 ppp("Unexpected or invalid packet (outside network):", packet)
361 ip_version = IPv6 if payload_info.ip == 1 else IP
363 ip = packet[ip_version]
364 packet_index = payload_info.index
366 self.assertEqual(payload_info.dst, dst_sw_if_index)
368 "Got packet on port %s: src=%u (id=%u)"
369 % (pg_if.name, payload_info.src, packet_index)
371 next_info = self.get_next_packet_info_for_interface2(
372 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
374 last_info[payload_info.src] = next_info
375 self.assertTrue(next_info is not None)
376 self.assertEqual(packet_index, next_info.index)
377 saved_packet = next_info.data
378 # Check standard fields
379 self.assertEqual(ip.src, saved_packet[ip_version].src)
380 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
381 p = self.proto_map[payload_info.proto]
384 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
385 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
388 self.assertEqual(udp.sport, saved_packet[UDP].sport)
389 self.assertEqual(udp.dport, saved_packet[UDP].dport)
391 self.logger.error(ppp("Unexpected or invalid packet:", packet))
393 for i in self.pg_interfaces:
394 remaining_packet = self.get_next_packet_info_for_interface2(
395 i, dst_sw_if_index, last_info[i.sw_if_index]
398 remaining_packet is None,
399 "Port %u: Packet expected from source %u didn't arrive"
400 % (dst_sw_if_index, i.sw_if_index),
403 def run_traffic_no_check(self):
405 # Create incoming packet streams for packet-generator interfaces
406 for i in self.pg_interfaces:
407 if self.flows.__contains__(i):
408 pkts = self.create_stream(i, self.pg_if_packet_sizes)
412 # Enable packet capture and start packet sending
413 self.pg_enable_capture(self.pg_interfaces)
427 # Create incoming packet streams for packet-generator interfaces
429 for i in self.pg_interfaces:
430 if self.flows.__contains__(i):
431 pkts = self.create_stream(
433 self.pg_if_packet_sizes,
444 pkts_cnt += len(pkts)
446 # Enable packet capture and start packet sendingself.IPV
447 self.pg_enable_capture(self.pg_interfaces)
451 # Verify outgoing packet streams per packet-generator interface
452 for src_if in self.pg_interfaces:
453 if self.flows.__contains__(src_if):
454 for dst_if in self.flows[src_if]:
455 capture = dst_if.get_capture(pkts_cnt)
456 self.logger.info("Verifying capture on interface %s" % dst_if.name)
457 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
459 def run_verify_negat_test(
460 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
463 self.reset_packet_infos()
464 for i in self.pg_interfaces:
465 if self.flows.__contains__(i):
466 pkts = self.create_stream(
468 self.pg_if_packet_sizes,
480 # Enable packet capture and start packet sending
481 self.pg_enable_capture(self.pg_interfaces)
485 # Verify outgoing packet streams per packet-generator interface
486 for src_if in self.pg_interfaces:
487 if self.flows.__contains__(src_if):
488 for dst_if in self.flows[src_if]:
489 self.logger.info("Verifying capture on interface %s" % dst_if.name)
490 capture = dst_if.get_capture(0)
491 self.assertEqual(len(capture), 0)
493 def build_classify_table(
500 hit_next_index=0xFFFFFFFF,
503 a_mask = self.build_mac_mask(
504 src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
506 self.create_classify_table(key, a_mask)
507 for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
508 s_mac = host.mac if src_mac else ""
510 for dst_if in self.flows[self.pg0]:
511 for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
512 self.create_classify_session(
514 self.acl_tbl_idx.get(key),
515 self.build_mac_match(
516 src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
518 hit_next_index=hit_next_index,
521 self.create_classify_session(
523 self.acl_tbl_idx.get(key),
524 self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
525 hit_next_index=hit_next_index,
528 def test_0000_warmup_test(self):
529 """Learn the MAC addresses"""
531 self.run_traffic_no_check()
533 def test_0010_inacl_permit_src_mac(self):
534 """Input L2 ACL test - permit source MAC
536 Test scenario for basic IP ACL with source IP
537 - Create IPv4 stream for pg0 -> pg1 interface.
538 - Create ACL with source MAC address.
539 - Send and verify received packets on pg1 interface.
542 self.build_classify_table(src_mac="ffffffffffff", key=key)
543 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
544 self.acl_active_table = key
545 self.run_verify_test(self.IP, self.IPV4, -1)
547 def test_0011_inacl_permit_dst_mac(self):
548 """Input L2 ACL test - permit destination MAC
550 Test scenario for basic IP ACL with source IP
551 - Create IPv4 stream for pg0 -> pg1 interface.
552 - Create ACL with destination MAC address.
553 - Send and verify received packets on pg1 interface.
556 self.build_classify_table(dst_mac="ffffffffffff", key=key)
557 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
558 self.acl_active_table = key
559 self.run_verify_test(self.IP, self.IPV4, -1)
561 def test_0012_inacl_permit_src_dst_mac(self):
562 """Input L2 ACL test - permit source and destination MAC
564 Test scenario for basic IP ACL with source IP
565 - Create IPv4 stream for pg0 -> pg1 interface.
566 - Create ACL with source and destination MAC addresses.
567 - Send and verify received packets on pg1 interface.
570 self.build_classify_table(
571 src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
573 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
574 self.acl_active_table = key
575 self.run_verify_test(self.IP, self.IPV4, -1)
577 def test_0013_inacl_permit_ether_type(self):
578 """Input L2 ACL test - permit ether_type
580 Test scenario for basic IP ACL with source IP
581 - Create IPv4 stream for pg0 -> pg1 interface.
582 - Create ACL with destination MAC address.
583 - Send and verify received packets on pg1 interface.
586 self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
587 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
588 self.acl_active_table = key
589 self.run_verify_test(self.IP, self.IPV4, -1)
591 def test_0015_inacl_deny(self):
592 """Input L2 ACL test - deny
594 Test scenario for basic IP ACL with source IP
595 - Create IPv4 stream for pg0 -> pg1 interface.
597 - Create ACL with source MAC address.
598 - Send and verify no received packets on pg1 interface.
601 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
602 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
603 self.acl_active_table = key
604 self.run_verify_negat_test(self.IP, self.IPV4, -1)
606 def test_0020_outacl_permit(self):
607 """Output L2 ACL test - permit
609 Test scenario for basic IP ACL with source IP
610 - Create IPv4 stream for pg0 -> pg1 interface.
611 - Create ACL with source MAC address.
612 - Send and verify received packets on pg1 interface.
615 self.build_classify_table(src_mac="ffffffffffff", key=key)
616 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
617 self.acl_active_table = key
618 self.run_verify_test(self.IP, self.IPV4, -1)
620 def test_0025_outacl_deny(self):
621 """Output L2 ACL test - deny
623 Test scenario for basic IP ACL with source IP
624 - Create IPv4 stream for pg0 -> pg1 interface.
625 - Create ACL with source MAC address.
626 - Send and verify no received packets on pg1 interface.
629 self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
630 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
631 self.acl_active_table = key
632 self.run_verify_negat_test(self.IP, self.IPV4, -1)
634 def test_0030_inoutacl_permit(self):
635 """Input+Output L2 ACL test - permit
637 Test scenario for basic IP ACL with source IP
638 - Create IPv4 stream for pg0 -> pg1 interface.
639 - Create ACLs with source MAC address.
640 - Send and verify received packets on pg1 interface.
643 self.build_classify_table(src_mac="ffffffffffff", key=key)
644 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
645 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
646 self.acl_active_table = key
647 self.run_verify_test(self.IP, self.IPV4, -1)
650 if __name__ == "__main__":
651 unittest.main(testRunner=VppTestRunner)