ikev2: support variable-length nonces
[vpp.git] / test / template_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 from socket import AF_INET, AF_INET6
6 import unittest
7 import sys
8 from dataclasses import dataclass
9
10 from framework import VppTestCase
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
16 from util import ppp
17
18
19 @dataclass
20 class VarMask:
21     offset: int
22     spec: str
23
24
25 @dataclass
26 class VarMatch:
27     offset: int
28     value: int
29     length: int
30
31
32 class TestClassifier(VppTestCase):
33
34     @staticmethod
35     def _resolve_mask_match(mask_match):
36         mask_match = binascii.unhexlify(mask_match)
37         mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
38         mask_match = mask_match + b'\0' * \
39             (mask_match_len - len(mask_match))
40         return mask_match, mask_match_len
41
42     @classmethod
43     def setUpClass(cls):
44         """
45         Perform standard class setup (defined by class method setUpClass in
46         class VppTestCase) before running the test case, set test case related
47         variables and configure VPP.
48         """
49         super(TestClassifier, cls).setUpClass()
50         cls.acl_active_table = ''
51         cls.af = AF_INET
52
53     def setUp(self):
54         """
55         Perform test setup before test case.
56
57         **Config:**
58             - create 4 pg interfaces
59                 - untagged pg0/pg1/pg2 interface
60                     pg0 -------> pg1 (IP ACL)
61                            \
62                             ---> pg2 (MAC ACL))
63                              \
64                               -> pg3 (PBR)
65             - setup interfaces:
66                 - put it into UP state
67                 - set IPv4/6 addresses
68                 - resolve neighbor address using ARP
69
70         :ivar list interfaces: pg interfaces.
71         :ivar list pg_if_packet_sizes: packet sizes in test.
72         :ivar dict acl_tbl_idx: ACL table index.
73         :ivar int pbr_vrfid: VRF id for PBR test.
74         """
75         self.reset_packet_infos()
76         super(TestClassifier, self).setUp()
77         if self.af is None:  # l2_acl test case
78             return
79
80         # create 4 pg interfaces
81         self.create_pg_interfaces(range(4))
82
83         # packet sizes to test
84         self.pg_if_packet_sizes = [64, 9018]
85
86         self.interfaces = list(self.pg_interfaces)
87
88         # ACL & PBR vars
89         self.acl_tbl_idx = {}
90         self.pbr_vrfid = 200
91
92         # setup all interfaces
93         for intf in self.interfaces:
94             intf.admin_up()
95             if self.af == AF_INET:
96                 intf.config_ip4()
97                 intf.resolve_arp()
98             elif self.af == AF_INET6:
99                 intf.config_ip6()
100                 intf.resolve_ndp()
101
102     def tearDown(self):
103         """Run standard test teardown and acl related log."""
104         if self.af is not None and not self.vpp_dead:
105             if self.af == AF_INET:
106                 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
107                 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
108             elif self.af == AF_INET6:
109                 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
110                 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
111
112             self.logger.info(self.vapi.cli("show classify table verbose"))
113             self.logger.info(self.vapi.cli("show ip fib"))
114             self.logger.info(self.vapi.cli("show error"))
115
116             if self.acl_active_table.endswith('out'):
117                 self.output_acl_set_interface(
118                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
119                 self.acl_active_table = ''
120             elif self.acl_active_table != '':
121                 self.input_acl_set_interface(
122                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
123                 self.acl_active_table = ''
124
125             for intf in self.interfaces:
126                 if self.af == AF_INET:
127                     intf.unconfig_ip4()
128                 elif self.af == AF_INET6:
129                     intf.unconfig_ip6()
130                 intf.admin_down()
131
132         super(TestClassifier, self).tearDown()
133
134     @staticmethod
135     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
136         """Build MAC ACL match data with hexstring format.
137
138         :param str dst_mac: source MAC address <x:x:x:x:x:x>
139         :param str src_mac: destination MAC address <x:x:x:x:x:x>
140         :param str ether_type: ethernet type <0-ffff>
141         """
142         if dst_mac:
143             dst_mac = dst_mac.replace(':', '')
144         if src_mac:
145             src_mac = src_mac.replace(':', '')
146
147         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
148             dst_mac, src_mac, ether_type)).rstrip()
149
150     @staticmethod
151     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
152         """Build MAC ACL mask data with hexstring format.
153
154         :param str dst_mac: source MAC address <0-ffffffffffff>
155         :param str src_mac: destination MAC address <0-ffffffffffff>
156         :param str ether_type: ethernet type <0-ffff>
157         """
158
159         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
160             dst_mac, src_mac, ether_type)).rstrip()
161
162     @staticmethod
163     def build_ip_mask(proto='', src_ip='', dst_ip='',
164                       src_port='', dst_port=''):
165         """Build IP ACL mask data with hexstring format.
166
167         :param str proto: protocol number <0-ff>
168         :param str src_ip: source ip address <0-ffffffff>
169         :param str dst_ip: destination ip address <0-ffffffff>
170         :param str src_port: source port number <0-ffff>
171         :param str dst_port: destination port number <0-ffff>
172         """
173
174         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
175             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
176
177     @staticmethod
178     def build_ip6_mask(nh='', src_ip='', dst_ip='',
179                        src_port='', dst_port=''):
180         """Build IPv6 ACL mask data with hexstring format.
181
182         :param str nh: next header number <0-ff>
183         :param str src_ip: source ip address <0-ffffffff>
184         :param str dst_ip: destination ip address <0-ffffffff>
185         :param str src_port: source port number <0-ffff>
186         :param str dst_port: destination port number <0-ffff>
187         """
188
189         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
190             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
191
192     @staticmethod
193     def build_payload_mask(masks):
194         payload_mask = ''
195
196         for mask in masks:
197             # offset is specified in bytes, convert to hex format.
198             length = (mask.offset * 2) + len(mask.spec)
199             format_spec = '{!s:0>' + str(length) + '}'
200             payload_mask += format_spec.format(mask.spec)
201
202         return payload_mask.rstrip('0')
203
204     @staticmethod
205     def build_ip_match(proto=0, src_ip='', dst_ip='',
206                        src_port=0, dst_port=0):
207         """Build IP ACL match data with hexstring format.
208
209         :param int proto: protocol number with valid option "x"
210         :param str src_ip: source ip address with format of "x.x.x.x"
211         :param str dst_ip: destination ip address with format of "x.x.x.x"
212         :param int src_port: source port number "x"
213         :param int dst_port: destination port number "x"
214         """
215         if src_ip:
216             src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
217         if dst_ip:
218             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
219
220         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
221             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
222             hex(dst_port)[2:])).rstrip('0')
223
224     @staticmethod
225     def build_ip6_match(nh=0, src_ip='', dst_ip='',
226                         src_port=0, dst_port=0):
227         """Build IPv6 ACL match data with hexstring format.
228
229         :param int nh: next header number with valid option "x"
230         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
231         :param str dst_ip: destination ip6 address with format of
232             "xxx:xxxx::xxxx"
233         :param int src_port: source port number "x"
234         :param int dst_port: destination port number "x"
235         """
236         if src_ip:
237             if sys.version_info[0] == 2:
238                 src_ip = binascii.hexlify(socket.inet_pton(
239                     socket.AF_INET6, src_ip))
240             else:
241                 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
242
243         if dst_ip:
244             if sys.version_info[0] == 2:
245                 dst_ip = binascii.hexlify(socket.inet_pton(
246                     socket.AF_INET6, dst_ip))
247             else:
248                 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
249
250         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
251             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
252             hex(dst_port)[2:])).rstrip('0')
253
254     @staticmethod
255     def build_payload_match(matches):
256         payload_match = ''
257
258         for match in matches:
259             sval = str(hex(match.value)[2:])
260             # offset is specified in bytes, convert to hex format.
261             length = (match.offset + match.length) * 2
262
263             format_spec = '{!s:0>' + str(length) + '}'
264             payload_match += format_spec.format(sval)
265
266         return payload_match.rstrip('0')
267
268     def create_stream(self, src_if, dst_if, packet_sizes,
269                       proto_l=UDP(sport=1234, dport=5678),
270                       payload_ex=None):
271         """Create input packet stream for defined interfaces.
272
273         :param VppInterface src_if: Source Interface for packet stream.
274         :param VppInterface dst_if: Destination Interface for packet stream.
275         :param list packet_sizes: packet size to test.
276         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
277         """
278         pkts = []
279
280         for size in packet_sizes:
281             info = self.create_packet_info(src_if, dst_if)
282             payload = self.info_to_payload(info)
283
284             # append any additional payload after info
285             if payload_ex is not None:
286                 payload += payload_ex
287
288             if self.af == AF_INET:
289                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
290                      IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
291                      proto_l /
292                      Raw(payload))
293             elif self.af == AF_INET6:
294                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
295                      IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
296                      proto_l /
297                      Raw(payload))
298             info.data = p.copy()
299             self.extend_packet(p, size)
300             pkts.append(p)
301         return pkts
302
303     def verify_capture(self, dst_if, capture, proto_l=UDP):
304         """Verify captured input packet stream for defined interface.
305
306         :param VppInterface dst_if: Interface to verify captured packet stream.
307         :param list capture: Captured packet stream.
308         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
309         """
310         ip_proto = IP
311         if self.af == AF_INET6:
312             ip_proto = IPv6
313         self.logger.info("Verifying capture on interface %s" % dst_if.name)
314         last_info = dict()
315         for i in self.interfaces:
316             last_info[i.sw_if_index] = None
317         dst_sw_if_index = dst_if.sw_if_index
318         for packet in capture:
319             try:
320                 ip_received = packet[ip_proto]
321                 proto_received = packet[proto_l]
322                 payload_info = self.payload_to_info(packet[Raw])
323                 packet_index = payload_info.index
324                 self.assertEqual(payload_info.dst, dst_sw_if_index)
325                 self.logger.debug(
326                     "Got packet on port %s: src=%u (id=%u)" %
327                     (dst_if.name, payload_info.src, packet_index))
328                 next_info = self.get_next_packet_info_for_interface2(
329                     payload_info.src, dst_sw_if_index,
330                     last_info[payload_info.src])
331                 last_info[payload_info.src] = next_info
332                 self.assertTrue(next_info is not None)
333                 self.assertEqual(packet_index, next_info.index)
334                 saved_packet = next_info.data
335                 ip_saved = saved_packet[ip_proto]
336                 proto_saved = saved_packet[proto_l]
337                 # Check standard fields
338                 self.assertEqual(ip_received.src, ip_saved.src)
339                 self.assertEqual(ip_received.dst, ip_saved.dst)
340                 self.assertEqual(proto_received.sport, proto_saved.sport)
341                 self.assertEqual(proto_received.dport, proto_saved.dport)
342             except BaseException:
343                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
344                 raise
345         for i in self.interfaces:
346             remaining_packet = self.get_next_packet_info_for_interface2(
347                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
348             self.assertTrue(remaining_packet is None,
349                             "Interface %s: Packet expected from interface %s "
350                             "didn't arrive" % (dst_if.name, i.name))
351
352     def create_classify_table(self, key, mask, data_offset=0,
353                               next_table_index=None):
354         """Create Classify Table
355
356         :param str key: key for classify table (ex, ACL name).
357         :param str mask: mask value for interested traffic.
358         :param int data_offset:
359         :param str next_table_index
360         """
361         mask_match, mask_match_len = self._resolve_mask_match(mask)
362         r = self.vapi.classify_add_del_table(
363             is_add=1,
364             mask=mask_match,
365             mask_len=mask_match_len,
366             match_n_vectors=(len(mask) - 1) // 32 + 1,
367             miss_next_index=0,
368             current_data_flag=1,
369             current_data_offset=data_offset,
370             next_table_index=next_table_index)
371         self.assertIsNotNone(r, 'No response msg for add_del_table')
372         self.acl_tbl_idx[key] = r.new_table_index
373
374     def create_classify_session(self, table_index, match, pbr_option=0,
375                                 vrfid=0, is_add=1):
376         """Create Classify Session
377
378         :param int table_index: table index to identify classify table.
379         :param str match: matched value for interested traffic.
380         :param int pbr_option: enable/disable PBR feature.
381         :param int vrfid: VRF id.
382         :param int is_add: option to configure classify session.
383             - create(1) or delete(0)
384         """
385         mask_match, mask_match_len = self._resolve_mask_match(match)
386         r = self.vapi.classify_add_del_session(
387             is_add=is_add,
388             table_index=table_index,
389             match=mask_match,
390             match_len=mask_match_len,
391             opaque_index=0,
392             action=pbr_option,
393             metadata=vrfid)
394         self.assertIsNotNone(r, 'No response msg for add_del_session')
395
396     def input_acl_set_interface(self, intf, table_index, is_add=1):
397         """Configure Input ACL interface
398
399         :param VppInterface intf: Interface to apply Input ACL feature.
400         :param int table_index: table index to identify classify table.
401         :param int is_add: option to configure classify session.
402             - enable(1) or disable(0)
403         """
404         r = None
405         if self.af == AF_INET:
406             r = self.vapi.input_acl_set_interface(
407                 is_add,
408                 intf.sw_if_index,
409                 ip4_table_index=table_index)
410         elif self.af == AF_INET6:
411             r = self.vapi.input_acl_set_interface(
412                 is_add,
413                 intf.sw_if_index,
414                 ip6_table_index=table_index)
415         else:
416             r = self.vapi.input_acl_set_interface(
417                 is_add,
418                 intf.sw_if_index,
419                 l2_table_index=table_index)
420         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
421
422     def output_acl_set_interface(self, intf, table_index, is_add=1):
423         """Configure Output ACL interface
424
425         :param VppInterface intf: Interface to apply Output ACL feature.
426         :param int table_index: table index to identify classify table.
427         :param int is_add: option to configure classify session.
428             - enable(1) or disable(0)
429         """
430         r = None
431         if self.af == AF_INET:
432             r = self.vapi.output_acl_set_interface(
433                 is_add,
434                 intf.sw_if_index,
435                 ip4_table_index=table_index)
436         elif self.af == AF_INET6:
437             r = self.vapi.output_acl_set_interface(
438                 is_add,
439                 intf.sw_if_index,
440                 ip6_table_index=table_index)
441         else:
442             r = self.vapi.output_acl_set_interface(
443                 is_add,
444                 intf.sw_if_index,
445                 l2_table_index=table_index)
446         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
447
448     def config_pbr_fib_entry(self, intf, is_add=1):
449         """Configure fib entry to route traffic toward PBR VRF table
450
451         :param VppInterface intf: destination interface to be routed for PBR.
452
453         """
454         addr_len = 24
455         self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
456                                    dst_address_length=addr_len,
457                                    next_hop_address=intf.remote_ip4,
458                                    table_id=self.pbr_vrfid, is_add=is_add)
459
460     def verify_vrf(self, vrf_id):
461         """
462         Check if the FIB table / VRF ID is configured.
463
464         :param int vrf_id: The FIB table / VRF ID to be verified.
465         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
466         """
467         ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
468         vrf_count = len(ip_fib_dump)
469         if vrf_count == 0:
470             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
471             return 0
472         else:
473             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
474             return 1