tests: replace pycodestyle with black
[vpp.git] / test / test_classify_l2_acl.py
1 #!/usr/bin/env python3
2 """ Classifier-based L2 ACL Test Case HLD:
3 """
4
5 import unittest
6 import random
7 import binascii
8 import socket
9
10
11 from scapy.packet import Raw
12 from scapy.data import ETH_P_IP
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, TCP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
16 from scapy.layers.inet6 import IPv6ExtHdrFragment
17 from framework import VppTestCase, VppTestRunner
18 from util import Host, ppp
19 from template_classifier import TestClassifier
20
21
class TestClassifyAcl(TestClassifier):
    """Classifier-based L2 input and output ACL Test Case"""

    # NOTE: several attribute names below (IP, ICMP, TCP, UDP) match scapy
    # layer classes imported at module level.  Inside methods the bare names
    # refer to the scapy classes; the constants are only reachable through
    # ``self.`` / ``cls.``.

    # traffic types
    IP = 0
    ICMP = 1

    # IP version
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types
    DENY = 0
    PERMIT = 1

    # supported protocols: proto[IP] lists the L4 protocol numbers used for
    # generic IP traffic, proto[ICMP] the ICMP/ICMPv6 protocol numbers.
    proto = [[6, 17], [1, 58]]
    # protocol number -> name of the scapy layer carrying that protocol
    proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    bd_id = 1

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (the parent's setUpClass), then set the
        test case related variables and configure VPP.

        Two packet-generator interfaces are created, placed into bridge
        domain ``bd_id`` and brought up; the per-interface host lists and the
        active-table marker are initialised.  If any of that fails, the
        parent's tearDownClass is run before the exception is re-raised.
        """
        super().setUpClass()
        cls.af = None

        try:
            # Create 2 pg interfaces
            cls.create_pg_interfaces(range(2))

            # Packet flows mapping: source interface -> list of destinations
            cls.flows = {cls.pg0: [cls.pg1]}

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with MAC learning and unknown unicast flooding
            # enabled and put the interfaces into this BD
            cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1)
            for pg_if in cls.pg_interfaces:
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
                )

            # Set up all interfaces
            for pg_if in cls.pg_interfaces:
                pg_if.admin_up()

            # Mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = {
                pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces
            }

            # Same mapping for hosts that have been deleted
            cls.deleted_hosts_by_pg_idx = {
                pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces
            }

            # Holder of the active classify table key
            cls.acl_active_table = ""

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Run the parent setUp and start with no known classify tables."""
        super().setUp()
        # classify table key -> table index
        self.acl_tbl_idx = {}

    def tearDown(self):
        """
        Detach the classify table that the test attached, then run the
        parent tearDown.

        Which interfaces are touched depends on ``acl_active_table``:
        "mac_out" -> output ACL on pg1, "mac_in" -> input ACL on pg0,
        "mac_inout" -> both (output first).  Nothing is sent to VPP when it
        is already dead.
        """
        if not self.vpp_dead:
            key = self.acl_active_table
            if key in ("mac_inout", "mac_out", "mac_in"):
                table_index = self.acl_tbl_idx.get(key)
                # The trailing 0 is presumably is_add=0 (detach) - the helper
                # is defined outside this file.
                if key != "mac_in":
                    self.output_acl_set_interface(self.pg1, table_index, 0)
                if key != "mac_out":
                    self.input_acl_set_interface(self.pg0, table_index, 0)
                self.acl_active_table = ""

        super().tearDown()

    def create_classify_session(
        self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
    ):
        """Create Classify Session

        :param VppInterface intf: Interface to apply classify session
            (accepted for API symmetry; not used by this method).
        :param int table_index: table index to identify classify table.
        :param str match: matched value for interested traffic.
        :param int hit_next_index: next index passed through to the
            classify_add_del_session API call.
        :param int is_add: option to configure classify session.
            - create(1) or delete(0)
        """
        mask_match, mask_match_len = self._resolve_mask_match(match)
        reply = self.vapi.classify_add_del_session(
            is_add=is_add,
            table_index=table_index,
            match=mask_match,
            match_len=mask_match_len,
            hit_next_index=hit_next_index,
        )
        self.assertIsNotNone(reply, "No response msg for add_del_session")

    def create_hosts(self, count, start=0):
        """
        Create required number of host MAC addresses and distribute them among
        interfaces. Create an IPv4 and an IPv6 address for every host MAC
        address.

        :param int count: Number of hosts to create addresses for.
        :param int start: Number to start numbering from.
        """
        n_int = len(self.pg_interfaces)
        macs_per_if = count // n_int
        for i, pg_if in enumerate(self.pg_interfaces):
            start_nr = macs_per_if * i + start
            # The last interface also takes the remainder of count // n_int.
            if i == n_int - 1:
                end_nr = count + start
            else:
                end_nr = macs_per_if * (i + 1) + start
            hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
            for j in range(start_nr, end_nr):
                hosts.append(
                    Host(
                        "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                        "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
                    )
                )

    def create_upper_layer(self, packet_index, proto, ports=0):
        """
        Build the L4 header for one generated packet.

        :param int packet_index: Index of the packet in the stream (unused).
        :param int proto: Protocol number; must be a key of ``proto_map``.
        :param int ports: 0 to draw source/destination ports at random from
            the class port ranges, otherwise used for both ports.
        :return: A scapy UDP or TCP layer, or "" for any other protocol.
        """
        name = self.proto_map[proto]
        if name == "UDP":
            layer = UDP
            sport_range = (self.udp_sport_from, self.udp_sport_to)
            dport_range = (self.udp_dport_from, self.udp_dport_to)
        elif name == "TCP":
            layer = TCP
            sport_range = (self.tcp_sport_from, self.tcp_sport_to)
            dport_range = (self.tcp_dport_from, self.tcp_dport_to)
        else:
            return ""

        if ports == 0:
            return layer(
                sport=random.randint(*sport_range),
                dport=random.randint(*dport_range),
            )
        return layer(sport=ports, dport=ports)

    def create_stream(
        self,
        src_if,
        packet_sizes,
        traffic_type=0,
        ipv6=0,
        proto=-1,
        ports=0,
        fragments=False,
        pkt_raw=True,
        etype=-1,
    ):
        """
        Create input packet stream for defined interface using the hosts
        list: one packet per (destination host, source host) pair for every
        destination interface in ``flows[src_if]``.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
        :param int ipv6: 1: IPv6, 0: IPv4, anything else: random per packet.
        :param int proto: L4 protocol number, -1 to pick one at random from
            ``proto[IP]``.
        :param int ports: Passed to create_upper_layer (0: random ports).
        :param bool fragments: Mark the packets as fragments (IPv4
            flags/frag fields, or an IPv6 fragment extension header).
        :param bool pkt_raw: Append the packet-info payload, remember a copy
            of the packet in the packet info and pad to a random size.
        :param int etype: Ethertype to force on the Ethernet header when > 0.
        :return: Stream of packets.
        """
        pkts = []
        if src_if not in self.flows:
            return pkts

        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
        for dst_if in self.flows[src_if]:
            dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
            host_pairs = ((d, s) for d in dst_hosts for s in src_hosts)
            for pkt_idx, (dst_host, src_host) in enumerate(host_pairs):
                pkt_info = self.create_packet_info(src_if, dst_if)
                if ipv6 == 1:
                    pkt_info.ip = 1
                elif ipv6 == 0:
                    pkt_info.ip = 0
                else:
                    pkt_info.ip = random.choice([0, 1])
                if proto == -1:
                    pkt_info.proto = random.choice(self.proto[self.IP])
                else:
                    pkt_info.proto = proto
                payload = self.info_to_payload(pkt_info)

                ether_fields = {"dst": dst_host.mac, "src": src_host.mac}
                if etype > 0:
                    ether_fields["type"] = etype
                p = Ether(**ether_fields)

                if pkt_info.ip:
                    p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                    if fragments:
                        p /= IPv6ExtHdrFragment(offset=64, m=1)
                elif fragments:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64)
                else:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4)

                if traffic_type == self.ICMP:
                    if pkt_info.ip:
                        p /= ICMPv6EchoRequest(
                            type=self.icmp6_type, code=self.icmp6_code
                        )
                    else:
                        p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
                else:
                    p /= self.create_upper_layer(pkt_idx, pkt_info.proto, ports)

                if pkt_raw:
                    p /= Raw(payload)
                    # Keep the un-padded packet for later comparison.
                    pkt_info.data = p.copy()
                    self.extend_packet(p, random.choice(packet_sizes))
                pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
        :param int ip_type: When non-zero, every packet info must carry this
            IP version.
        :param int etype: When > 0, packets with this ethertype are accepted
            without any further checks.
        """
        dst_sw_if_index = pg_if.sw_if_index
        # Last verified packet info per source interface index.
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}

        for packet in capture:
            if etype > 0:
                if packet[Ether].type == etype:
                    continue
                # NOTE(review): a wrong ethertype is only logged; the packet
                # then goes through the regular checks below.
                self.logger.error(ppp("Unexpected ethertype in packet:", packet))
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(packet[Raw])
                    payload = packet[self.proto_map[payload_info.proto]]
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid packet (outside network):", packet)
                )
                raise

            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)

            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except Exception:
                    self.logger.error(
                        ppp("Unexpected or invalid packet (outside network):", packet)
                    )
                    raise
                continue

            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)"
                    % (pg_if.name, payload_info.src, packet_index)
                )
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                l4_name = self.proto_map[payload_info.proto]
                if l4_name == "TCP":
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[TCP].dport)
                elif l4_name == "UDP":
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # The per-source sequence in last_info is not advanced for ICMP
        # traffic or for packets accepted by the ethertype shortcut, so a
        # "nothing left over" check is not meaningful in those modes.
        if traffic_type == self.ICMP or etype > 0:
            return

        for i in self.pg_interfaces:
            # Query by sw_if_index, the same key type the in-loop lookup
            # above uses (payload_info.src).
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't arrive"
                % (dst_sw_if_index, i.sw_if_index),
            )

    def _send_flow_streams(self, *stream_args):
        """
        Add a stream to every source interface listed in ``flows``, enable
        capture on all pg interfaces and start sending.

        :param stream_args: Positional arguments forwarded to create_stream
            after the interface and the packet sizes.
        :return: Total number of packets queued.
        """
        pkts_cnt = 0
        for pg_if in self.pg_interfaces:
            if pg_if not in self.flows:
                continue
            pkts = self.create_stream(pg_if, self.pg_if_packet_sizes, *stream_args)
            if pkts:
                pg_if.add_stream(pkts)
                pkts_cnt += len(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return pkts_cnt

    def run_traffic_no_check(self):
        """Send default traffic on every configured flow without verifying."""
        self._send_flow_streams()

    def run_verify_test(
        self,
        traffic_type=0,
        ip_type=0,
        proto=-1,
        ports=0,
        frags=False,
        pkt_raw=True,
        etype=-1,
    ):
        """
        Send traffic on every configured flow and verify that all of it is
        captured on the destination interfaces.

        The arguments are forwarded to create_stream (``ip_type`` as its
        ``ipv6`` argument, ``frags`` as ``fragments``) and, where relevant,
        to verify_capture.
        """
        pkts_cnt = self._send_flow_streams(
            traffic_type, ip_type, proto, ports, frags, pkt_raw, etype
        )

        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if not in self.flows:
                continue
            for dst_if in self.flows[src_if]:
                capture = dst_if.get_capture(pkts_cnt)
                self.logger.info("Verifying capture on interface %s" % dst_if.name)
                self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)

    def run_verify_negat_test(
        self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
    ):
        """
        Send traffic on every configured flow and verify that nothing is
        captured on the destination interfaces.

        Packet infos are reset first; the arguments are forwarded to
        create_stream with ``pkt_raw`` fixed to True.
        """
        self.reset_packet_infos()
        self._send_flow_streams(traffic_type, ip_type, proto, ports, frags, True, etype)

        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if not in self.flows:
                continue
            for dst_if in self.flows[src_if]:
                self.logger.info("Verifying capture on interface %s" % dst_if.name)
                capture = dst_if.get_capture(0)
                self.assertEqual(len(capture), 0)

    def build_classify_table(
        self,
        src_mac="",
        dst_mac="",
        ether_type="",
        etype="",
        key="mac",
        hit_next_index=0xFFFFFFFF,
    ):
        """
        Create a MAC classify table and add sessions for the test hosts.

        One session is added per pg0 host; when ``dst_mac`` is set, one per
        (pg0 host, destination host) pair instead.

        :param str src_mac: Source MAC mask; when empty the source MAC is
            left out of the session match as well.
        :param str dst_mac: Destination MAC mask; when empty the destination
            MAC is left out of the session match as well.
        :param str ether_type: Ethertype mask.
        :param str etype: Ethertype value used in the session match.
        :param str key: Key under which the table is looked up in
            ``acl_tbl_idx``.
        :param int hit_next_index: Passed to every created session.
        """
        # Basic ACL testing
        a_mask = self.build_mac_mask(
            src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
        )
        self.create_classify_table(key, a_mask)
        table_index = self.acl_tbl_idx.get(key)

        for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
            s_mac = host.mac if src_mac else ""
            if not dst_mac:
                self.create_classify_session(
                    self.pg0,
                    table_index,
                    self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
                    hit_next_index=hit_next_index,
                )
                continue
            for dst_if in self.flows[self.pg0]:
                for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
                    self.create_classify_session(
                        self.pg0,
                        table_index,
                        self.build_mac_match(
                            src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
                        ),
                        hit_next_index=hit_next_index,
                    )

    def test_0000_warmup_test(self):
        """Learn the MAC addresses"""
        self.create_hosts(2)
        self.run_traffic_no_check()

    def test_0010_inacl_permit_src_mac(self):
        """Input  L2 ACL test - permit source MAC

        Test scenario for basic L2 ACL with source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0011_inacl_permit_dst_mac(self):
        """Input  L2 ACL test - permit destination MAC

        Test scenario for basic L2 ACL with destination MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with destination MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(dst_mac="ffffffffffff", key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0012_inacl_permit_src_dst_mac(self):
        """Input  L2 ACL test - permit source and destination MAC

        Test scenario for basic L2 ACL with source and destination MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source and destination MAC addresses.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
        )
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0013_inacl_permit_ether_type(self):
        """Input  L2 ACL test - permit ether_type

        Test scenario for basic L2 ACL with ethertype
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL matching the IPv4 ethertype.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0015_inacl_deny(self):
        """Input  L2 ACL test - deny

        Test scenario for basic L2 ACL with source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify no received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_negat_test(self.IP, self.IPV4, -1)

    def test_0020_outacl_permit(self):
        """Output L2 ACL test - permit

        Test scenario for basic L2 ACL with source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_out"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0025_outacl_deny(self):
        """Output L2 ACL test - deny

        Test scenario for basic L2 ACL with source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify no received packets on pg1 interface.
        """
        key = "mac_out"
        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_negat_test(self.IP, self.IPV4, -1)

    def test_0030_inoutacl_permit(self):
        """Input+Output L2 ACL test - permit

        Test scenario for basic L2 ACL with source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACLs with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_inout"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)
648
649
if __name__ == "__main__":
    # Allow running this file directly: hand the module's test cases to
    # unittest, using the runner class imported from the VPP test framework.
    unittest.main(testRunner=VppTestRunner)