tests: replace pycodestyle with black
[vpp.git] / test / template_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 from socket import AF_INET, AF_INET6
6 import unittest
7 import sys
8 from dataclasses import dataclass
9
10 from framework import VppTestCase
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
16 from util import ppp
17
18
19 @dataclass
20 class VarMask:
21     offset: int
22     spec: str
23
24
25 @dataclass
26 class VarMatch:
27     offset: int
28     value: int
29     length: int
30
31
32 class TestClassifier(VppTestCase):
33     @staticmethod
34     def _resolve_mask_match(mask_match):
35         mask_match = binascii.unhexlify(mask_match)
36         mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
37         mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match))
38         return mask_match, mask_match_len
39
40     @classmethod
41     def setUpClass(cls):
42         """
43         Perform standard class setup (defined by class method setUpClass in
44         class VppTestCase) before running the test case, set test case related
45         variables and configure VPP.
46         """
47         super(TestClassifier, cls).setUpClass()
48         cls.acl_active_table = ""
49         cls.af = AF_INET
50
51     def setUp(self):
52         """
53         Perform test setup before test case.
54
55         **Config:**
56             - create 4 pg interfaces
57                 - untagged pg0/pg1/pg2 interface
58                     pg0 -------> pg1 (IP ACL)
59                            \
60                             ---> pg2 (MAC ACL))
61                              \
62                               -> pg3 (PBR)
63             - setup interfaces:
64                 - put it into UP state
65                 - set IPv4/6 addresses
66                 - resolve neighbor address using ARP
67
68         :ivar list interfaces: pg interfaces.
69         :ivar list pg_if_packet_sizes: packet sizes in test.
70         :ivar dict acl_tbl_idx: ACL table index.
71         :ivar int pbr_vrfid: VRF id for PBR test.
72         """
73         self.reset_packet_infos()
74         super(TestClassifier, self).setUp()
75         if self.af is None:  # l2_acl test case
76             return
77
78         # create 4 pg interfaces
79         self.create_pg_interfaces(range(4))
80
81         # packet sizes to test
82         self.pg_if_packet_sizes = [64, 9018]
83
84         self.interfaces = list(self.pg_interfaces)
85
86         # ACL & PBR vars
87         self.acl_tbl_idx = {}
88         self.pbr_vrfid = 200
89
90         # setup all interfaces
91         for intf in self.interfaces:
92             intf.admin_up()
93             if self.af == AF_INET:
94                 intf.config_ip4()
95                 intf.resolve_arp()
96             elif self.af == AF_INET6:
97                 intf.config_ip6()
98                 intf.resolve_ndp()
99
100     def tearDown(self):
101         """Run standard test teardown and acl related log."""
102         if self.af is not None and not self.vpp_dead:
103             if self.af == AF_INET:
104                 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
105                 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
106             elif self.af == AF_INET6:
107                 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
108                 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
109
110             self.logger.info(self.vapi.cli("show classify table verbose"))
111             self.logger.info(self.vapi.cli("show ip fib"))
112             self.logger.info(self.vapi.cli("show error"))
113
114             if self.acl_active_table.endswith("out"):
115                 self.output_acl_set_interface(
116                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
117                 )
118             elif self.acl_active_table != "":
119                 self.input_acl_set_interface(
120                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
121                 )
122             self.acl_active_table = ""
123
124             for intf in self.interfaces:
125                 if self.af == AF_INET:
126                     intf.unconfig_ip4()
127                 elif self.af == AF_INET6:
128                     intf.unconfig_ip6()
129                 intf.admin_down()
130
131         super(TestClassifier, self).tearDown()
132
133     @staticmethod
134     def build_mac_match(dst_mac="", src_mac="", ether_type=""):
135         """Build MAC ACL match data with hexstring format.
136
137         :param str dst_mac: source MAC address <x:x:x:x:x:x>
138         :param str src_mac: destination MAC address <x:x:x:x:x:x>
139         :param str ether_type: ethernet type <0-ffff>
140         """
141         if dst_mac:
142             dst_mac = dst_mac.replace(":", "")
143         if src_mac:
144             src_mac = src_mac.replace(":", "")
145
146         return (
147             "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
148         ).rstrip()
149
150     @staticmethod
151     def build_mac_mask(dst_mac="", src_mac="", ether_type=""):
152         """Build MAC ACL mask data with hexstring format.
153
154         :param str dst_mac: source MAC address <0-ffffffffffff>
155         :param str src_mac: destination MAC address <0-ffffffffffff>
156         :param str ether_type: ethernet type <0-ffff>
157         """
158
159         return (
160             "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
161         ).rstrip()
162
163     @staticmethod
164     def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""):
165         """Build IP ACL mask data with hexstring format.
166
167         :param str proto: protocol number <0-ff>
168         :param str src_ip: source ip address <0-ffffffff>
169         :param str dst_ip: destination ip address <0-ffffffff>
170         :param str src_port: source port number <0-ffff>
171         :param str dst_port: destination port number <0-ffff>
172         """
173
174         return (
175             "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
176                 proto, src_ip, dst_ip, src_port, dst_port
177             )
178         ).rstrip("0")
179
180     @staticmethod
181     def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""):
182         """Build IPv6 ACL mask data with hexstring format.
183
184         :param str nh: next header number <0-ff>
185         :param str src_ip: source ip address <0-ffffffff>
186         :param str dst_ip: destination ip address <0-ffffffff>
187         :param str src_port: source port number <0-ffff>
188         :param str dst_port: destination port number <0-ffff>
189         """
190
191         return (
192             "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
193                 nh, src_ip, dst_ip, src_port, dst_port
194             )
195         ).rstrip("0")
196
197     @staticmethod
198     def build_payload_mask(masks):
199         payload_mask = ""
200
201         for mask in masks:
202             # offset is specified in bytes, convert to hex format.
203             length = (mask.offset * 2) + len(mask.spec)
204             format_spec = "{!s:0>" + str(length) + "}"
205             payload_mask += format_spec.format(mask.spec)
206
207         return payload_mask.rstrip("0")
208
209     @staticmethod
210     def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
211         """Build IP ACL match data with hexstring format.
212
213         :param int proto: protocol number with valid option "x"
214         :param str src_ip: source ip address with format of "x.x.x.x"
215         :param str dst_ip: destination ip address with format of "x.x.x.x"
216         :param int src_port: source port number "x"
217         :param int dst_port: destination port number "x"
218         """
219         if src_ip:
220             src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii")
221         if dst_ip:
222             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii")
223
224         return (
225             "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
226                 hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
227             )
228         ).rstrip("0")
229
230     @staticmethod
231     def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
232         """Build IPv6 ACL match data with hexstring format.
233
234         :param int nh: next header number with valid option "x"
235         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
236         :param str dst_ip: destination ip6 address with format of
237             "xxx:xxxx::xxxx"
238         :param int src_port: source port number "x"
239         :param int dst_port: destination port number "x"
240         """
241         if src_ip:
242             if sys.version_info[0] == 2:
243                 src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip))
244             else:
245                 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
246
247         if dst_ip:
248             if sys.version_info[0] == 2:
249                 dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip))
250             else:
251                 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
252
253         return (
254             "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
255                 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
256             )
257         ).rstrip("0")
258
259     @staticmethod
260     def build_payload_match(matches):
261         payload_match = ""
262
263         for match in matches:
264             sval = str(hex(match.value)[2:])
265             # offset is specified in bytes, convert to hex format.
266             length = (match.offset + match.length) * 2
267
268             format_spec = "{!s:0>" + str(length) + "}"
269             payload_match += format_spec.format(sval)
270
271         return payload_match.rstrip("0")
272
273     def create_stream(
274         self,
275         src_if,
276         dst_if,
277         packet_sizes,
278         proto_l=UDP(sport=1234, dport=5678),
279         payload_ex=None,
280     ):
281         """Create input packet stream for defined interfaces.
282
283         :param VppInterface src_if: Source Interface for packet stream.
284         :param VppInterface dst_if: Destination Interface for packet stream.
285         :param list packet_sizes: packet size to test.
286         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
287         """
288         pkts = []
289
290         for size in packet_sizes:
291             info = self.create_packet_info(src_if, dst_if)
292             payload = self.info_to_payload(info)
293
294             # append any additional payload after info
295             if payload_ex is not None:
296                 payload += payload_ex
297
298             if self.af == AF_INET:
299                 p = (
300                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
301                     / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
302                     / proto_l
303                     / Raw(payload)
304                 )
305             elif self.af == AF_INET6:
306                 p = (
307                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
308                     / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
309                     / proto_l
310                     / Raw(payload)
311                 )
312             info.data = p.copy()
313             self.extend_packet(p, size)
314             pkts.append(p)
315         return pkts
316
317     def verify_capture(self, dst_if, capture, proto_l=UDP):
318         """Verify captured input packet stream for defined interface.
319
320         :param VppInterface dst_if: Interface to verify captured packet stream.
321         :param list capture: Captured packet stream.
322         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
323         """
324         ip_proto = IP
325         if self.af == AF_INET6:
326             ip_proto = IPv6
327         self.logger.info("Verifying capture on interface %s" % dst_if.name)
328         last_info = dict()
329         for i in self.interfaces:
330             last_info[i.sw_if_index] = None
331         dst_sw_if_index = dst_if.sw_if_index
332         for packet in capture:
333             try:
334                 ip_received = packet[ip_proto]
335                 proto_received = packet[proto_l]
336                 payload_info = self.payload_to_info(packet[Raw])
337                 packet_index = payload_info.index
338                 self.assertEqual(payload_info.dst, dst_sw_if_index)
339                 self.logger.debug(
340                     "Got packet on port %s: src=%u (id=%u)"
341                     % (dst_if.name, payload_info.src, packet_index)
342                 )
343                 next_info = self.get_next_packet_info_for_interface2(
344                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
345                 )
346                 last_info[payload_info.src] = next_info
347                 self.assertTrue(next_info is not None)
348                 self.assertEqual(packet_index, next_info.index)
349                 saved_packet = next_info.data
350                 ip_saved = saved_packet[ip_proto]
351                 proto_saved = saved_packet[proto_l]
352                 # Check standard fields
353                 self.assertEqual(ip_received.src, ip_saved.src)
354                 self.assertEqual(ip_received.dst, ip_saved.dst)
355                 self.assertEqual(proto_received.sport, proto_saved.sport)
356                 self.assertEqual(proto_received.dport, proto_saved.dport)
357             except BaseException:
358                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
359                 raise
360         for i in self.interfaces:
361             remaining_packet = self.get_next_packet_info_for_interface2(
362                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
363             )
364             self.assertTrue(
365                 remaining_packet is None,
366                 "Interface %s: Packet expected from interface %s "
367                 "didn't arrive" % (dst_if.name, i.name),
368             )
369
370     def create_classify_table(self, key, mask, data_offset=0, next_table_index=None):
371         """Create Classify Table
372
373         :param str key: key for classify table (ex, ACL name).
374         :param str mask: mask value for interested traffic.
375         :param int data_offset:
376         :param str next_table_index
377         """
378         mask_match, mask_match_len = self._resolve_mask_match(mask)
379         r = self.vapi.classify_add_del_table(
380             is_add=1,
381             mask=mask_match,
382             mask_len=mask_match_len,
383             match_n_vectors=(len(mask) - 1) // 32 + 1,
384             miss_next_index=0,
385             current_data_flag=1,
386             current_data_offset=data_offset,
387             next_table_index=next_table_index,
388         )
389         self.assertIsNotNone(r, "No response msg for add_del_table")
390         self.acl_tbl_idx[key] = r.new_table_index
391
392     def create_classify_session(
393         self, table_index, match, pbr_option=0, vrfid=0, is_add=1
394     ):
395         """Create Classify Session
396
397         :param int table_index: table index to identify classify table.
398         :param str match: matched value for interested traffic.
399         :param int pbr_option: enable/disable PBR feature.
400         :param int vrfid: VRF id.
401         :param int is_add: option to configure classify session.
402             - create(1) or delete(0)
403         """
404         mask_match, mask_match_len = self._resolve_mask_match(match)
405         r = self.vapi.classify_add_del_session(
406             is_add=is_add,
407             table_index=table_index,
408             match=mask_match,
409             match_len=mask_match_len,
410             opaque_index=0,
411             action=pbr_option,
412             metadata=vrfid,
413         )
414         self.assertIsNotNone(r, "No response msg for add_del_session")
415
416     def input_acl_set_interface(self, intf, table_index, is_add=1):
417         """Configure Input ACL interface
418
419         :param VppInterface intf: Interface to apply Input ACL feature.
420         :param int table_index: table index to identify classify table.
421         :param int is_add: option to configure classify session.
422             - enable(1) or disable(0)
423         """
424         r = None
425         if self.af == AF_INET:
426             r = self.vapi.input_acl_set_interface(
427                 is_add, intf.sw_if_index, ip4_table_index=table_index
428             )
429         elif self.af == AF_INET6:
430             r = self.vapi.input_acl_set_interface(
431                 is_add, intf.sw_if_index, ip6_table_index=table_index
432             )
433         else:
434             r = self.vapi.input_acl_set_interface(
435                 is_add, intf.sw_if_index, l2_table_index=table_index
436             )
437         self.assertIsNotNone(r, "No response msg for acl_set_interface")
438
439     def output_acl_set_interface(self, intf, table_index, is_add=1):
440         """Configure Output ACL interface
441
442         :param VppInterface intf: Interface to apply Output ACL feature.
443         :param int table_index: table index to identify classify table.
444         :param int is_add: option to configure classify session.
445             - enable(1) or disable(0)
446         """
447         r = None
448         if self.af == AF_INET:
449             r = self.vapi.output_acl_set_interface(
450                 is_add, intf.sw_if_index, ip4_table_index=table_index
451             )
452         elif self.af == AF_INET6:
453             r = self.vapi.output_acl_set_interface(
454                 is_add, intf.sw_if_index, ip6_table_index=table_index
455             )
456         else:
457             r = self.vapi.output_acl_set_interface(
458                 is_add, intf.sw_if_index, l2_table_index=table_index
459             )
460         self.assertIsNotNone(r, "No response msg for acl_set_interface")
461
462     def config_pbr_fib_entry(self, intf, is_add=1):
463         """Configure fib entry to route traffic toward PBR VRF table
464
465         :param VppInterface intf: destination interface to be routed for PBR.
466
467         """
468         addr_len = 24
469         self.vapi.ip_add_del_route(
470             dst_address=intf.local_ip4,
471             dst_address_length=addr_len,
472             next_hop_address=intf.remote_ip4,
473             table_id=self.pbr_vrfid,
474             is_add=is_add,
475         )
476
477     def verify_vrf(self, vrf_id):
478         """
479         Check if the FIB table / VRF ID is configured.
480
481         :param int vrf_id: The FIB table / VRF ID to be verified.
482         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
483         """
484         ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
485         vrf_count = len(ip_fib_dump)
486         if vrf_count == 0:
487             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
488             return 0
489         else:
490             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
491             return 1