vpp_papi_provider: Remove more wrapper functions.
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13
14
class TestClassifier(VppTestCase):
    """ Classifier Test Case """

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(TestClassifier, cls).setUpClass()
        # Key of the classify table currently attached to pg0 ('' = none).
        # Tests set it after enabling an ACL so tearDown can detach it.
        cls.acl_active_table = ''

    def setUp(self):
        r"""
        Perform test setup before test case.

        **Config:**
            - create 4 pg interfaces
                - untagged pg0/pg1/pg2/pg3 interface
                    pg0 -------> pg1 (IP ACL)
                           \
                            ---> pg2 (MAC ACL)
                             \
                              -> pg3 (PBR)
            - setup interfaces:
                - put it into UP state
                - set IPv4 addresses
                - resolve neighbor address using ARP

        :ivar list interfaces: pg interfaces.
        :ivar list pg_if_packet_sizes: packet sizes in test.
        :ivar dict acl_tbl_idx: ACL table index.
        :ivar int pbr_vrfid: VRF id for PBR test.
        """
        self.reset_packet_infos()
        super(TestClassifier, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # packet sizes to test
        self.pg_if_packet_sizes = [64, 9018]

        self.interfaces = list(self.pg_interfaces)

        # ACL & PBR vars
        self.acl_tbl_idx = {}
        self.pbr_vrfid = 200

        # setup all interfaces
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """Run standard test teardown and acl related log."""
        if not self.vpp_dead:
            self.logger.info(self.vapi.ppcli("show inacl type ip4"))
            self.logger.info(self.vapi.ppcli("show outacl type ip4"))
            self.logger.info(self.vapi.cli("show classify table verbose"))
            self.logger.info(self.vapi.cli("show ip fib"))

            # Detach the ACL the test left on pg0, if any. Only the 'ip_out'
            # table is attached on the output side; every other key is an
            # input ACL.
            active = self.acl_active_table
            if active != '':
                if active == 'ip_out':
                    detach = self.output_acl_set_interface
                else:
                    detach = self.input_acl_set_interface
                detach(self.pg0, self.acl_tbl_idx.get(active), 0)
                self.acl_active_table = ''

            for intf in self.interfaces:
                intf.unconfig_ip4()
                intf.admin_down()

        super(TestClassifier, self).tearDown()

    def config_pbr_fib_entry(self, intf, is_add=1):
        """Configure fib entry to route traffic toward PBR VRF table

        :param VppInterface intf: destination interface to be routed for PBR.
        :param int is_add: add (1) or delete (0) the route.

        """
        addr_len = 24
        self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
                                   dst_address_length=addr_len,
                                   next_hop_address=intf.remote_ip4n,
                                   table_id=self.pbr_vrfid, is_add=is_add)

    def create_stream(self, src_if, dst_if, packet_sizes,
                      proto_l=UDP(sport=1234, dport=5678)):
        """Create input packet stream for defined interfaces.

        :param VppInterface src_if: Source Interface for packet stream.
        :param VppInterface dst_if: Destination Interface for packet stream.
        :param list packet_sizes: packet size to test.
        :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
        :return: list of packets, one per entry in packet_sizes.
        """
        pkts = []

        for size in packet_sizes:
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
                 proto_l /
                 Raw(payload))
            # Keep the un-padded packet as the reference for verify_capture.
            info.data = p.copy()
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, dst_if, capture, proto_l=UDP):
        """Verify captured input packet stream for defined interface.

        :param VppInterface dst_if: Interface to verify captured packet stream.
        :param list capture: Captured packet stream.
        :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = dst_if.sw_if_index
        for packet in capture:
            try:
                ip_received = packet[IP]
                proto_received = packet[proto_l]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)" %
                    (dst_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                ip_saved = saved_packet[IP]
                proto_saved = saved_packet[proto_l]
                # Check standard fields
                self.assertEqual(ip_received.src, ip_saved.src)
                self.assertEqual(ip_received.dst, ip_saved.dst)
                self.assertEqual(proto_received.sport, proto_saved.sport)
                self.assertEqual(proto_received.dport, proto_saved.dport)
            except Exception:
                # Dump the offending packet, then let the failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        # Every packet recorded for this destination must have been consumed.
        for i in self.interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(remaining_packet is None,
                            "Interface %s: Packet expected from interface %s "
                            "didn't arrive" % (dst_if.name, i.name))

    def verify_vrf(self, vrf_id):
        """
        Check if the FIB table / VRF ID is configured.

        :param int vrf_id: The FIB table / VRF ID to be verified.
        :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
        """
        ip_fib_dump = self.vapi.ip_fib_dump()
        # NOTE(review): field 2 of each dump entry is compared against the
        # VRF id; presumably it is the table id - confirm against the API.
        configured = any(details[2] == vrf_id for details in ip_fib_dump)
        if not configured:
            self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
            return 0
        self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
        return 1

    @staticmethod
    def _trim_hex(hexstr):
        """Strip trailing zeros from a hex string, keeping whole bytes.

        A plain ``rstrip('0')`` can leave an odd number of hex digits (for
        example ``'0050'`` becomes ``'005'``), which ``binascii.unhexlify``
        rejects. One zero is put back in that case.

        :param str hexstr: hex digits to trim.
        :return: trimmed hex string with an even number of digits.
        """
        trimmed = hexstr.rstrip('0')
        if len(trimmed) % 2:
            trimmed += '0'
        return trimmed

    @staticmethod
    def build_ip_mask(proto='', src_ip='', dst_ip='',
                      src_port='', dst_port=''):
        """Build IP ACL mask data with hexstring format.

        Each field is right-aligned and zero-padded to a fixed width
        (20/12/8/4/4 hex digits); trailing zero bytes are then dropped.

        :param str proto: protocol number <0-ff>
        :param str src_ip: source ip address <0-ffffffff>
        :param str dst_ip: destination ip address <0-ffffffff>
        :param str src_port: source port number <0-ffff>
        :param str dst_port: destination port number <0-ffff>
        :return: hex string with an even number of digits.
        """
        return TestClassifier._trim_hex(
            '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
                proto, src_ip, dst_ip, src_port, dst_port))

    @staticmethod
    def build_ip_match(proto=0, src_ip='', dst_ip='',
                       src_port=0, dst_port=0):
        """Build IP ACL match data with hexstring format.

        Uses the same field layout as build_ip_mask.

        :param int proto: protocol number with valid option "x"
        :param str src_ip: source ip address with format of "x.x.x.x"
        :param str dst_ip: destination ip address with format of "x.x.x.x"
        :param int src_port: source port number "x"
        :param int dst_port: destination port number "x"
        :return: hex string with an even number of digits.
        """
        # hexlify() returns bytes on Python 3; decode so that '!s' formatting
        # yields plain hex digits instead of "b'...'".
        if src_ip:
            src_ip = binascii.hexlify(
                socket.inet_aton(src_ip)).decode('ascii')
        if dst_ip:
            dst_ip = binascii.hexlify(
                socket.inet_aton(dst_ip)).decode('ascii')

        return TestClassifier._trim_hex(
            '{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
                hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
                hex(dst_port)[2:]))

    @staticmethod
    def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
        """Build MAC ACL mask data with hexstring format.

        :param str dst_mac: destination MAC address <0-ffffffffffff>
        :param str src_mac: source MAC address <0-ffffffffffff>
        :param str ether_type: ethernet type <0-ffff>
        :return: hex string with an even number of digits.
        """
        return TestClassifier._trim_hex(
            '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
                dst_mac, src_mac, ether_type))

    @staticmethod
    def build_mac_match(dst_mac='', src_mac='', ether_type=''):
        """Build MAC ACL match data with hexstring format.

        :param str dst_mac: destination MAC address <x:x:x:x:x:x>
        :param str src_mac: source MAC address <x:x:x:x:x:x>
        :param str ether_type: ethernet type <0-ffff>
        :return: hex string with an even number of digits.
        """
        if dst_mac:
            dst_mac = dst_mac.replace(':', '')
        if src_mac:
            src_mac = src_mac.replace(':', '')

        return TestClassifier._trim_hex(
            '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
                dst_mac, src_mac, ether_type))

    def create_classify_table(self, key, mask, data_offset=0):
        """Create Classify Table

        The new table's index is stored in ``self.acl_tbl_idx[key]``.

        :param str key: key for classify table (ex, ACL name).
        :param str mask: mask value for interested traffic (hex string).
        :param int data_offset: value passed as ``current_data_offset``.
        """
        r = self.vapi.classify_add_del_table(
            is_add=1,
            mask=binascii.unhexlify(mask),
            # 32 hex digits (16 bytes) per match vector, rounded up.
            match_n_vectors=(len(mask) - 1) // 32 + 1,
            miss_next_index=0,
            current_data_flag=1,
            current_data_offset=data_offset)
        self.assertIsNotNone(r, 'No response msg for add_del_table')
        self.acl_tbl_idx[key] = r.new_table_index

    def create_classify_session(self, table_index, match, pbr_option=0,
                                vrfid=0, is_add=1):
        """Create Classify Session

        :param int table_index: table index to identify classify table.
        :param str match: matched value for interested traffic.
        :param int pbr_option: enable/disable PBR feature.
        :param int vrfid: VRF id.
        :param int is_add: option to configure classify session.
            - create(1) or delete(0)
        """
        r = self.vapi.classify_add_del_session(
            is_add,
            table_index,
            binascii.unhexlify(match),
            opaque_index=0,
            action=pbr_option,
            metadata=vrfid)
        self.assertIsNotNone(r, 'No response msg for add_del_session')

    def input_acl_set_interface(self, intf, table_index, is_add=1):
        """Configure Input ACL interface

        :param VppInterface intf: Interface to apply Input ACL feature.
        :param int table_index: table index to identify classify table.
        :param int is_add: option to configure classify session.
            - enable(1) or disable(0)
        """
        r = self.vapi.input_acl_set_interface(
            is_add,
            intf.sw_if_index,
            ip4_table_index=table_index)
        self.assertIsNotNone(r, 'No response msg for acl_set_interface')

    def output_acl_set_interface(self, intf, table_index, is_add=1):
        """Configure Output ACL interface

        :param VppInterface intf: Interface to apply Output ACL feature.
        :param int table_index: table index to identify classify table.
        :param int is_add: option to configure classify session.
            - enable(1) or disable(0)
        """
        r = self.vapi.output_acl_set_interface(
            is_add,
            intf.sw_if_index,
            ip4_table_index=table_index)
        self.assertIsNotNone(r, 'No response msg for acl_set_interface')
319
320
321 # Tests split to different test case classes because of issue reported in
322 # ticket VPP-1336
class TestClassifierIP(TestClassifier):
    """ Classifier IP Test Case """

    def test_iacl_src_ip(self):
        """ Source IP iACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source IP address.
            - Send and verify received packets on pg1 interface.
        """
        table_key = 'ip_src'

        # Queue the pg0 -> pg1 stream.
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify on the full source address of the stream.
        self.create_classify_table(
            table_key, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=self.pg0.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg1 and nowhere else.
        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP iACL test

        Test scenario for basic IP ACL with destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with destination IP address.
            - Send and verify received packets on pg1 interface.
        """
        table_key = 'ip_dst'

        # Queue the pg0 -> pg1 stream.
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify on the full destination address of the stream.
        self.create_classify_table(
            table_key, self.build_ip_mask(dst_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip_match(dst_ip=self.pg1.remote_ip4))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg1 and nowhere else.
        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP iACL test

        Test scenario for basic IP ACL with source and destination IP
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with source and destination IP addresses.
            - Send and verify received packets on pg1 interface.
        """
        table_key = 'ip'

        # Queue the pg0 -> pg1 stream.
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify on both addresses of the stream.
        address_mask = self.build_ip_mask(
            src_ip='ffffffff', dst_ip='ffffffff')
        self.create_classify_table(table_key, address_mask)
        table_index = self.acl_tbl_idx.get(table_key)
        address_match = self.build_ip_match(
            src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4)
        self.create_classify_session(table_index, address_match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg1 and nowhere else.
        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
417
418
class TestClassifierUDP(TestClassifier):
    """ Classifier UDP proto Test Case """

    def _install_iacl_and_verify(self, key, mask, match, stream):
        """Attach an input ACL to pg0, run the stream, check pg1 got it.

        :param str key: name under which the classify table is stored.
        :param str mask: hex mask for the classify table.
        :param str match: hex match value for the classify session.
        :param list stream: packets already queued on pg0.
        """
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg1 and nowhere else.
        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp(self):
        """ UDP protocol iACL test

        Test scenario for basic protocol ACL with UDP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        # Default stream protocol is UDP.
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_udp',
            self.build_ip_mask(proto='ff'),
            self.build_ip_match(proto=socket.IPPROTO_UDP),
            stream)

    def test_iacl_proto_udp_sport(self):
        """ UDP source port iACL test

        Test scenario for basic protocol ACL with UDP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=5678))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_udp_sport',
            self.build_ip_mask(proto='ff', src_port='ffff'),
            self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
            stream)

    def test_iacl_proto_udp_dport(self):
        """ UDP destination port iACL test

        Test scenario for basic protocol ACL with UDP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=1234, dport=dport))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_udp_dport',
            self.build_ip_mask(proto='ff', dst_port='ffff'),
            self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
            stream)

    def test_iacl_proto_udp_sport_dport(self):
        """ UDP source and destination ports iACL test

        Test scenario for basic protocol ACL with UDP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_udp_ports',
            self.build_ip_mask(
                proto='ff', src_port='ffff', dst_port='ffff'),
            self.build_ip_match(
                proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport),
            stream)
557
558
class TestClassifierTCP(TestClassifier):
    """ Classifier TCP proto Test Case """

    def _install_iacl_and_verify(self, key, mask, match, stream):
        """Attach an input ACL to pg0, run the stream, check pg1 got it.

        The capture on pg1 is verified against the TCP layer.

        :param str key: name under which the classify table is stored.
        :param str mask: hex mask for the classify table.
        :param str match: hex match value for the classify session.
        :param list stream: packets already queued on pg0.
        """
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg1 and nowhere else.
        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp(self):
        """ TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            TCP(sport=1234, dport=5678))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_tcp',
            self.build_ip_mask(proto='ff'),
            self.build_ip_match(proto=socket.IPPROTO_TCP),
            stream)

    def test_iacl_proto_tcp_sport(self):
        """ TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            TCP(sport=sport, dport=5678))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_tcp_sport',
            self.build_ip_mask(proto='ff', src_port='ffff'),
            self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
            stream)

    def test_iacl_proto_tcp_dport(self):
        """ TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            TCP(sport=1234, dport=dport))
        self.pg0.add_stream(stream)

        # The table key names the destination-port ACL; the original reused
        # the source-port test's key here.
        self._install_iacl_and_verify(
            'proto_tcp_dport',
            self.build_ip_mask(proto='ff', dst_port='ffff'),
            self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
            stream)

    def test_iacl_proto_tcp_sport_dport(self):
        """ TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv4 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            TCP(sport=sport, dport=dport))
        self.pg0.add_stream(stream)

        self._install_iacl_and_verify(
            'proto_tcp_ports',
            self.build_ip_mask(
                proto='ff', src_port='ffff', dst_port='ffff'),
            self.build_ip_match(
                proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport),
            stream)
697         self.pg3.assert_nothing_captured(remark="packets forwarded")
698
699
class TestClassifierIPOut(TestClassifier):
    """ Classifier output IP Test Case """

    def test_acl_ip_out(self):
        """ Output IP ACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv4 stream for pg1 -> pg0 interface.
            - Create ACL with source IP address.
            - Send and verify received packets on pg0 interface.
        """
        table_key = 'ip_out'

        # Traffic enters on pg1 and leaves through pg0, where the output
        # ACL is attached.
        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(stream)

        # Classify on the full source address of the stream.
        source_mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(table_key, source_mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip_match(src_ip=self.pg1.remote_ip4))
        self.output_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg0 and nowhere else.
        captured = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, captured)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
733
734
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    def test_acl_mac(self):
        """ MAC ACL test

        Test scenario for basic MAC ACL with source MAC
            - Create IPv4 stream for pg0 -> pg2 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg2 interface.
        """
        table_key = 'mac'

        # Queue the pg0 -> pg2 stream.
        stream = self.create_stream(
            self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify on the full source MAC. The table is created with a data
        # offset of -14 (presumably to start matching at the Ethernet
        # header - confirm).
        source_mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(
            table_key, source_mac_mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_mac_match(src_mac=self.pg0.remote_mac))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # All packets must show up on pg2 and nowhere else.
        captured = self.pg2.get_capture(len(stream))
        self.verify_capture(self.pg2, captured)
        for idle_if in (self.pg0, self.pg1, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")
768
769
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    def test_acl_pbr(self):
        """ IP PBR test

        Test scenario for PBR with source IP
            - Create IPv4 stream for pg0 -> pg3 interface.
            - Configure PBR fib entry for packet forwarding.
            - Send and verify received packets on pg3 interface.
        """
        table_key = 'pbr'
        pbr_action = 1

        # Queue the pg0 -> pg3 stream.
        stream = self.create_stream(
            self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        self.create_classify_table(
            table_key, self.build_ip_mask(src_ip='ffffffff'))
        table_index = self.acl_tbl_idx.get(table_key)

        # Adding the session creates the VRF/table that receives the route.
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_action,
            self.pbr_vrfid)
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))
        self.config_pbr_fib_entry(self.pg3)
        self.input_acl_set_interface(self.pg0, table_index)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg3.get_capture(len(stream))
        self.verify_capture(self.pg3, captured)
        # Detach the ACL here; this test does not leave it to tearDown.
        self.input_acl_set_interface(self.pg0, table_index, 0)
        for idle_if in (self.pg0, self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

        # Remove the route and then the classify session.
        self.config_pbr_fib_entry(self.pg3, is_add=0)
        self.create_classify_session(
            table_index,
            self.build_ip_match(src_ip=self.pg0.remote_ip4),
            pbr_action,
            self.pbr_vrfid,
            is_add=0)

        # With both gone the VRF table must no longer be configured.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
817
if __name__ == "__main__":
    # Executed directly: run this module's test cases with the VPP runner.
    unittest.main(testRunner=VppTestRunner)