VPP-1508 python3 tests: .encode('hex')
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13
14
15 class TestClassifier(VppTestCase):
16     """ Classifier Test Case """
17
18     @classmethod
19     def setUpClass(cls):
20         """
21         Perform standard class setup (defined by class method setUpClass in
22         class VppTestCase) before running the test case, set test case related
23         variables and configure VPP.
24         """
25         super(TestClassifier, cls).setUpClass()
26         cls.acl_active_table = ''
27
28     def setUp(self):
29         """
30         Perform test setup before test case.
31
32         **Config:**
33             - create 4 pg interfaces
34                 - untagged pg0/pg1/pg2 interface
35                     pg0 -------> pg1 (IP ACL)
36                            \
37                             ---> pg2 (MAC ACL))
38                              \
39                               -> pg3 (PBR)
40             - setup interfaces:
41                 - put it into UP state
42                 - set IPv4 addresses
43                 - resolve neighbor address using ARP
44
45         :ivar list interfaces: pg interfaces.
46         :ivar list pg_if_packet_sizes: packet sizes in test.
47         :ivar dict acl_tbl_idx: ACL table index.
48         :ivar int pbr_vrfid: VRF id for PBR test.
49         """
50         self.reset_packet_infos()
51         super(TestClassifier, self).setUp()
52
53         # create 4 pg interfaces
54         self.create_pg_interfaces(range(4))
55
56         # packet sizes to test
57         self.pg_if_packet_sizes = [64, 9018]
58
59         self.interfaces = list(self.pg_interfaces)
60
61         # ACL & PBR vars
62         self.acl_tbl_idx = {}
63         self.pbr_vrfid = 200
64
65         # setup all interfaces
66         for intf in self.interfaces:
67             intf.admin_up()
68             intf.config_ip4()
69             intf.resolve_arp()
70
71     def tearDown(self):
72         """Run standard test teardown and acl related log."""
73         if not self.vpp_dead:
74             self.logger.info(self.vapi.ppcli("show inacl type ip4"))
75             self.logger.info(self.vapi.ppcli("show outacl type ip4"))
76             self.logger.info(self.vapi.cli("show classify table verbose"))
77             self.logger.info(self.vapi.cli("show ip fib"))
78             if self.acl_active_table == 'ip_out':
79                 self.output_acl_set_interface(
80                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
81                 self.acl_active_table = ''
82             elif self.acl_active_table != '':
83                 self.input_acl_set_interface(
84                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85                 self.acl_active_table = ''
86             for intf in self.interfaces:
87                 intf.unconfig_ip4()
88                 intf.admin_down()
89
90         super(TestClassifier, self).tearDown()
91
92     def config_pbr_fib_entry(self, intf, is_add=1):
93         """Configure fib entry to route traffic toward PBR VRF table
94
95         :param VppInterface intf: destination interface to be routed for PBR.
96
97         """
98         addr_len = 24
99         self.vapi.ip_add_del_route(intf.local_ip4n,
100                                    addr_len,
101                                    intf.remote_ip4n,
102                                    table_id=self.pbr_vrfid,
103                                    is_add=is_add)
104
105     def create_stream(self, src_if, dst_if, packet_sizes,
106                       proto_l=UDP(sport=1234, dport=5678)):
107         """Create input packet stream for defined interfaces.
108
109         :param VppInterface src_if: Source Interface for packet stream.
110         :param VppInterface dst_if: Destination Interface for packet stream.
111         :param list packet_sizes: packet size to test.
112         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
113         """
114         pkts = []
115
116         for size in packet_sizes:
117             info = self.create_packet_info(src_if, dst_if)
118             payload = self.info_to_payload(info)
119             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
120                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
121                  proto_l /
122                  Raw(payload))
123             info.data = p.copy()
124             self.extend_packet(p, size)
125             pkts.append(p)
126         return pkts
127
128     def verify_capture(self, dst_if, capture, proto_l=UDP):
129         """Verify captured input packet stream for defined interface.
130
131         :param VppInterface dst_if: Interface to verify captured packet stream.
132         :param list capture: Captured packet stream.
133         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
134         """
135         self.logger.info("Verifying capture on interface %s" % dst_if.name)
136         last_info = dict()
137         for i in self.interfaces:
138             last_info[i.sw_if_index] = None
139         dst_sw_if_index = dst_if.sw_if_index
140         for packet in capture:
141             try:
142                 ip_received = packet[IP]
143                 proto_received = packet[proto_l]
144                 payload_info = self.payload_to_info(str(packet[Raw]))
145                 packet_index = payload_info.index
146                 self.assertEqual(payload_info.dst, dst_sw_if_index)
147                 self.logger.debug(
148                     "Got packet on port %s: src=%u (id=%u)" %
149                     (dst_if.name, payload_info.src, packet_index))
150                 next_info = self.get_next_packet_info_for_interface2(
151                     payload_info.src, dst_sw_if_index,
152                     last_info[payload_info.src])
153                 last_info[payload_info.src] = next_info
154                 self.assertTrue(next_info is not None)
155                 self.assertEqual(packet_index, next_info.index)
156                 saved_packet = next_info.data
157                 ip_saved = saved_packet[IP]
158                 proto_saved = saved_packet[proto_l]
159                 # Check standard fields
160                 self.assertEqual(ip_received.src, ip_saved.src)
161                 self.assertEqual(ip_received.dst, ip_saved.dst)
162                 self.assertEqual(proto_received.sport, proto_saved.sport)
163                 self.assertEqual(proto_received.dport, proto_saved.dport)
164             except:
165                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
166                 raise
167         for i in self.interfaces:
168             remaining_packet = self.get_next_packet_info_for_interface2(
169                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
170             self.assertTrue(remaining_packet is None,
171                             "Interface %s: Packet expected from interface %s "
172                             "didn't arrive" % (dst_if.name, i.name))
173
174     def verify_vrf(self, vrf_id):
175         """
176         Check if the FIB table / VRF ID is configured.
177
178         :param int vrf_id: The FIB table / VRF ID to be verified.
179         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
180         """
181         ip_fib_dump = self.vapi.ip_fib_dump()
182         vrf_count = 0
183         for ip_fib_details in ip_fib_dump:
184             if ip_fib_details[2] == vrf_id:
185                 vrf_count += 1
186         if vrf_count == 0:
187             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
188             return 0
189         else:
190             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
191             return 1
192
193     @staticmethod
194     def build_ip_mask(proto='', src_ip='', dst_ip='',
195                       src_port='', dst_port=''):
196         """Build IP ACL mask data with hexstring format.
197
198         :param str proto: protocol number <0-ff>
199         :param str src_ip: source ip address <0-ffffffff>
200         :param str dst_ip: destination ip address <0-ffffffff>
201         :param str src_port: source port number <0-ffff>
202         :param str dst_port: destination port number <0-ffff>
203         """
204
205         return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
206             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
207
208     @staticmethod
209     def build_ip_match(proto=0, src_ip='', dst_ip='',
210                        src_port=0, dst_port=0):
211         """Build IP ACL match data with hexstring format.
212
213         :param int proto: protocol number with valid option "x"
214         :param str src_ip: source ip address with format of "x.x.x.x"
215         :param str dst_ip: destination ip address with format of "x.x.x.x"
216         :param int src_port: source port number "x"
217         :param int dst_port: destination port number "x"
218         """
219         if src_ip:
220             src_ip = binascii.hexlify(socket.inet_aton(src_ip))
221         if dst_ip:
222             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
223
224         return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
225             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
226             hex(dst_port)[2:])).rstrip('0')
227
228     @staticmethod
229     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
230         """Build MAC ACL mask data with hexstring format.
231
232         :param str dst_mac: source MAC address <0-ffffffffffff>
233         :param str src_mac: destination MAC address <0-ffffffffffff>
234         :param str ether_type: ethernet type <0-ffff>
235         """
236
237         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
238                                               ether_type)).rstrip('0')
239
240     @staticmethod
241     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
242         """Build MAC ACL match data with hexstring format.
243
244         :param str dst_mac: source MAC address <x:x:x:x:x:x>
245         :param str src_mac: destination MAC address <x:x:x:x:x:x>
246         :param str ether_type: ethernet type <0-ffff>
247         """
248         if dst_mac:
249             dst_mac = dst_mac.replace(':', '')
250         if src_mac:
251             src_mac = src_mac.replace(':', '')
252
253         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
254                                               ether_type)).rstrip('0')
255
256     def create_classify_table(self, key, mask, data_offset=0):
257         """Create Classify Table
258
259         :param str key: key for classify table (ex, ACL name).
260         :param str mask: mask value for interested traffic.
261         :param int data_offset:
262         """
263         r = self.vapi.classify_add_del_table(
264             is_add=1,
265             mask=binascii.unhexlify(mask),
266             match_n_vectors=(len(mask) - 1) // 32 + 1,
267             miss_next_index=0,
268             current_data_flag=1,
269             current_data_offset=data_offset)
270         self.assertIsNotNone(r, msg='No response msg for add_del_table')
271         self.acl_tbl_idx[key] = r.new_table_index
272
273     def create_classify_session(self, table_index, match, pbr_option=0,
274                                 vrfid=0, is_add=1):
275         """Create Classify Session
276
277         :param int table_index: table index to identify classify table.
278         :param str match: matched value for interested traffic.
279         :param int pbr_option: enable/disable PBR feature.
280         :param int vrfid: VRF id.
281         :param int is_add: option to configure classify session.
282             - create(1) or delete(0)
283         """
284         r = self.vapi.classify_add_del_session(
285             is_add,
286             table_index,
287             binascii.unhexlify(match),
288             opaque_index=0,
289             action=pbr_option,
290             metadata=vrfid)
291         self.assertIsNotNone(r, msg='No response msg for add_del_session')
292
293     def input_acl_set_interface(self, intf, table_index, is_add=1):
294         """Configure Input ACL interface
295
296         :param VppInterface intf: Interface to apply Input ACL feature.
297         :param int table_index: table index to identify classify table.
298         :param int is_add: option to configure classify session.
299             - enable(1) or disable(0)
300         """
301         r = self.vapi.input_acl_set_interface(
302             is_add,
303             intf.sw_if_index,
304             ip4_table_index=table_index)
305         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
306
307     def output_acl_set_interface(self, intf, table_index, is_add=1):
308         """Configure Output ACL interface
309
310         :param VppInterface intf: Interface to apply Output ACL feature.
311         :param int table_index: table index to identify classify table.
312         :param int is_add: option to configure classify session.
313             - enable(1) or disable(0)
314         """
315         r = self.vapi.output_acl_set_interface(
316             is_add,
317             intf.sw_if_index,
318             ip4_table_index=table_index)
319         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
320
321
322 # Tests split to different test case classes because of issue reported in
323 # ticket VPP-1336
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    def _run_ip_iacl(self, key, mask_fields, match_fields):
        """Push a pg0 -> pg1 stream through an input ACL and verify delivery.

        :param str key: name of the classify table to create and attach.
        :param dict mask_fields: keyword arguments for build_ip_mask().
        :param dict match_fields: keyword arguments for build_ip_match().
        """
        stream = self.create_stream(self.pg0, self.pg1,
                                    self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip_mask(**mask_fields))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(**match_fields))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # lets tearDown know which table to detach from pg0
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for quiet_if in (self.pg0, self.pg2, self.pg3):
            quiet_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source IP address.
            - Send and verify received packets on pg1 interface.
        """
        self._run_ip_iacl('ip_src',
                          {'src_ip': 'ffffffff'},
                          {'src_ip': self.pg0.remote_ip4})

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Test scenario for basic IP ACL with destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with destination IP address.
            - Send and verify received packets on pg1 interface.
        """
        self._run_ip_iacl('ip_dst',
                          {'dst_ip': 'ffffffff'},
                          {'dst_ip': self.pg1.remote_ip4})

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Test scenario for basic IP ACL with source and destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source and destination IP addresses.
            - Send and verify received packets on pg1 interface.
        """
        self._run_ip_iacl('ip',
                          {'src_ip': 'ffffffff', 'dst_ip': 'ffffffff'},
                          {'src_ip': self.pg0.remote_ip4,
                           'dst_ip': self.pg1.remote_ip4})
418
419
class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    def _run_udp_iacl(self, key, mask_fields, match_fields, l4=None):
        """Push a pg0 -> pg1 UDP stream through an input ACL and verify it.

        :param str key: name of the classify table to create and attach.
        :param dict mask_fields: keyword arguments for build_ip_mask().
        :param dict match_fields: keyword arguments for build_ip_match().
        :param Scapy l4: UDP layer for the stream; when omitted the default
            of create_stream() is used.
        """
        stream_args = [self.pg0, self.pg1, self.pg_if_packet_sizes]
        if l4 is not None:
            stream_args.append(l4)
        stream = self.create_stream(*stream_args)
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip_mask(**mask_fields))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(**match_fields))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # lets tearDown know which table to detach from pg0
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for quiet_if in (self.pg0, self.pg2, self.pg3):
            quiet_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Test scenario for basic protocol ACL with UDP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        self._run_udp_iacl('proto_udp',
                           dict(proto='ff'),
                           dict(proto=socket.IPPROTO_UDP))

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Test scenario for basic protocol ACL with UDP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        self._run_udp_iacl(
            'proto_udp_sport',
            dict(proto='ff', src_port='ffff'),
            dict(proto=socket.IPPROTO_UDP, src_port=sport),
            UDP(sport=sport, dport=5678))

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Test scenario for basic protocol ACL with UDP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        self._run_udp_iacl(
            'proto_udp_dport',
            dict(proto='ff', dst_port='ffff'),
            dict(proto=socket.IPPROTO_UDP, dst_port=dport),
            UDP(sport=1234, dport=dport))

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Test scenario for basic protocol ACL with UDP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport, dport = 13720, 9080
        self._run_udp_iacl(
            'proto_udp_ports',
            dict(proto='ff', src_port='ffff', dst_port='ffff'),
            dict(proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport),
            UDP(sport=sport, dport=dport))
558
559
class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    def _run_tcp_iacl(self, key, mask_fields, match_fields, l4):
        """Push a pg0 -> pg1 TCP stream through an input ACL and verify it.

        :param str key: name of the classify table to create and attach.
        :param dict mask_fields: keyword arguments for build_ip_mask().
        :param dict match_fields: keyword arguments for build_ip_match().
        :param Scapy l4: TCP layer used for every packet of the stream.
        """
        stream = self.create_stream(self.pg0, self.pg1,
                                    self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        self.create_classify_table(key, self.build_ip_mask(**mask_fields))
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(**match_fields))
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # lets tearDown know which table to detach from pg0
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, TCP)
        for quiet_if in (self.pg0, self.pg2, self.pg3):
            quiet_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        self._run_tcp_iacl(
            'proto_tcp',
            dict(proto='ff'),
            dict(proto=socket.IPPROTO_TCP),
            TCP(sport=1234, dport=5678))

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        self._run_tcp_iacl(
            'proto_tcp_sport',
            dict(proto='ff', src_port='ffff'),
            dict(proto=socket.IPPROTO_TCP, src_port=sport),
            TCP(sport=sport, dport=5678))

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        # key names the dport table (it used to reuse 'proto_tcp_sport')
        self._run_tcp_iacl(
            'proto_tcp_dport',
            dict(proto='ff', dst_port='ffff'),
            dict(proto=socket.IPPROTO_TCP, dst_port=dport),
            TCP(sport=1234, dport=dport))

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport, dport = 13720, 9080
        self._run_tcp_iacl(
            'proto_tcp_ports',
            dict(proto='ff', src_port='ffff', dst_port='ffff'),
            dict(proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport),
            TCP(sport=sport, dport=dport))
699
700
class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Test scenario for basic output IP ACL with source IP
            - Create IPv4 stream for pg1 -> pg0 interface.
            - Create ACL with source IP address, applied on pg0 output.
            - Send and verify received packets on pg0 interface.
        """
        tx_if, rx_if = self.pg1, self.pg0
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # 'ip_out' is the key tearDown recognises as an output ACL
        key = 'ip_out'
        mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(key, mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(key)
        match = self.build_ip_match(src_ip=tx_if.remote_ip4)
        self.create_classify_session(table_index, match)
        self.output_acl_set_interface(rx_if, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for quiet_if in (self.pg1, self.pg2, self.pg3):
            quiet_if.assert_nothing_captured(remark="packets forwarded")
734
735
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    def test_acl_mac(self):
        """ MAC ACL test

        Test scenario for basic MAC ACL with source MAC
            - Create IPv4 stream for pg0 -> pg2 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg2 interface.
        """
        tx_if, rx_if = self.pg0, self.pg2
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        key = 'mac'
        mask = self.build_mac_mask(src_mac='ffffffffffff')
        # NOTE(review): -14 presumably rewinds the classifier over the
        # 14-byte Ethernet header so the MAC fields are visible -- confirm
        self.create_classify_table(key, mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(key)
        match = self.build_mac_match(src_mac=tx_if.remote_mac)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, captured)
        for quiet_if in (self.pg0, self.pg1, self.pg3):
            quiet_if.assert_nothing_captured(remark="packets forwarded")
769
770
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    def test_acl_pbr(self):
        """ IP PBR test

        Test scenario for PBR with source IP
            - Create IPv4 stream for pg0 -> pg3 interface.
            - Configure PBR fib entry for packet forwarding.
            - Send and verify received packets on pg3 interface.
            - Remove the route and the session and check the VRF is gone.
        """

        # PBR testing with source IP
        pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        key = 'pbr'
        self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
        pbr_option = 1
        # this will create the VRF/table in which we will insert the route
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_option, self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))
        self.config_pbr_fib_entry(self.pg3)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        # record the attachment so tearDown detaches the table from pg0 if
        # anything fails before the explicit detach below
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        pkts = self.pg3.get_capture(len(pkts))
        self.verify_capture(self.pg3, pkts)
        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
        # already detached: tearDown must not do it a second time
        self.acl_active_table = ''
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

        # remove the classify session and the route
        self.config_pbr_fib_entry(self.pg3, is_add=0)
        self.create_classify_session(
            self.acl_tbl_idx.get(key),
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_option, self.pbr_vrfid, is_add=0)

        # and the table should be gone.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
818
# Allow running this module directly; results are reported through the VPP
# test runner instead of the default unittest text runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)