2 """ Classifier-based L2 ACL Test Case HLD:
11 from scapy.packet import Raw
12 from scapy.data import ETH_P_IP
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, TCP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
16 from scapy.layers.inet6 import IPv6ExtHdrFragment
17 from framework import VppTestCase, VppTestRunner
18 from util import Host, ppp
19 from template_classifier import TestClassifier
22 class TestClassifyAcl(TestClassifier):
23 """ Classifier-based L2 input and output ACL Test Case """
39 proto = [[6, 17], [1, 58]]
40 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
52 udp_sport_to = udp_sport_from + 5
53 udp_dport_from = 20000
54 udp_dport_to = udp_dport_from + 5000
56 tcp_sport_to = tcp_sport_from + 5
57 tcp_dport_from = 40000
58 tcp_dport_to = tcp_dport_from + 5000
61 udp_sport_to_2 = udp_sport_from_2 + 5
62 udp_dport_from_2 = 30000
63 udp_dport_to_2 = udp_dport_from_2 + 5000
64 tcp_sport_from_2 = 130
65 tcp_sport_to_2 = tcp_sport_from_2 + 5
66 tcp_dport_from_2 = 20000
67 tcp_dport_to_2 = tcp_dport_from_2 + 5000
69 icmp4_type = 8 # echo request
71 icmp6_type = 128 # echo request
87 Perform standard class setup (defined by class method setUpClass in
88 class VppTestCase) before running the test case, set test case related
89 variables and configure VPP.
91 super(TestClassifyAcl, cls).setUpClass()
95 # Create 2 pg interfaces
96 cls.create_pg_interfaces(range(2))
98 # Packet flows mapping: pg0 -> pg1
100 cls.flows[cls.pg0] = [cls.pg1]
103 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
105 # Create BD with unknown unicast flooding enabled (uu_flood=1)
106 # and put the interfaces into this BD
107 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
109 for pg_if in cls.pg_interfaces:
110 cls.vapi.sw_interface_set_l2_bridge(
111 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
113 # Set up all interfaces
114 for i in cls.pg_interfaces:
117 # Mapping between packet-generator index and lists of test hosts
118 cls.hosts_by_pg_idx = dict()
119 for pg_if in cls.pg_interfaces:
120 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
122 # Create list of deleted hosts
123 cls.deleted_hosts_by_pg_idx = dict()
124 for pg_if in cls.pg_interfaces:
125 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
127 # warm-up the mac address tables
130 # Holder of the active classify table key
131 cls.acl_active_table = ''
134 super(TestClassifyAcl, cls).tearDownClass()
138 def tearDownClass(cls):
139 super(TestClassifyAcl, cls).tearDownClass()
142 super(TestClassifyAcl, self).setUp()
143 self.acl_tbl_idx = {}
147 Reset the input/output ACL interface settings made for the active classify table after each test.
149 if not self.vpp_dead:
150 if self.acl_active_table == 'mac_inout':
151 self.output_acl_set_interface(
152 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
153 self.input_acl_set_interface(
154 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
155 self.acl_active_table = ''
156 elif self.acl_active_table == 'mac_out':
157 self.output_acl_set_interface(
158 self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
159 self.acl_active_table = ''
160 elif self.acl_active_table == 'mac_in':
161 self.input_acl_set_interface(
162 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
163 self.acl_active_table = ''
165 super(TestClassifyAcl, self).tearDown()
167 def create_classify_session(self, intf, table_index, match,
168 hit_next_index=0xffffffff, is_add=1):
169 """Create Classify Session
171 :param VppInterface intf: Interface to apply classify session.
172 :param int table_index: table index to identify classify table.
173 :param str match: matched value for interested traffic.
174 :param int is_add: option to configure classify session.
175 - create(1) or delete(0)
177 mask_match, mask_match_len = self._resolve_mask_match(match)
178 r = self.vapi.classify_add_del_session(
180 table_index=table_index,
182 match_len=mask_match_len,
183 hit_next_index=hit_next_index)
184 self.assertIsNotNone(r, 'No response msg for add_del_session')
186 def create_hosts(self, count, start=0):
188 Create required number of host MAC addresses and distribute them among
189 interfaces. Create host IPv4 and IPv6 addresses for every host MAC address.
191 :param int count: Number of hosts to create MAC/IPv4 addresses for.
192 :param int start: Number to start numbering from.
194 n_int = len(self.pg_interfaces)
195 macs_per_if = count // n_int
197 for pg_if in self.pg_interfaces:
199 start_nr = macs_per_if * i + start
200 end_nr = count + start if i == (n_int - 1) \
201 else macs_per_if * (i + 1) + start
202 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
203 for j in range(start_nr, end_nr):
205 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
206 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
207 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
210 def create_upper_layer(self, packet_index, proto, ports=0):
211 p = self.proto_map[proto]
214 return UDP(sport=random.randint(self.udp_sport_from,
216 dport=random.randint(self.udp_dport_from,
219 return UDP(sport=ports, dport=ports)
222 return TCP(sport=random.randint(self.tcp_sport_from,
224 dport=random.randint(self.tcp_dport_from,
227 return TCP(sport=ports, dport=ports)
230 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
231 proto=-1, ports=0, fragments=False,
232 pkt_raw=True, etype=-1):
234 Create input packet stream for defined interface using hosts or
237 :param object src_if: Interface to create packet stream for.
238 :param list packet_sizes: List of required packet sizes.
239 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
240 :return: Stream of packets.
243 if self.flows.__contains__(src_if):
244 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
245 for dst_if in self.flows[src_if]:
246 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
247 n_int = len(dst_hosts) * len(src_hosts)
248 for i in range(0, n_int):
249 dst_host = dst_hosts[i // len(src_hosts)]
250 src_host = src_hosts[i % len(src_hosts)]
251 pkt_info = self.create_packet_info(src_if, dst_if)
257 pkt_info.ip = random.choice([0, 1])
259 pkt_info.proto = random.choice(self.proto[self.IP])
261 pkt_info.proto = proto
262 payload = self.info_to_payload(pkt_info)
263 p = Ether(dst=dst_host.mac, src=src_host.mac)
265 p = Ether(dst=dst_host.mac,
269 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
271 p /= IPv6ExtHdrFragment(offset=64, m=1)
274 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
277 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
278 if traffic_type == self.ICMP:
280 p /= ICMPv6EchoRequest(type=self.icmp6_type,
281 code=self.icmp6_code)
283 p /= ICMP(type=self.icmp4_type,
284 code=self.icmp4_code)
286 p /= self.create_upper_layer(i, pkt_info.proto, ports)
289 pkt_info.data = p.copy()
291 size = random.choice(packet_sizes)
292 self.extend_packet(p, size)
296 def verify_capture(self, pg_if, capture,
297 traffic_type=0, ip_type=0, etype=-1):
299 Verify captured input packet stream for defined interface.
301 :param object pg_if: Interface to verify captured packet stream for.
302 :param list capture: Captured packet stream.
303 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
306 for i in self.pg_interfaces:
307 last_info[i.sw_if_index] = None
308 dst_sw_if_index = pg_if.sw_if_index
309 for packet in capture:
311 if packet[Ether].type != etype:
312 self.logger.error(ppp("Unexpected ethertype in packet:",
317 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
318 if traffic_type == self.ICMP and ip_type == self.IPV6:
319 payload_info = self.payload_to_info(
320 packet[ICMPv6EchoRequest].data)
321 payload = packet[ICMPv6EchoRequest]
323 payload_info = self.payload_to_info(packet[Raw])
324 payload = packet[self.proto_map[payload_info.proto]]
326 self.logger.error(ppp("Unexpected or invalid packet "
327 "(outside network):", packet))
331 self.assertEqual(payload_info.ip, ip_type)
332 if traffic_type == self.ICMP:
334 if payload_info.ip == 0:
335 self.assertEqual(payload.type, self.icmp4_type)
336 self.assertEqual(payload.code, self.icmp4_code)
338 self.assertEqual(payload.type, self.icmp6_type)
339 self.assertEqual(payload.code, self.icmp6_code)
341 self.logger.error(ppp("Unexpected or invalid packet "
342 "(outside network):", packet))
346 ip_version = IPv6 if payload_info.ip == 1 else IP
348 ip = packet[ip_version]
349 packet_index = payload_info.index
351 self.assertEqual(payload_info.dst, dst_sw_if_index)
352 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
353 (pg_if.name, payload_info.src,
355 next_info = self.get_next_packet_info_for_interface2(
356 payload_info.src, dst_sw_if_index,
357 last_info[payload_info.src])
358 last_info[payload_info.src] = next_info
359 self.assertTrue(next_info is not None)
360 self.assertEqual(packet_index, next_info.index)
361 saved_packet = next_info.data
362 # Check standard fields
363 self.assertEqual(ip.src, saved_packet[ip_version].src)
364 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
365 p = self.proto_map[payload_info.proto]
368 self.assertEqual(tcp.sport, saved_packet[
370 self.assertEqual(tcp.dport, saved_packet[
374 self.assertEqual(udp.sport, saved_packet[
376 self.assertEqual(udp.dport, saved_packet[
379 self.logger.error(ppp("Unexpected or invalid packet:",
382 for i in self.pg_interfaces:
383 remaining_packet = self.get_next_packet_info_for_interface2(
384 i, dst_sw_if_index, last_info[i.sw_if_index])
386 remaining_packet is None,
387 "Port %u: Packet expected from source %u didn't arrive" %
388 (dst_sw_if_index, i.sw_if_index))
390 def run_traffic_no_check(self):
392 # Create incoming packet streams for packet-generator interfaces
393 for i in self.pg_interfaces:
394 if self.flows.__contains__(i):
395 pkts = self.create_stream(i, self.pg_if_packet_sizes)
399 # Enable packet capture and start packet sending
400 self.pg_enable_capture(self.pg_interfaces)
403 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
404 frags=False, pkt_raw=True, etype=-1):
406 # Create incoming packet streams for packet-generator interfaces
408 for i in self.pg_interfaces:
409 if self.flows.__contains__(i):
410 pkts = self.create_stream(i, self.pg_if_packet_sizes,
411 traffic_type, ip_type, proto, ports,
412 frags, pkt_raw, etype)
415 pkts_cnt += len(pkts)
417 # Enable packet capture and start packet sending
418 self.pg_enable_capture(self.pg_interfaces)
422 # Verify outgoing packet streams per packet-generator interface
423 for src_if in self.pg_interfaces:
424 if self.flows.__contains__(src_if):
425 for dst_if in self.flows[src_if]:
426 capture = dst_if.get_capture(pkts_cnt)
427 self.logger.info("Verifying capture on interface %s" %
429 self.verify_capture(dst_if, capture,
430 traffic_type, ip_type, etype)
432 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
433 ports=0, frags=False, etype=-1):
435 self.reset_packet_infos()
436 for i in self.pg_interfaces:
437 if self.flows.__contains__(i):
438 pkts = self.create_stream(i, self.pg_if_packet_sizes,
439 traffic_type, ip_type, proto, ports,
444 # Enable packet capture and start packet sending
445 self.pg_enable_capture(self.pg_interfaces)
449 # Verify outgoing packet streams per packet-generator interface
450 for src_if in self.pg_interfaces:
451 if self.flows.__contains__(src_if):
452 for dst_if in self.flows[src_if]:
453 self.logger.info("Verifying capture on interface %s" %
455 capture = dst_if.get_capture(0)
456 self.assertEqual(len(capture), 0)
458 def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
459 etype='', key='mac', hit_next_index=0xffffffff):
461 a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
462 ether_type=ether_type)
463 self.create_classify_table(key, a_mask)
464 for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
465 s_mac = host.mac if src_mac else ''
467 for dst_if in self.flows[self.pg0]:
468 for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
469 self.create_classify_session(
470 self.pg0, self.acl_tbl_idx.get(key),
471 self.build_mac_match(src_mac=s_mac,
472 dst_mac=dst_host.mac,
474 hit_next_index=hit_next_index)
476 self.create_classify_session(
477 self.pg0, self.acl_tbl_idx.get(key),
478 self.build_mac_match(src_mac=s_mac, dst_mac='',
480 hit_next_index=hit_next_index)
482 def test_0000_warmup_test(self):
483 """ Learn the MAC addresses
486 self.run_traffic_no_check()
488 def test_0010_inacl_permit_src_mac(self):
489 """ Input L2 ACL test - permit source MAC
491 Test scenario for basic L2 input ACL matching on source MAC
492 - Create IPv4 stream for pg0 -> pg1 interface.
493 - Create ACL with source MAC address.
494 - Send and verify received packets on pg1 interface.
497 self.build_classify_table(src_mac='ffffffffffff', key=key)
498 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
499 self.acl_active_table = key
500 self.run_verify_test(self.IP, self.IPV4, -1)
502 def test_0011_inacl_permit_dst_mac(self):
503 """ Input L2 ACL test - permit destination MAC
505 Test scenario for basic L2 input ACL matching on destination MAC
506 - Create IPv4 stream for pg0 -> pg1 interface.
507 - Create ACL with destination MAC address.
508 - Send and verify received packets on pg1 interface.
511 self.build_classify_table(dst_mac='ffffffffffff', key=key)
512 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
513 self.acl_active_table = key
514 self.run_verify_test(self.IP, self.IPV4, -1)
516 def test_0012_inacl_permit_src_dst_mac(self):
517 """ Input L2 ACL test - permit source and destination MAC
519 Test scenario for basic L2 input ACL matching on source and destination MAC
520 - Create IPv4 stream for pg0 -> pg1 interface.
521 - Create ACL with source and destination MAC addresses.
522 - Send and verify received packets on pg1 interface.
525 self.build_classify_table(
526 src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key)
527 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
528 self.acl_active_table = key
529 self.run_verify_test(self.IP, self.IPV4, -1)
531 def test_0013_inacl_permit_ether_type(self):
532 """ Input L2 ACL test - permit ether_type
534 Test scenario for basic L2 input ACL matching on ether_type
535 - Create IPv4 stream for pg0 -> pg1 interface.
536 - Create ACL with ether_type.
537 - Send and verify received packets on pg1 interface.
540 self.build_classify_table(
541 ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key)
542 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
543 self.acl_active_table = key
544 self.run_verify_test(self.IP, self.IPV4, -1)
546 def test_0015_inacl_deny(self):
547 """ Input L2 ACL test - deny
549 Test scenario for basic L2 input ACL (deny) matching on source MAC
550 - Create IPv4 stream for pg0 -> pg1 interface.
552 - Create ACL with source MAC address.
553 - Send and verify no received packets on pg1 interface.
556 self.build_classify_table(
557 src_mac='ffffffffffff', hit_next_index=0, key=key)
558 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
559 self.acl_active_table = key
560 self.run_verify_negat_test(self.IP, self.IPV4, -1)
562 def test_0020_outacl_permit(self):
563 """ Output L2 ACL test - permit
565 Test scenario for basic L2 output ACL matching on source MAC
566 - Create IPv4 stream for pg0 -> pg1 interface.
567 - Create ACL with source MAC address.
568 - Send and verify received packets on pg1 interface.
571 self.build_classify_table(src_mac='ffffffffffff', key=key)
572 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
573 self.acl_active_table = key
574 self.run_verify_test(self.IP, self.IPV4, -1)
576 def test_0025_outacl_deny(self):
577 """ Output L2 ACL test - deny
579 Test scenario for basic L2 output ACL (deny) matching on source MAC
580 - Create IPv4 stream for pg0 -> pg1 interface.
581 - Create ACL with source MAC address.
582 - Send and verify no received packets on pg1 interface.
585 self.build_classify_table(
586 src_mac='ffffffffffff', hit_next_index=0, key=key)
587 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
588 self.acl_active_table = key
589 self.run_verify_negat_test(self.IP, self.IPV4, -1)
591 def test_0030_inoutacl_permit(self):
592 """ Input+Output L2 ACL test - permit
594 Test scenario for basic L2 input and output ACLs matching on source MAC
595 - Create IPv4 stream for pg0 -> pg1 interface.
596 - Create ACLs with source MAC address.
597 - Send and verify received packets on pg1 interface.
600 self.build_classify_table(src_mac='ffffffffffff', key=key)
601 self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
602 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
603 self.acl_active_table = key
604 self.run_verify_test(self.IP, self.IPV4, -1)
607 if __name__ == '__main__':
608 unittest.main(testRunner=VppTestRunner)