tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_classify_l2_acl.py
1 #!/usr/bin/env python3
2 """ Classifier-based L2 ACL Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.data import ETH_P_IP
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from asfframework import VppTestRunner
15 from util import Host, ppp
16 from template_classifier import TestClassifier
17
18
class TestClassifyAcl(TestClassifier):
    """Classifier-based L2 input and output ACL Test Case"""

    # traffic types
    IP = 0
    ICMP = 1

    # IP version
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types
    DENY = 0
    PERMIT = 1

    # supported protocols: proto[IP] holds the L4 protocol numbers picked at
    # random by create_stream(), proto[ICMP] the ICMP protocol numbers
    proto = [[6, 17], [1, 58]]
    proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    bd_id = 1

    @classmethod
    def setUpClass(cls):
        """
        Run the parent class setup, then build the test topology: two pg
        interfaces placed in bridge domain ``bd_id`` and the class-level
        bookkeeping (flows, packet sizes, host lists) used by the tests.

        If building the topology raises, the parent ``tearDownClass`` is
        called before the exception is re-raised.
        """
        super().setUpClass()
        cls.af = None

        try:
            # Create 2 pg interfaces
            cls.create_pg_interfaces(range(2))

            # Packet flows mapping: source interface -> destination interfaces
            cls.flows = {cls.pg0: [cls.pg1]}

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with MAC learning, flooding, unknown unicast flooding
            # and forwarding enabled, and put the interfaces into this BD
            cls.vapi.bridge_domain_add_del_v2(
                bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
            )
            for pg_if in cls.pg_interfaces:
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
                )

            # Set up all interfaces
            for pg_if in cls.pg_interfaces:
                pg_if.admin_up()

            # Mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = {
                pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces
            }

            # Per-interface lists of deleted hosts
            cls.deleted_hosts_by_pg_idx = {
                pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces
            }

            # MAC address warm-up is not done here; test_0000_warmup_test
            # creates the hosts and sends the first traffic.

            # Holder of the active classify table key
            cls.acl_active_table = ""

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Run the parent setup and start with an empty table-index map."""
        super().setUp()
        self.acl_tbl_idx = {}

    def tearDown(self):
        """
        Detach the classify table recorded in ``acl_active_table`` from the
        interfaces it was applied to, then run the parent teardown.

        Nothing is detached when VPP is reported dead or when no known table
        key is active.
        """
        if not self.vpp_dead:
            key = self.acl_active_table
            if key in ("mac_inout", "mac_out", "mac_in"):
                table_index = self.acl_tbl_idx.get(key)
                # The trailing 0 is presumably the "is_add" flag of the
                # helper (i.e. detach) - confirm against the base class.
                if key != "mac_in":
                    self.output_acl_set_interface(self.pg1, table_index, 0)
                if key != "mac_out":
                    self.input_acl_set_interface(self.pg0, table_index, 0)
                self.acl_active_table = ""

        super().tearDown()

    def create_classify_session(
        self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
    ):
        """Create Classify Session

        :param VppInterface intf: Interface to apply classify session
            (not used by this implementation).
        :param int table_index: table index to identify classify table.
        :param str match: matched value for interested traffic.
        :param int hit_next_index: next index passed through to the
            classify session API.
        :param int is_add: option to configure classify session.
            - create(1) or delete(0)
        """
        mask_match, mask_match_len = self._resolve_mask_match(match)
        reply = self.vapi.classify_add_del_session(
            is_add=is_add,
            table_index=table_index,
            match=mask_match,
            match_len=mask_match_len,
            hit_next_index=hit_next_index,
        )
        self.assertIsNotNone(reply, "No response msg for add_del_session")

    def create_hosts(self, count, start=0):
        """
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 and IPv6 addresses for every host MAC
        address.

        The hosts are split evenly between the interfaces; the last interface
        also receives the remainder of the integer division.

        :param int count: Number of hosts to create addresses for.
        :param int start: Number to start numbering from.
        """
        n_int = len(self.pg_interfaces)
        macs_per_if = count // n_int
        for i, pg_if in enumerate(self.pg_interfaces):
            start_nr = macs_per_if * i + start
            if i == n_int - 1:
                end_nr = count + start
            else:
                end_nr = macs_per_if * (i + 1) + start
            self.hosts_by_pg_idx[pg_if.sw_if_index].extend(
                Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
                )
                for j in range(start_nr, end_nr)
            )

    def create_upper_layer(self, packet_index, proto, ports=0):
        """
        Build the L4 header for a test packet.

        :param packet_index: Index of the packet (not used).
        :param int proto: Protocol number; must be a key of ``proto_map``.
        :param int ports: 0 to pick random ports from the class port ranges,
            otherwise the value used for both source and destination port.
        :return: A UDP or TCP header, or an empty string for any other
            protocol in ``proto_map``.
        """
        name = self.proto_map[proto]
        if name == "UDP":
            if ports == 0:
                return UDP(
                    sport=random.randint(self.udp_sport_from, self.udp_sport_to),
                    dport=random.randint(self.udp_dport_from, self.udp_dport_to),
                )
            return UDP(sport=ports, dport=ports)
        if name == "TCP":
            if ports == 0:
                return TCP(
                    sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
                    dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
                )
            return TCP(sport=ports, dport=ports)
        return ""

    def create_stream(
        self,
        src_if,
        packet_sizes,
        traffic_type=0,
        ipv6=0,
        proto=-1,
        ports=0,
        fragments=False,
        pkt_raw=True,
        etype=-1,
    ):
        """
        Create input packet stream for defined interface: one packet for
        every (source host, destination host) pair of every flow that starts
        at ``src_if``.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes; one is
            picked at random per packet when ``pkt_raw`` is set.
        :param traffic_type: 1 (``ICMP``): ICMP echo request, otherwise a
            TCP/UDP header built by ``create_upper_layer``.
        :param ipv6: 1: IPv6, 0: IPv4, any other value: random per packet.
        :param proto: Protocol number, or -1 to pick TCP/UDP at random.
        :param ports: Passed to ``create_upper_layer``.
        :param fragments: Mark the packets as IP fragments.
        :param pkt_raw: Append the packet-info payload, remember a copy of
            the packet in the packet info and pad to a random size.
        :param etype: Ethertype to force on the Ethernet header when > 0.
        :return: Stream of packets.
        """
        pkts = []
        if src_if not in self.flows:
            return pkts

        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
        for dst_if in self.flows[src_if]:
            dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
            for i in range(len(dst_hosts) * len(src_hosts)):
                dst_host = dst_hosts[i // len(src_hosts)]
                src_host = src_hosts[i % len(src_hosts)]
                pkt_info = self.create_packet_info(src_if, dst_if)
                if ipv6 == 1:
                    pkt_info.ip = 1
                elif ipv6 == 0:
                    pkt_info.ip = 0
                else:
                    pkt_info.ip = random.choice([0, 1])
                if proto == -1:
                    pkt_info.proto = random.choice(self.proto[self.IP])
                else:
                    pkt_info.proto = proto
                payload = self.info_to_payload(pkt_info)

                ether_fields = {"dst": dst_host.mac, "src": src_host.mac}
                if etype > 0:
                    ether_fields["type"] = etype
                p = Ether(**ether_fields)

                if pkt_info.ip:
                    p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                    if fragments:
                        p /= IPv6ExtHdrFragment(offset=64, m=1)
                elif fragments:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64)
                else:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4)

                if traffic_type == self.ICMP:
                    if pkt_info.ip:
                        p /= ICMPv6EchoRequest(
                            type=self.icmp6_type, code=self.icmp6_code
                        )
                    else:
                        p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
                else:
                    p /= self.create_upper_layer(i, pkt_info.proto, ports)

                if pkt_raw:
                    p /= Raw(payload)
                    # Saved before padding; verify_capture() compares the
                    # received headers against this copy.
                    pkt_info.data = p.copy()
                    size = random.choice(packet_sizes)
                    self.extend_packet(p, size)
                pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: 1 (``ICMP``): check ICMP type/code, otherwise
            check addresses and TCP/UDP ports against the saved packet.
        :param ip_type: When non-zero, the IP version flag every packet's
            payload info must carry.
        :param etype: When > 0, packets with this ethertype are accepted
            without further checks.
        """
        last_info = {pg.sw_if_index: None for pg in self.pg_interfaces}
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            if etype > 0:
                if packet[Ether].type == etype:
                    # Expected ethertype: nothing else is checked.
                    continue
                self.logger.error(ppp("Unexpected ethertype in packet:", packet))
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(packet[Raw])
                    payload = packet[self.proto_map[payload_info.proto]]
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid packet (outside network):", packet)
                )
                raise

            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except Exception:
                    self.logger.error(
                        ppp("Unexpected or invalid packet (outside network):", packet)
                    )
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug(
                        "Got packet on port %s: src=%u (id=%u)"
                        % (pg_if.name, payload_info.src, packet_index)
                    )
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                    )
                    last_info[payload_info.src] = next_info
                    self.assertIsNotNone(next_info)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    l4_name = self.proto_map[payload_info.proto]
                    if l4_name == "TCP":
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[TCP].dport)
                    elif l4_name == "UDP":
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[UDP].dport)
                except Exception:
                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                    raise
        for pg in self.pg_interfaces:
            # NOTE(review): the interface object itself (not its sw_if_index)
            # is passed as the source here - confirm this is what
            # get_next_packet_info_for_interface2 expects.
            remaining_packet = self.get_next_packet_info_for_interface2(
                pg, dst_sw_if_index, last_info[pg.sw_if_index]
            )
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't arrive"
                % (dst_sw_if_index, pg.sw_if_index),
            )

    def _send_streams(self, *stream_args):
        """
        Add a packet stream to every source interface listed in ``flows``,
        enable capture on all pg interfaces and start packet sending.

        :param stream_args: Positional arguments forwarded to
            ``create_stream`` after the interface and packet sizes.
        :return: Total number of packets added to the interfaces.
        """
        pkts_cnt = 0
        for src_if in self.pg_interfaces:
            if src_if in self.flows:
                pkts = self.create_stream(
                    src_if, self.pg_if_packet_sizes, *stream_args
                )
                if pkts:
                    src_if.add_stream(pkts)
                    pkts_cnt += len(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return pkts_cnt

    def run_traffic_no_check(self):
        """Send default streams on all flows without verifying the result."""
        self._send_streams()

    def run_verify_test(
        self,
        traffic_type=0,
        ip_type=0,
        proto=-1,
        ports=0,
        frags=False,
        pkt_raw=True,
        etype=-1,
    ):
        """
        Send streams on all flows and verify that every packet sent is
        captured on each destination interface.

        The arguments are forwarded to ``create_stream``; ``traffic_type``,
        ``ip_type`` and ``etype`` are also forwarded to ``verify_capture``.
        """
        pkts_cnt = self._send_streams(
            traffic_type, ip_type, proto, ports, frags, pkt_raw, etype
        )

        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if in self.flows:
                for dst_if in self.flows[src_if]:
                    capture = dst_if.get_capture(pkts_cnt)
                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
                    self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)

    def run_verify_negat_test(
        self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
    ):
        """
        Reset the packet infos, send streams on all flows and verify that
        nothing is captured on the destination interfaces.

        The arguments are forwarded to ``create_stream``.
        """
        self.reset_packet_infos()
        self._send_streams(traffic_type, ip_type, proto, ports, frags, True, etype)

        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if in self.flows:
                for dst_if in self.flows[src_if]:
                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
                    capture = dst_if.get_capture(0)
                    self.assertEqual(len(capture), 0)

    def build_classify_table(
        self,
        src_mac="",
        dst_mac="",
        ether_type="",
        etype="",
        key="mac",
        hit_next_index=0xFFFFFFFF,
    ):
        """
        Create a classify table with a MAC/ethertype mask and add one session
        per pg0 host (per pg0 host and destination host when ``dst_mac`` is
        set).

        :param str src_mac: Source MAC mask; when empty the sessions do not
            match on the source MAC.
        :param str dst_mac: Destination MAC mask; when empty the sessions do
            not match on the destination MAC.
        :param str ether_type: Ethertype mask for the table.
        :param str etype: Ethertype value the sessions match on.
        :param str key: Key under which the table index is looked up in
            ``acl_tbl_idx``.
        :param int hit_next_index: Passed to every session; the deny tests
            of this class pass 0.
        """
        a_mask = self.build_mac_mask(
            src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
        )
        self.create_classify_table(key, a_mask)
        table_index = self.acl_tbl_idx.get(key)

        if dst_mac:
            dst_macs = [
                dst_host.mac
                for dst_if in self.flows[self.pg0]
                for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]
            ]
        else:
            # A single session per source host, without destination match
            dst_macs = [""]

        for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
            s_mac = host.mac if src_mac else ""
            for d_mac in dst_macs:
                self.create_classify_session(
                    self.pg0,
                    table_index,
                    self.build_mac_match(
                        src_mac=s_mac, dst_mac=d_mac, ether_type=etype
                    ),
                    hit_next_index=hit_next_index,
                )

    def test_0000_warmup_test(self):
        """Learn the MAC addresses"""
        self.create_hosts(2)
        self.run_traffic_no_check()

    def test_0010_inacl_permit_src_mac(self):
        """Input  L2 ACL test - permit source MAC

        Test scenario for an input L2 ACL matching the source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0011_inacl_permit_dst_mac(self):
        """Input  L2 ACL test - permit destination MAC

        Test scenario for an input L2 ACL matching the destination MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with destination MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(dst_mac="ffffffffffff", key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0012_inacl_permit_src_dst_mac(self):
        """Input  L2 ACL test - permit source and destination MAC

        Test scenario for an input L2 ACL matching source and destination MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source and destination MAC addresses.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
        )
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0013_inacl_permit_ether_type(self):
        """Input  L2 ACL test - permit ether_type

        Test scenario for an input L2 ACL matching the ethertype
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with the IPv4 ethertype.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0015_inacl_deny(self):
        """Input  L2 ACL test - deny

        Test scenario for an input L2 ACL denying by source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify no received packets on pg1 interface.
        """
        key = "mac_in"
        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_negat_test(self.IP, self.IPV4, -1)

    def test_0020_outacl_permit(self):
        """Output L2 ACL test - permit

        Test scenario for an output L2 ACL matching the source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_out"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)

    def test_0025_outacl_deny(self):
        """Output L2 ACL test - deny

        Test scenario for an output L2 ACL denying by source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACL with source MAC address.
            - Send and verify no received packets on pg1 interface.
        """
        key = "mac_out"
        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_negat_test(self.IP, self.IPV4, -1)

    def test_0030_inoutacl_permit(self):
        """Input+Output L2 ACL test - permit

        Test scenario for combined input and output L2 ACLs matching the
        source MAC
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create ACLs with source MAC address.
            - Send and verify received packets on pg1 interface.
        """
        key = "mac_inout"
        self.build_classify_table(src_mac="ffffffffffff", key=key)
        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key
        self.run_verify_test(self.IP, self.IPV4, -1)
648
# Script entry point: when this module is executed directly, run its test
# cases through unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(
        testRunner=VppTestRunner,
    )