7ce69d436f5072084560197d7385e9deceb33d0f
[vpp.git] / test / template_classifier.py
1 #!/usr/bin/env python3
2
3 import binascii
4 import socket
5 from socket import AF_INET, AF_INET6
6 import unittest
7 import sys
8 from dataclasses import dataclass
9
10 from framework import VppTestCase
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, TCP
15 from scapy.layers.inet6 import IPv6
16 from util import ppp
17
18
19 @dataclass
20 class VarMask:
21     offset: int
22     spec: str
23
24
25 @dataclass
26 class VarMatch:
27     offset: int
28     value: int
29     length: int
30
31
32 class TestClassifier(VppTestCase):
33
34     @staticmethod
35     def _resolve_mask_match(mask_match):
36         mask_match = binascii.unhexlify(mask_match)
37         mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
38         mask_match = mask_match + b'\0' * \
39             (mask_match_len - len(mask_match))
40         return mask_match, mask_match_len
41
42     @classmethod
43     def setUpClass(cls):
44         """
45         Perform standard class setup (defined by class method setUpClass in
46         class VppTestCase) before running the test case, set test case related
47         variables and configure VPP.
48         """
49         super(TestClassifier, cls).setUpClass()
50         cls.acl_active_table = ''
51         cls.af = AF_INET
52
53     def setUp(self):
54         """
55         Perform test setup before test case.
56
57         **Config:**
58             - create 4 pg interfaces
59                 - untagged pg0/pg1/pg2 interface
60                     pg0 -------> pg1 (IP ACL)
61                            \
62                             ---> pg2 (MAC ACL))
63                              \
64                               -> pg3 (PBR)
65             - setup interfaces:
66                 - put it into UP state
67                 - set IPv4/6 addresses
68                 - resolve neighbor address using ARP
69
70         :ivar list interfaces: pg interfaces.
71         :ivar list pg_if_packet_sizes: packet sizes in test.
72         :ivar dict acl_tbl_idx: ACL table index.
73         :ivar int pbr_vrfid: VRF id for PBR test.
74         """
75         self.reset_packet_infos()
76         super(TestClassifier, self).setUp()
77         if self.af is None:  # l2_acl test case
78             return
79
80         # create 4 pg interfaces
81         self.create_pg_interfaces(range(4))
82
83         # packet sizes to test
84         self.pg_if_packet_sizes = [64, 9018]
85
86         self.interfaces = list(self.pg_interfaces)
87
88         # ACL & PBR vars
89         self.acl_tbl_idx = {}
90         self.pbr_vrfid = 200
91
92         # setup all interfaces
93         for intf in self.interfaces:
94             intf.admin_up()
95             if self.af == AF_INET:
96                 intf.config_ip4()
97                 intf.resolve_arp()
98             elif self.af == AF_INET6:
99                 intf.config_ip6()
100                 intf.resolve_ndp()
101
102     def tearDown(self):
103         """Run standard test teardown and acl related log."""
104         if self.af is not None and not self.vpp_dead:
105             if self.af == AF_INET:
106                 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
107                 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
108             elif self.af == AF_INET6:
109                 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
110                 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
111
112             self.logger.info(self.vapi.cli("show classify table verbose"))
113             self.logger.info(self.vapi.cli("show ip fib"))
114             self.logger.info(self.vapi.cli("show error"))
115
116             if self.acl_active_table.endswith('out'):
117                 self.output_acl_set_interface(
118                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
119             elif self.acl_active_table != '':
120                 self.input_acl_set_interface(
121                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
122             self.acl_active_table = ''
123
124             for intf in self.interfaces:
125                 if self.af == AF_INET:
126                     intf.unconfig_ip4()
127                 elif self.af == AF_INET6:
128                     intf.unconfig_ip6()
129                 intf.admin_down()
130
131         super(TestClassifier, self).tearDown()
132
133     @staticmethod
134     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
135         """Build MAC ACL match data with hexstring format.
136
137         :param str dst_mac: source MAC address <x:x:x:x:x:x>
138         :param str src_mac: destination MAC address <x:x:x:x:x:x>
139         :param str ether_type: ethernet type <0-ffff>
140         """
141         if dst_mac:
142             dst_mac = dst_mac.replace(':', '')
143         if src_mac:
144             src_mac = src_mac.replace(':', '')
145
146         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
147             dst_mac, src_mac, ether_type)).rstrip()
148
149     @staticmethod
150     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
151         """Build MAC ACL mask data with hexstring format.
152
153         :param str dst_mac: source MAC address <0-ffffffffffff>
154         :param str src_mac: destination MAC address <0-ffffffffffff>
155         :param str ether_type: ethernet type <0-ffff>
156         """
157
158         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
159             dst_mac, src_mac, ether_type)).rstrip()
160
161     @staticmethod
162     def build_ip_mask(proto='', src_ip='', dst_ip='',
163                       src_port='', dst_port=''):
164         """Build IP ACL mask data with hexstring format.
165
166         :param str proto: protocol number <0-ff>
167         :param str src_ip: source ip address <0-ffffffff>
168         :param str dst_ip: destination ip address <0-ffffffff>
169         :param str src_port: source port number <0-ffff>
170         :param str dst_port: destination port number <0-ffff>
171         """
172
173         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
174             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
175
176     @staticmethod
177     def build_ip6_mask(nh='', src_ip='', dst_ip='',
178                        src_port='', dst_port=''):
179         """Build IPv6 ACL mask data with hexstring format.
180
181         :param str nh: next header number <0-ff>
182         :param str src_ip: source ip address <0-ffffffff>
183         :param str dst_ip: destination ip address <0-ffffffff>
184         :param str src_port: source port number <0-ffff>
185         :param str dst_port: destination port number <0-ffff>
186         """
187
188         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
189             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
190
191     @staticmethod
192     def build_payload_mask(masks):
193         payload_mask = ''
194
195         for mask in masks:
196             # offset is specified in bytes, convert to hex format.
197             length = (mask.offset * 2) + len(mask.spec)
198             format_spec = '{!s:0>' + str(length) + '}'
199             payload_mask += format_spec.format(mask.spec)
200
201         return payload_mask.rstrip('0')
202
203     @staticmethod
204     def build_ip_match(proto=0, src_ip='', dst_ip='',
205                        src_port=0, dst_port=0):
206         """Build IP ACL match data with hexstring format.
207
208         :param int proto: protocol number with valid option "x"
209         :param str src_ip: source ip address with format of "x.x.x.x"
210         :param str dst_ip: destination ip address with format of "x.x.x.x"
211         :param int src_port: source port number "x"
212         :param int dst_port: destination port number "x"
213         """
214         if src_ip:
215             src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
216         if dst_ip:
217             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
218
219         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
220             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
221             hex(dst_port)[2:])).rstrip('0')
222
223     @staticmethod
224     def build_ip6_match(nh=0, src_ip='', dst_ip='',
225                         src_port=0, dst_port=0):
226         """Build IPv6 ACL match data with hexstring format.
227
228         :param int nh: next header number with valid option "x"
229         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
230         :param str dst_ip: destination ip6 address with format of
231             "xxx:xxxx::xxxx"
232         :param int src_port: source port number "x"
233         :param int dst_port: destination port number "x"
234         """
235         if src_ip:
236             if sys.version_info[0] == 2:
237                 src_ip = binascii.hexlify(socket.inet_pton(
238                     socket.AF_INET6, src_ip))
239             else:
240                 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
241
242         if dst_ip:
243             if sys.version_info[0] == 2:
244                 dst_ip = binascii.hexlify(socket.inet_pton(
245                     socket.AF_INET6, dst_ip))
246             else:
247                 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
248
249         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
250             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
251             hex(dst_port)[2:])).rstrip('0')
252
253     @staticmethod
254     def build_payload_match(matches):
255         payload_match = ''
256
257         for match in matches:
258             sval = str(hex(match.value)[2:])
259             # offset is specified in bytes, convert to hex format.
260             length = (match.offset + match.length) * 2
261
262             format_spec = '{!s:0>' + str(length) + '}'
263             payload_match += format_spec.format(sval)
264
265         return payload_match.rstrip('0')
266
267     def create_stream(self, src_if, dst_if, packet_sizes,
268                       proto_l=UDP(sport=1234, dport=5678),
269                       payload_ex=None):
270         """Create input packet stream for defined interfaces.
271
272         :param VppInterface src_if: Source Interface for packet stream.
273         :param VppInterface dst_if: Destination Interface for packet stream.
274         :param list packet_sizes: packet size to test.
275         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
276         """
277         pkts = []
278
279         for size in packet_sizes:
280             info = self.create_packet_info(src_if, dst_if)
281             payload = self.info_to_payload(info)
282
283             # append any additional payload after info
284             if payload_ex is not None:
285                 payload += payload_ex
286
287             if self.af == AF_INET:
288                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
289                      IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
290                      proto_l /
291                      Raw(payload))
292             elif self.af == AF_INET6:
293                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
294                      IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
295                      proto_l /
296                      Raw(payload))
297             info.data = p.copy()
298             self.extend_packet(p, size)
299             pkts.append(p)
300         return pkts
301
302     def verify_capture(self, dst_if, capture, proto_l=UDP):
303         """Verify captured input packet stream for defined interface.
304
305         :param VppInterface dst_if: Interface to verify captured packet stream.
306         :param list capture: Captured packet stream.
307         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
308         """
309         ip_proto = IP
310         if self.af == AF_INET6:
311             ip_proto = IPv6
312         self.logger.info("Verifying capture on interface %s" % dst_if.name)
313         last_info = dict()
314         for i in self.interfaces:
315             last_info[i.sw_if_index] = None
316         dst_sw_if_index = dst_if.sw_if_index
317         for packet in capture:
318             try:
319                 ip_received = packet[ip_proto]
320                 proto_received = packet[proto_l]
321                 payload_info = self.payload_to_info(packet[Raw])
322                 packet_index = payload_info.index
323                 self.assertEqual(payload_info.dst, dst_sw_if_index)
324                 self.logger.debug(
325                     "Got packet on port %s: src=%u (id=%u)" %
326                     (dst_if.name, payload_info.src, packet_index))
327                 next_info = self.get_next_packet_info_for_interface2(
328                     payload_info.src, dst_sw_if_index,
329                     last_info[payload_info.src])
330                 last_info[payload_info.src] = next_info
331                 self.assertTrue(next_info is not None)
332                 self.assertEqual(packet_index, next_info.index)
333                 saved_packet = next_info.data
334                 ip_saved = saved_packet[ip_proto]
335                 proto_saved = saved_packet[proto_l]
336                 # Check standard fields
337                 self.assertEqual(ip_received.src, ip_saved.src)
338                 self.assertEqual(ip_received.dst, ip_saved.dst)
339                 self.assertEqual(proto_received.sport, proto_saved.sport)
340                 self.assertEqual(proto_received.dport, proto_saved.dport)
341             except BaseException:
342                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
343                 raise
344         for i in self.interfaces:
345             remaining_packet = self.get_next_packet_info_for_interface2(
346                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
347             self.assertTrue(remaining_packet is None,
348                             "Interface %s: Packet expected from interface %s "
349                             "didn't arrive" % (dst_if.name, i.name))
350
351     def create_classify_table(self, key, mask, data_offset=0,
352                               next_table_index=None):
353         """Create Classify Table
354
355         :param str key: key for classify table (ex, ACL name).
356         :param str mask: mask value for interested traffic.
357         :param int data_offset:
358         :param str next_table_index
359         """
360         mask_match, mask_match_len = self._resolve_mask_match(mask)
361         r = self.vapi.classify_add_del_table(
362             is_add=1,
363             mask=mask_match,
364             mask_len=mask_match_len,
365             match_n_vectors=(len(mask) - 1) // 32 + 1,
366             miss_next_index=0,
367             current_data_flag=1,
368             current_data_offset=data_offset,
369             next_table_index=next_table_index)
370         self.assertIsNotNone(r, 'No response msg for add_del_table')
371         self.acl_tbl_idx[key] = r.new_table_index
372
373     def create_classify_session(self, table_index, match, pbr_option=0,
374                                 vrfid=0, is_add=1):
375         """Create Classify Session
376
377         :param int table_index: table index to identify classify table.
378         :param str match: matched value for interested traffic.
379         :param int pbr_option: enable/disable PBR feature.
380         :param int vrfid: VRF id.
381         :param int is_add: option to configure classify session.
382             - create(1) or delete(0)
383         """
384         mask_match, mask_match_len = self._resolve_mask_match(match)
385         r = self.vapi.classify_add_del_session(
386             is_add=is_add,
387             table_index=table_index,
388             match=mask_match,
389             match_len=mask_match_len,
390             opaque_index=0,
391             action=pbr_option,
392             metadata=vrfid)
393         self.assertIsNotNone(r, 'No response msg for add_del_session')
394
395     def input_acl_set_interface(self, intf, table_index, is_add=1):
396         """Configure Input ACL interface
397
398         :param VppInterface intf: Interface to apply Input ACL feature.
399         :param int table_index: table index to identify classify table.
400         :param int is_add: option to configure classify session.
401             - enable(1) or disable(0)
402         """
403         r = None
404         if self.af == AF_INET:
405             r = self.vapi.input_acl_set_interface(
406                 is_add,
407                 intf.sw_if_index,
408                 ip4_table_index=table_index)
409         elif self.af == AF_INET6:
410             r = self.vapi.input_acl_set_interface(
411                 is_add,
412                 intf.sw_if_index,
413                 ip6_table_index=table_index)
414         else:
415             r = self.vapi.input_acl_set_interface(
416                 is_add,
417                 intf.sw_if_index,
418                 l2_table_index=table_index)
419         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
420
421     def output_acl_set_interface(self, intf, table_index, is_add=1):
422         """Configure Output ACL interface
423
424         :param VppInterface intf: Interface to apply Output ACL feature.
425         :param int table_index: table index to identify classify table.
426         :param int is_add: option to configure classify session.
427             - enable(1) or disable(0)
428         """
429         r = None
430         if self.af == AF_INET:
431             r = self.vapi.output_acl_set_interface(
432                 is_add,
433                 intf.sw_if_index,
434                 ip4_table_index=table_index)
435         elif self.af == AF_INET6:
436             r = self.vapi.output_acl_set_interface(
437                 is_add,
438                 intf.sw_if_index,
439                 ip6_table_index=table_index)
440         else:
441             r = self.vapi.output_acl_set_interface(
442                 is_add,
443                 intf.sw_if_index,
444                 l2_table_index=table_index)
445         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
446
447     def config_pbr_fib_entry(self, intf, is_add=1):
448         """Configure fib entry to route traffic toward PBR VRF table
449
450         :param VppInterface intf: destination interface to be routed for PBR.
451
452         """
453         addr_len = 24
454         self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
455                                    dst_address_length=addr_len,
456                                    next_hop_address=intf.remote_ip4,
457                                    table_id=self.pbr_vrfid, is_add=is_add)
458
459     def verify_vrf(self, vrf_id):
460         """
461         Check if the FIB table / VRF ID is configured.
462
463         :param int vrf_id: The FIB table / VRF ID to be verified.
464         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
465         """
466         ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
467         vrf_count = len(ip_fib_dump)
468         if vrf_count == 0:
469             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
470             return 0
471         else:
472             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
473             return 1