classify: API cleanup
[vpp.git] / test / template_classifier.py
1 #!/usr/bin/env python
2
3 import binascii
4 import socket
5 from socket import AF_INET, AF_INET6
6 import unittest
7 import sys
8
9 from framework import VppTestCase
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP, TCP
14 from scapy.layers.inet6 import IPv6
15 from util import ppp
16
17
18 class TestClassifier(VppTestCase):
19
20     @staticmethod
21     def _resolve_mask_match(mask_match):
22         mask_match = binascii.unhexlify(mask_match)
23         mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
24         mask_match = mask_match + b'\0' * \
25             (mask_match_len - len(mask_match))
26         return mask_match, mask_match_len
27
28     @classmethod
29     def setUpClass(cls):
30         """
31         Perform standard class setup (defined by class method setUpClass in
32         class VppTestCase) before running the test case, set test case related
33         variables and configure VPP.
34         """
35         super(TestClassifier, cls).setUpClass()
36         cls.acl_active_table = ''
37         cls.af = AF_INET
38
39     def setUp(self):
40         """
41         Perform test setup before test case.
42
43         **Config:**
44             - create 4 pg interfaces
45                 - untagged pg0/pg1/pg2 interface
46                     pg0 -------> pg1 (IP ACL)
47                            \
48                             ---> pg2 (MAC ACL))
49                              \
50                               -> pg3 (PBR)
51             - setup interfaces:
52                 - put it into UP state
53                 - set IPv4/6 addresses
54                 - resolve neighbor address using ARP
55
56         :ivar list interfaces: pg interfaces.
57         :ivar list pg_if_packet_sizes: packet sizes in test.
58         :ivar dict acl_tbl_idx: ACL table index.
59         :ivar int pbr_vrfid: VRF id for PBR test.
60         """
61         self.reset_packet_infos()
62         super(TestClassifier, self).setUp()
63         if self.af is None:  # l2_acl test case
64             return
65
66         # create 4 pg interfaces
67         self.create_pg_interfaces(range(4))
68
69         # packet sizes to test
70         self.pg_if_packet_sizes = [64, 9018]
71
72         self.interfaces = list(self.pg_interfaces)
73
74         # ACL & PBR vars
75         self.acl_tbl_idx = {}
76         self.pbr_vrfid = 200
77
78         # setup all interfaces
79         for intf in self.interfaces:
80             intf.admin_up()
81             if self.af == AF_INET:
82                 intf.config_ip4()
83                 intf.resolve_arp()
84             elif self.af == AF_INET6:
85                 intf.config_ip6()
86                 intf.resolve_ndp()
87
88     def tearDown(self):
89         """Run standard test teardown and acl related log."""
90         if self.af is not None and not self.vpp_dead:
91             if self.af == AF_INET:
92                 self.logger.info(self.vapi.ppcli("show inacl type ip4"))
93                 self.logger.info(self.vapi.ppcli("show outacl type ip4"))
94             elif self.af == AF_INET6:
95                 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
96                 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
97
98             self.logger.info(self.vapi.cli("show classify table verbose"))
99             self.logger.info(self.vapi.cli("show ip fib"))
100
101             acl_active_table = 'ip_out'
102             if self.af == AF_INET6:
103                 acl_active_table = 'ip6_out'
104
105             if self.acl_active_table == acl_active_table:
106                 self.output_acl_set_interface(
107                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
108                 self.acl_active_table = ''
109             elif self.acl_active_table != '':
110                 self.input_acl_set_interface(
111                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
112                 self.acl_active_table = ''
113
114             for intf in self.interfaces:
115                 if self.af == AF_INET:
116                     intf.unconfig_ip4()
117                 elif self.af == AF_INET6:
118                     intf.unconfig_ip6()
119                 intf.admin_down()
120
121         super(TestClassifier, self).tearDown()
122
123     @staticmethod
124     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
125         """Build MAC ACL match data with hexstring format.
126
127         :param str dst_mac: source MAC address <x:x:x:x:x:x>
128         :param str src_mac: destination MAC address <x:x:x:x:x:x>
129         :param str ether_type: ethernet type <0-ffff>
130         """
131         if dst_mac:
132             dst_mac = dst_mac.replace(':', '')
133         if src_mac:
134             src_mac = src_mac.replace(':', '')
135
136         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
137             dst_mac, src_mac, ether_type)).rstrip('0')
138
139     @staticmethod
140     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
141         """Build MAC ACL mask data with hexstring format.
142
143         :param str dst_mac: source MAC address <0-ffffffffffff>
144         :param str src_mac: destination MAC address <0-ffffffffffff>
145         :param str ether_type: ethernet type <0-ffff>
146         """
147
148         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
149             dst_mac, src_mac, ether_type)).rstrip('0')
150
151     @staticmethod
152     def build_ip_mask(proto='', src_ip='', dst_ip='',
153                       src_port='', dst_port=''):
154         """Build IP ACL mask data with hexstring format.
155
156         :param str proto: protocol number <0-ff>
157         :param str src_ip: source ip address <0-ffffffff>
158         :param str dst_ip: destination ip address <0-ffffffff>
159         :param str src_port: source port number <0-ffff>
160         :param str dst_port: destination port number <0-ffff>
161         """
162
163         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
164             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
165
166     @staticmethod
167     def build_ip6_mask(nh='', src_ip='', dst_ip='',
168                        src_port='', dst_port=''):
169         """Build IPv6 ACL mask data with hexstring format.
170
171         :param str nh: next header number <0-ff>
172         :param str src_ip: source ip address <0-ffffffff>
173         :param str dst_ip: destination ip address <0-ffffffff>
174         :param str src_port: source port number <0-ffff>
175         :param str dst_port: destination port number <0-ffff>
176         """
177
178         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
179             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
180
181     @staticmethod
182     def build_ip_match(proto=0, src_ip='', dst_ip='',
183                        src_port=0, dst_port=0):
184         """Build IP ACL match data with hexstring format.
185
186         :param int proto: protocol number with valid option "x"
187         :param str src_ip: source ip address with format of "x.x.x.x"
188         :param str dst_ip: destination ip address with format of "x.x.x.x"
189         :param int src_port: source port number "x"
190         :param int dst_port: destination port number "x"
191         """
192         if src_ip:
193             src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
194         if dst_ip:
195             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
196
197         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
198             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
199             hex(dst_port)[2:])).rstrip('0')
200
201     @staticmethod
202     def build_ip6_match(nh=0, src_ip='', dst_ip='',
203                         src_port=0, dst_port=0):
204         """Build IPv6 ACL match data with hexstring format.
205
206         :param int nh: next header number with valid option "x"
207         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
208         :param str dst_ip: destination ip6 address with format of
209             "xxx:xxxx::xxxx"
210         :param int src_port: source port number "x"
211         :param int dst_port: destination port number "x"
212         """
213         if src_ip:
214             if sys.version_info[0] == 2:
215                 src_ip = binascii.hexlify(socket.inet_pton(
216                     socket.AF_INET6, src_ip))
217             else:
218                 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
219
220         if dst_ip:
221             if sys.version_info[0] == 2:
222                 dst_ip = binascii.hexlify(socket.inet_pton(
223                     socket.AF_INET6, dst_ip))
224             else:
225                 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
226
227         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
228             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
229             hex(dst_port)[2:])).rstrip('0')
230
231     def create_stream(self, src_if, dst_if, packet_sizes,
232                       proto_l=UDP(sport=1234, dport=5678)):
233         """Create input packet stream for defined interfaces.
234
235         :param VppInterface src_if: Source Interface for packet stream.
236         :param VppInterface dst_if: Destination Interface for packet stream.
237         :param list packet_sizes: packet size to test.
238         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
239         """
240         pkts = []
241
242         for size in packet_sizes:
243             info = self.create_packet_info(src_if, dst_if)
244             payload = self.info_to_payload(info)
245             if self.af == AF_INET:
246                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
247                      IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
248                      proto_l /
249                      Raw(payload))
250             elif self.af == AF_INET6:
251                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
252                      IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
253                      proto_l /
254                      Raw(payload))
255             info.data = p.copy()
256             self.extend_packet(p, size)
257             pkts.append(p)
258         return pkts
259
260     def verify_capture(self, dst_if, capture, proto_l=UDP):
261         """Verify captured input packet stream for defined interface.
262
263         :param VppInterface dst_if: Interface to verify captured packet stream.
264         :param list capture: Captured packet stream.
265         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
266         """
267         ip_proto = IP
268         if self.af == AF_INET6:
269             ip_proto = IPv6
270         self.logger.info("Verifying capture on interface %s" % dst_if.name)
271         last_info = dict()
272         for i in self.interfaces:
273             last_info[i.sw_if_index] = None
274         dst_sw_if_index = dst_if.sw_if_index
275         for packet in capture:
276             try:
277                 ip_received = packet[ip_proto]
278                 proto_received = packet[proto_l]
279                 payload_info = self.payload_to_info(packet[Raw])
280                 packet_index = payload_info.index
281                 self.assertEqual(payload_info.dst, dst_sw_if_index)
282                 self.logger.debug(
283                     "Got packet on port %s: src=%u (id=%u)" %
284                     (dst_if.name, payload_info.src, packet_index))
285                 next_info = self.get_next_packet_info_for_interface2(
286                     payload_info.src, dst_sw_if_index,
287                     last_info[payload_info.src])
288                 last_info[payload_info.src] = next_info
289                 self.assertTrue(next_info is not None)
290                 self.assertEqual(packet_index, next_info.index)
291                 saved_packet = next_info.data
292                 ip_saved = saved_packet[ip_proto]
293                 proto_saved = saved_packet[proto_l]
294                 # Check standard fields
295                 self.assertEqual(ip_received.src, ip_saved.src)
296                 self.assertEqual(ip_received.dst, ip_saved.dst)
297                 self.assertEqual(proto_received.sport, proto_saved.sport)
298                 self.assertEqual(proto_received.dport, proto_saved.dport)
299             except BaseException:
300                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
301                 raise
302         for i in self.interfaces:
303             remaining_packet = self.get_next_packet_info_for_interface2(
304                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
305             self.assertTrue(remaining_packet is None,
306                             "Interface %s: Packet expected from interface %s "
307                             "didn't arrive" % (dst_if.name, i.name))
308
309     def create_classify_table(self, key, mask, data_offset=0):
310         """Create Classify Table
311
312         :param str key: key for classify table (ex, ACL name).
313         :param str mask: mask value for interested traffic.
314         :param int data_offset:
315         """
316         mask_match, mask_match_len = self._resolve_mask_match(mask)
317         r = self.vapi.classify_add_del_table(
318             is_add=1,
319             mask=mask_match,
320             mask_len=mask_match_len,
321             match_n_vectors=(len(mask) - 1) // 32 + 1,
322             miss_next_index=0,
323             current_data_flag=1,
324             current_data_offset=data_offset)
325         self.assertIsNotNone(r, 'No response msg for add_del_table')
326         self.acl_tbl_idx[key] = r.new_table_index
327
328     def create_classify_session(self, table_index, match, pbr_option=0,
329                                 vrfid=0, is_add=1):
330         """Create Classify Session
331
332         :param int table_index: table index to identify classify table.
333         :param str match: matched value for interested traffic.
334         :param int pbr_option: enable/disable PBR feature.
335         :param int vrfid: VRF id.
336         :param int is_add: option to configure classify session.
337             - create(1) or delete(0)
338         """
339         mask_match, mask_match_len = self._resolve_mask_match(match)
340         r = self.vapi.classify_add_del_session(
341             is_add=is_add,
342             table_index=table_index,
343             match=mask_match,
344             match_len=mask_match_len,
345             opaque_index=0,
346             action=pbr_option,
347             metadata=vrfid)
348         self.assertIsNotNone(r, 'No response msg for add_del_session')
349
350     def input_acl_set_interface(self, intf, table_index, is_add=1):
351         """Configure Input ACL interface
352
353         :param VppInterface intf: Interface to apply Input ACL feature.
354         :param int table_index: table index to identify classify table.
355         :param int is_add: option to configure classify session.
356             - enable(1) or disable(0)
357         """
358         r = None
359         if self.af == AF_INET:
360             r = self.vapi.input_acl_set_interface(
361                 is_add,
362                 intf.sw_if_index,
363                 ip4_table_index=table_index)
364         elif self.af == AF_INET6:
365             r = self.vapi.input_acl_set_interface(
366                 is_add,
367                 intf.sw_if_index,
368                 ip6_table_index=table_index)
369         else:
370             r = self.vapi.input_acl_set_interface(
371                 is_add,
372                 intf.sw_if_index,
373                 l2_table_index=table_index)
374         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
375
376     def output_acl_set_interface(self, intf, table_index, is_add=1):
377         """Configure Output ACL interface
378
379         :param VppInterface intf: Interface to apply Output ACL feature.
380         :param int table_index: table index to identify classify table.
381         :param int is_add: option to configure classify session.
382             - enable(1) or disable(0)
383         """
384         r = None
385         if self.af == AF_INET:
386             r = self.vapi.output_acl_set_interface(
387                 is_add,
388                 intf.sw_if_index,
389                 ip4_table_index=table_index)
390         elif self.af == AF_INET6:
391             r = self.vapi.output_acl_set_interface(
392                 is_add,
393                 intf.sw_if_index,
394                 ip6_table_index=table_index)
395         else:
396             r = self.vapi.output_acl_set_interface(
397                 is_add,
398                 intf.sw_if_index,
399                 l2_table_index=table_index)
400         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
401
402     def config_pbr_fib_entry(self, intf, is_add=1):
403         """Configure fib entry to route traffic toward PBR VRF table
404
405         :param VppInterface intf: destination interface to be routed for PBR.
406
407         """
408         addr_len = 24
409         self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
410                                    dst_address_length=addr_len,
411                                    next_hop_address=intf.remote_ip4n,
412                                    table_id=self.pbr_vrfid, is_add=is_add)
413
414     def verify_vrf(self, vrf_id):
415         """
416         Check if the FIB table / VRF ID is configured.
417
418         :param int vrf_id: The FIB table / VRF ID to be verified.
419         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
420         """
421         ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
422         vrf_count = len(ip_fib_dump)
423         if vrf_count == 0:
424             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
425             return 0
426         else:
427             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
428             return 1