Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13
14
15 class TestClassifier(VppTestCase):
16     """ Classifier Test Case """
17
18     @classmethod
19     def setUpClass(cls):
20         """
21         Perform standard class setup (defined by class method setUpClass in
22         class VppTestCase) before running the test case, set test case related
23         variables and configure VPP.
24         """
25         super(TestClassifier, cls).setUpClass()
26         cls.acl_active_table = ''
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestClassifier, cls).tearDownClass()
31
32     def setUp(self):
33         """
34         Perform test setup before test case.
35
36         **Config:**
37             - create 4 pg interfaces
38                 - untagged pg0/pg1/pg2 interface
39                     pg0 -------> pg1 (IP ACL)
40                            \
41                             ---> pg2 (MAC ACL))
42                              \
43                               -> pg3 (PBR)
44             - setup interfaces:
45                 - put it into UP state
46                 - set IPv4 addresses
47                 - resolve neighbor address using ARP
48
49         :ivar list interfaces: pg interfaces.
50         :ivar list pg_if_packet_sizes: packet sizes in test.
51         :ivar dict acl_tbl_idx: ACL table index.
52         :ivar int pbr_vrfid: VRF id for PBR test.
53         """
54         self.reset_packet_infos()
55         super(TestClassifier, self).setUp()
56
57         # create 4 pg interfaces
58         self.create_pg_interfaces(range(4))
59
60         # packet sizes to test
61         self.pg_if_packet_sizes = [64, 9018]
62
63         self.interfaces = list(self.pg_interfaces)
64
65         # ACL & PBR vars
66         self.acl_tbl_idx = {}
67         self.pbr_vrfid = 200
68
69         # setup all interfaces
70         for intf in self.interfaces:
71             intf.admin_up()
72             intf.config_ip4()
73             intf.resolve_arp()
74
75     def tearDown(self):
76         """Run standard test teardown and acl related log."""
77         if not self.vpp_dead:
78             self.logger.info(self.vapi.ppcli("show inacl type ip4"))
79             self.logger.info(self.vapi.ppcli("show outacl type ip4"))
80             self.logger.info(self.vapi.cli("show classify table verbose"))
81             self.logger.info(self.vapi.cli("show ip fib"))
82             if self.acl_active_table == 'ip_out':
83                 self.output_acl_set_interface(
84                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85                 self.acl_active_table = ''
86             elif self.acl_active_table != '':
87                 self.input_acl_set_interface(
88                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
89                 self.acl_active_table = ''
90             for intf in self.interfaces:
91                 intf.unconfig_ip4()
92                 intf.admin_down()
93
94         super(TestClassifier, self).tearDown()
95
96     def config_pbr_fib_entry(self, intf, is_add=1):
97         """Configure fib entry to route traffic toward PBR VRF table
98
99         :param VppInterface intf: destination interface to be routed for PBR.
100
101         """
102         addr_len = 24
103         self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
104                                    dst_address_length=addr_len,
105                                    next_hop_address=intf.remote_ip4n,
106                                    table_id=self.pbr_vrfid, is_add=is_add)
107
108     def create_stream(self, src_if, dst_if, packet_sizes,
109                       proto_l=UDP(sport=1234, dport=5678)):
110         """Create input packet stream for defined interfaces.
111
112         :param VppInterface src_if: Source Interface for packet stream.
113         :param VppInterface dst_if: Destination Interface for packet stream.
114         :param list packet_sizes: packet size to test.
115         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
116         """
117         pkts = []
118
119         for size in packet_sizes:
120             info = self.create_packet_info(src_if, dst_if)
121             payload = self.info_to_payload(info)
122             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
123                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
124                  proto_l /
125                  Raw(payload))
126             info.data = p.copy()
127             self.extend_packet(p, size)
128             pkts.append(p)
129         return pkts
130
131     def verify_capture(self, dst_if, capture, proto_l=UDP):
132         """Verify captured input packet stream for defined interface.
133
134         :param VppInterface dst_if: Interface to verify captured packet stream.
135         :param list capture: Captured packet stream.
136         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
137         """
138         self.logger.info("Verifying capture on interface %s" % dst_if.name)
139         last_info = dict()
140         for i in self.interfaces:
141             last_info[i.sw_if_index] = None
142         dst_sw_if_index = dst_if.sw_if_index
143         for packet in capture:
144             try:
145                 ip_received = packet[IP]
146                 proto_received = packet[proto_l]
147                 payload_info = self.payload_to_info(packet[Raw])
148                 packet_index = payload_info.index
149                 self.assertEqual(payload_info.dst, dst_sw_if_index)
150                 self.logger.debug(
151                     "Got packet on port %s: src=%u (id=%u)" %
152                     (dst_if.name, payload_info.src, packet_index))
153                 next_info = self.get_next_packet_info_for_interface2(
154                     payload_info.src, dst_sw_if_index,
155                     last_info[payload_info.src])
156                 last_info[payload_info.src] = next_info
157                 self.assertTrue(next_info is not None)
158                 self.assertEqual(packet_index, next_info.index)
159                 saved_packet = next_info.data
160                 ip_saved = saved_packet[IP]
161                 proto_saved = saved_packet[proto_l]
162                 # Check standard fields
163                 self.assertEqual(ip_received.src, ip_saved.src)
164                 self.assertEqual(ip_received.dst, ip_saved.dst)
165                 self.assertEqual(proto_received.sport, proto_saved.sport)
166                 self.assertEqual(proto_received.dport, proto_saved.dport)
167             except:
168                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
169                 raise
170         for i in self.interfaces:
171             remaining_packet = self.get_next_packet_info_for_interface2(
172                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
173             self.assertTrue(remaining_packet is None,
174                             "Interface %s: Packet expected from interface %s "
175                             "didn't arrive" % (dst_if.name, i.name))
176
177     def verify_vrf(self, vrf_id):
178         """
179         Check if the FIB table / VRF ID is configured.
180
181         :param int vrf_id: The FIB table / VRF ID to be verified.
182         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
183         """
184         ip_fib_dump = self.vapi.ip_fib_dump()
185         vrf_count = 0
186         for ip_fib_details in ip_fib_dump:
187             if ip_fib_details[2] == vrf_id:
188                 vrf_count += 1
189         if vrf_count == 0:
190             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
191             return 0
192         else:
193             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
194             return 1
195
196     @staticmethod
197     def build_ip_mask(proto='', src_ip='', dst_ip='',
198                       src_port='', dst_port=''):
199         """Build IP ACL mask data with hexstring format.
200
201         :param str proto: protocol number <0-ff>
202         :param str src_ip: source ip address <0-ffffffff>
203         :param str dst_ip: destination ip address <0-ffffffff>
204         :param str src_port: source port number <0-ffff>
205         :param str dst_port: destination port number <0-ffff>
206         """
207
208         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
209             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
210
211     @staticmethod
212     def build_ip_match(proto=0, src_ip='', dst_ip='',
213                        src_port=0, dst_port=0):
214         """Build IP ACL match data with hexstring format.
215
216         :param int proto: protocol number with valid option "x"
217         :param str src_ip: source ip address with format of "x.x.x.x"
218         :param str dst_ip: destination ip address with format of "x.x.x.x"
219         :param int src_port: source port number "x"
220         :param int dst_port: destination port number "x"
221         """
222         if src_ip:
223             src_ip = binascii.hexlify(socket.inet_aton(src_ip))
224         if dst_ip:
225             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
226
227         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
228             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
229             hex(dst_port)[2:])).rstrip('0')
230
231     @staticmethod
232     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
233         """Build MAC ACL mask data with hexstring format.
234
235         :param str dst_mac: source MAC address <0-ffffffffffff>
236         :param str src_mac: destination MAC address <0-ffffffffffff>
237         :param str ether_type: ethernet type <0-ffff>
238         """
239
240         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
241             dst_mac, src_mac, ether_type)).rstrip('0')
242
243     @staticmethod
244     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
245         """Build MAC ACL match data with hexstring format.
246
247         :param str dst_mac: source MAC address <x:x:x:x:x:x>
248         :param str src_mac: destination MAC address <x:x:x:x:x:x>
249         :param str ether_type: ethernet type <0-ffff>
250         """
251         if dst_mac:
252             dst_mac = dst_mac.replace(':', '')
253         if src_mac:
254             src_mac = src_mac.replace(':', '')
255
256         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
257             dst_mac, src_mac, ether_type)).rstrip('0')
258
259     def create_classify_table(self, key, mask, data_offset=0):
260         """Create Classify Table
261
262         :param str key: key for classify table (ex, ACL name).
263         :param str mask: mask value for interested traffic.
264         :param int data_offset:
265         """
266         r = self.vapi.classify_add_del_table(
267             is_add=1,
268             mask=binascii.unhexlify(mask),
269             match_n_vectors=(len(mask) - 1) // 32 + 1,
270             miss_next_index=0,
271             current_data_flag=1,
272             current_data_offset=data_offset)
273         self.assertIsNotNone(r, 'No response msg for add_del_table')
274         self.acl_tbl_idx[key] = r.new_table_index
275
276     def create_classify_session(self, table_index, match, pbr_option=0,
277                                 vrfid=0, is_add=1):
278         """Create Classify Session
279
280         :param int table_index: table index to identify classify table.
281         :param str match: matched value for interested traffic.
282         :param int pbr_option: enable/disable PBR feature.
283         :param int vrfid: VRF id.
284         :param int is_add: option to configure classify session.
285             - create(1) or delete(0)
286         """
287         r = self.vapi.classify_add_del_session(
288             is_add,
289             table_index,
290             binascii.unhexlify(match),
291             opaque_index=0,
292             action=pbr_option,
293             metadata=vrfid)
294         self.assertIsNotNone(r, 'No response msg for add_del_session')
295
296     def input_acl_set_interface(self, intf, table_index, is_add=1):
297         """Configure Input ACL interface
298
299         :param VppInterface intf: Interface to apply Input ACL feature.
300         :param int table_index: table index to identify classify table.
301         :param int is_add: option to configure classify session.
302             - enable(1) or disable(0)
303         """
304         r = self.vapi.input_acl_set_interface(
305             is_add,
306             intf.sw_if_index,
307             ip4_table_index=table_index)
308         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
309
310     def output_acl_set_interface(self, intf, table_index, is_add=1):
311         """Configure Output ACL interface
312
313         :param VppInterface intf: Interface to apply Output ACL feature.
314         :param int table_index: table index to identify classify table.
315         :param int is_add: option to configure classify session.
316             - enable(1) or disable(0)
317         """
318         r = self.vapi.output_acl_set_interface(
319             is_add,
320             intf.sw_if_index,
321             ip4_table_index=table_index)
322         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
323
324
325 # Tests split to different test case classes because of issue reported in
326 # ticket VPP-1336
327 class TestClassifierIP(TestClassifier):
328     """ Classifier IP Test Case """
329
330     @classmethod
331     def setUpClass(cls):
332         super(TestClassifierIP, cls).setUpClass()
333
334     @classmethod
335     def tearDownClass(cls):
336         super(TestClassifierIP, cls).tearDownClass()
337
338     def test_iacl_src_ip(self):
339         """ Source IP iACL test
340
341         Test scenario for basic IP ACL with source IP
342             - Create IPv4 stream for pg0 -> pg1 interface.
343             - Create iACL with source IP address.
344             - Send and verify received packets on pg1 interface.
345         """
346
347         # Basic iACL testing with source IP
348         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
349         self.pg0.add_stream(pkts)
350
351         key = 'ip_src'
352         self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
353         self.create_classify_session(
354             self.acl_tbl_idx.get(key),
355             self.build_ip_match(src_ip=self.pg0.remote_ip4))
356         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
357         self.acl_active_table = key
358
359         self.pg_enable_capture(self.pg_interfaces)
360         self.pg_start()
361
362         pkts = self.pg1.get_capture(len(pkts))
363         self.verify_capture(self.pg1, pkts)
364         self.pg0.assert_nothing_captured(remark="packets forwarded")
365         self.pg2.assert_nothing_captured(remark="packets forwarded")
366         self.pg3.assert_nothing_captured(remark="packets forwarded")
367
368     def test_iacl_dst_ip(self):
369         """ Destination IP iACL test
370
371         Test scenario for basic IP ACL with destination IP
372             - Create IPv4 stream for pg0 -> pg1 interface.
373             - Create iACL with destination IP address.
374             - Send and verify received packets on pg1 interface.
375         """
376
377         # Basic iACL testing with destination IP
378         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
379         self.pg0.add_stream(pkts)
380
381         key = 'ip_dst'
382         self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
383         self.create_classify_session(
384             self.acl_tbl_idx.get(key),
385             self.build_ip_match(dst_ip=self.pg1.remote_ip4))
386         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
387         self.acl_active_table = key
388
389         self.pg_enable_capture(self.pg_interfaces)
390         self.pg_start()
391
392         pkts = self.pg1.get_capture(len(pkts))
393         self.verify_capture(self.pg1, pkts)
394         self.pg0.assert_nothing_captured(remark="packets forwarded")
395         self.pg2.assert_nothing_captured(remark="packets forwarded")
396         self.pg3.assert_nothing_captured(remark="packets forwarded")
397
398     def test_iacl_src_dst_ip(self):
399         """ Source and destination IP iACL test
400
401         Test scenario for basic IP ACL with source and destination IP
402             - Create IPv4 stream for pg0 -> pg1 interface.
403             - Create iACL with source and destination IP addresses.
404             - Send and verify received packets on pg1 interface.
405         """
406
407         # Basic iACL testing with source and destination IP
408         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
409         self.pg0.add_stream(pkts)
410
411         key = 'ip'
412         self.create_classify_table(
413             key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
414         self.create_classify_session(
415             self.acl_tbl_idx.get(key),
416             self.build_ip_match(src_ip=self.pg0.remote_ip4,
417                                 dst_ip=self.pg1.remote_ip4))
418         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
419         self.acl_active_table = key
420
421         self.pg_enable_capture(self.pg_interfaces)
422         self.pg_start()
423
424         pkts = self.pg1.get_capture(len(pkts))
425         self.verify_capture(self.pg1, pkts)
426         self.pg0.assert_nothing_captured(remark="packets forwarded")
427         self.pg2.assert_nothing_captured(remark="packets forwarded")
428         self.pg3.assert_nothing_captured(remark="packets forwarded")
429
430
431 class TestClassifierUDP(TestClassifier):
432     """ Classifier UDP proto Test Case """
433
434     @classmethod
435     def setUpClass(cls):
436         super(TestClassifierUDP, cls).setUpClass()
437
438     @classmethod
439     def tearDownClass(cls):
440         super(TestClassifierUDP, cls).tearDownClass()
441
442     def test_iacl_proto_udp(self):
443         """ UDP protocol iACL test
444
445         Test scenario for basic protocol ACL with UDP protocol
446             - Create IPv4 stream for pg0 -> pg1 interface.
447             - Create iACL with UDP IP protocol.
448             - Send and verify received packets on pg1 interface.
449         """
450
451         # Basic iACL testing with UDP protocol
452         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
453         self.pg0.add_stream(pkts)
454
455         key = 'proto_udp'
456         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
457         self.create_classify_session(
458             self.acl_tbl_idx.get(key),
459             self.build_ip_match(proto=socket.IPPROTO_UDP))
460         self.input_acl_set_interface(
461             self.pg0, self.acl_tbl_idx.get(key))
462         self.acl_active_table = key
463
464         self.pg_enable_capture(self.pg_interfaces)
465         self.pg_start()
466
467         pkts = self.pg1.get_capture(len(pkts))
468         self.verify_capture(self.pg1, pkts)
469         self.pg0.assert_nothing_captured(remark="packets forwarded")
470         self.pg2.assert_nothing_captured(remark="packets forwarded")
471         self.pg3.assert_nothing_captured(remark="packets forwarded")
472
473     def test_iacl_proto_udp_sport(self):
474         """ UDP source port iACL test
475
476         Test scenario for basic protocol ACL with UDP and sport
477             - Create IPv4 stream for pg0 -> pg1 interface.
478             - Create iACL with UDP IP protocol and defined sport.
479             - Send and verify received packets on pg1 interface.
480         """
481
482         # Basic iACL testing with UDP and sport
483         sport = 38
484         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
485                                   UDP(sport=sport, dport=5678))
486         self.pg0.add_stream(pkts)
487
488         key = 'proto_udp_sport'
489         self.create_classify_table(
490             key, self.build_ip_mask(proto='ff', src_port='ffff'))
491         self.create_classify_session(
492             self.acl_tbl_idx.get(key),
493             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
494         self.input_acl_set_interface(
495             self.pg0, self.acl_tbl_idx.get(key))
496         self.acl_active_table = key
497
498         self.pg_enable_capture(self.pg_interfaces)
499         self.pg_start()
500
501         pkts = self.pg1.get_capture(len(pkts))
502         self.verify_capture(self.pg1, pkts)
503         self.pg0.assert_nothing_captured(remark="packets forwarded")
504         self.pg2.assert_nothing_captured(remark="packets forwarded")
505         self.pg3.assert_nothing_captured(remark="packets forwarded")
506
507     def test_iacl_proto_udp_dport(self):
508         """ UDP destination port iACL test
509
510         Test scenario for basic protocol ACL with UDP and dport
511             - Create IPv4 stream for pg0 -> pg1 interface.
512             - Create iACL with UDP IP protocol and defined dport.
513             - Send and verify received packets on pg1 interface.
514         """
515
516         # Basic iACL testing with UDP and dport
517         dport = 427
518         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
519                                   UDP(sport=1234, dport=dport))
520         self.pg0.add_stream(pkts)
521
522         key = 'proto_udp_dport'
523         self.create_classify_table(
524             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
525         self.create_classify_session(
526             self.acl_tbl_idx.get(key),
527             self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
528         self.input_acl_set_interface(
529             self.pg0, self.acl_tbl_idx.get(key))
530         self.acl_active_table = key
531
532         self.pg_enable_capture(self.pg_interfaces)
533         self.pg_start()
534
535         pkts = self.pg1.get_capture(len(pkts))
536         self.verify_capture(self.pg1, pkts)
537         self.pg0.assert_nothing_captured(remark="packets forwarded")
538         self.pg2.assert_nothing_captured(remark="packets forwarded")
539         self.pg3.assert_nothing_captured(remark="packets forwarded")
540
541     def test_iacl_proto_udp_sport_dport(self):
542         """ UDP source and destination ports iACL test
543
544         Test scenario for basic protocol ACL with UDP and sport and dport
545             - Create IPv4 stream for pg0 -> pg1 interface.
546             - Create iACL with UDP IP protocol and defined sport and dport.
547             - Send and verify received packets on pg1 interface.
548         """
549
550         # Basic iACL testing with UDP and sport and dport
551         sport = 13720
552         dport = 9080
553         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
554                                   UDP(sport=sport, dport=dport))
555         self.pg0.add_stream(pkts)
556
557         key = 'proto_udp_ports'
558         self.create_classify_table(
559             key,
560             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
561         self.create_classify_session(
562             self.acl_tbl_idx.get(key),
563             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
564                                 dst_port=dport))
565         self.input_acl_set_interface(
566             self.pg0, self.acl_tbl_idx.get(key))
567         self.acl_active_table = key
568
569         self.pg_enable_capture(self.pg_interfaces)
570         self.pg_start()
571
572         pkts = self.pg1.get_capture(len(pkts))
573         self.verify_capture(self.pg1, pkts)
574         self.pg0.assert_nothing_captured(remark="packets forwarded")
575         self.pg2.assert_nothing_captured(remark="packets forwarded")
576         self.pg3.assert_nothing_captured(remark="packets forwarded")
577
578
579 class TestClassifierTCP(TestClassifier):
580     """ Classifier TCP proto Test Case """
581
582     @classmethod
583     def setUpClass(cls):
584         super(TestClassifierTCP, cls).setUpClass()
585
586     @classmethod
587     def tearDownClass(cls):
588         super(TestClassifierTCP, cls).tearDownClass()
589
590     def test_iacl_proto_tcp(self):
591         """ TCP protocol iACL test
592
593         Test scenario for basic protocol ACL with TCP protocol
594             - Create IPv4 stream for pg0 -> pg1 interface.
595             - Create iACL with TCP IP protocol.
596             - Send and verify received packets on pg1 interface.
597         """
598
599         # Basic iACL testing with TCP protocol
600         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
601                                   TCP(sport=1234, dport=5678))
602         self.pg0.add_stream(pkts)
603
604         key = 'proto_tcp'
605         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
606         self.create_classify_session(
607             self.acl_tbl_idx.get(key),
608             self.build_ip_match(proto=socket.IPPROTO_TCP))
609         self.input_acl_set_interface(
610             self.pg0, self.acl_tbl_idx.get(key))
611         self.acl_active_table = key
612
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615
616         pkts = self.pg1.get_capture(len(pkts))
617         self.verify_capture(self.pg1, pkts, TCP)
618         self.pg0.assert_nothing_captured(remark="packets forwarded")
619         self.pg2.assert_nothing_captured(remark="packets forwarded")
620         self.pg3.assert_nothing_captured(remark="packets forwarded")
621
622     def test_iacl_proto_tcp_sport(self):
623         """ TCP source port iACL test
624
625         Test scenario for basic protocol ACL with TCP and sport
626             - Create IPv4 stream for pg0 -> pg1 interface.
627             - Create iACL with TCP IP protocol and defined sport.
628             - Send and verify received packets on pg1 interface.
629         """
630
631         # Basic iACL testing with TCP and sport
632         sport = 38
633         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
634                                   TCP(sport=sport, dport=5678))
635         self.pg0.add_stream(pkts)
636
637         key = 'proto_tcp_sport'
638         self.create_classify_table(
639             key, self.build_ip_mask(proto='ff', src_port='ffff'))
640         self.create_classify_session(
641             self.acl_tbl_idx.get(key),
642             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
643         self.input_acl_set_interface(
644             self.pg0, self.acl_tbl_idx.get(key))
645         self.acl_active_table = key
646
647         self.pg_enable_capture(self.pg_interfaces)
648         self.pg_start()
649
650         pkts = self.pg1.get_capture(len(pkts))
651         self.verify_capture(self.pg1, pkts, TCP)
652         self.pg0.assert_nothing_captured(remark="packets forwarded")
653         self.pg2.assert_nothing_captured(remark="packets forwarded")
654         self.pg3.assert_nothing_captured(remark="packets forwarded")
655
656     def test_iacl_proto_tcp_dport(self):
657         """ TCP destination port iACL test
658
659         Test scenario for basic protocol ACL with TCP and dport
660             - Create IPv4 stream for pg0 -> pg1 interface.
661             - Create iACL with TCP IP protocol and defined dport.
662             - Send and verify received packets on pg1 interface.
663         """
664
665         # Basic iACL testing with TCP and dport
666         dport = 427
667         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
668                                   TCP(sport=1234, dport=dport))
669         self.pg0.add_stream(pkts)
670
671         key = 'proto_tcp_sport'
672         self.create_classify_table(
673             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
674         self.create_classify_session(
675             self.acl_tbl_idx.get(key),
676             self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
677         self.input_acl_set_interface(
678             self.pg0, self.acl_tbl_idx.get(key))
679         self.acl_active_table = key
680
681         self.pg_enable_capture(self.pg_interfaces)
682         self.pg_start()
683
684         pkts = self.pg1.get_capture(len(pkts))
685         self.verify_capture(self.pg1, pkts, TCP)
686         self.pg0.assert_nothing_captured(remark="packets forwarded")
687         self.pg2.assert_nothing_captured(remark="packets forwarded")
688         self.pg3.assert_nothing_captured(remark="packets forwarded")
689
690     def test_iacl_proto_tcp_sport_dport(self):
691         """ TCP source and destination ports iACL test
692
693         Test scenario for basic protocol ACL with TCP and sport and dport
694             - Create IPv4 stream for pg0 -> pg1 interface.
695             - Create iACL with TCP IP protocol and defined sport and dport.
696             - Send and verify received packets on pg1 interface.
697         """
698
699         # Basic iACL testing with TCP and sport and dport
700         sport = 13720
701         dport = 9080
702         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
703                                   TCP(sport=sport, dport=dport))
704         self.pg0.add_stream(pkts)
705
706         key = 'proto_tcp_ports'
707         self.create_classify_table(
708             key,
709             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
710         self.create_classify_session(
711             self.acl_tbl_idx.get(key),
712             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
713                                 dst_port=dport))
714         self.input_acl_set_interface(
715             self.pg0, self.acl_tbl_idx.get(key))
716         self.acl_active_table = key
717
718         self.pg_enable_capture(self.pg_interfaces)
719         self.pg_start()
720
721         pkts = self.pg1.get_capture(len(pkts))
722         self.verify_capture(self.pg1, pkts, TCP)
723         self.pg0.assert_nothing_captured(remark="packets forwarded")
724         self.pg2.assert_nothing_captured(remark="packets forwarded")
725         self.pg3.assert_nothing_captured(remark="packets forwarded")
726
727
728 class TestClassifierIPOut(TestClassifier):
729     """ Classifier output IP Test Case """
730
731     @classmethod
732     def setUpClass(cls):
733         super(TestClassifierIPOut, cls).setUpClass()
734
735     @classmethod
736     def tearDownClass(cls):
737         super(TestClassifierIPOut, cls).tearDownClass()
738
739     def test_acl_ip_out(self):
740         """ Output IP ACL test
741
742         Test scenario for basic IP ACL with source IP
743             - Create IPv4 stream for pg1 -> pg0 interface.
744             - Create ACL with source IP address.
745             - Send and verify received packets on pg0 interface.
746         """
747
748         # Basic oACL testing with source IP
749         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
750         self.pg1.add_stream(pkts)
751
752         key = 'ip_out'
753         self.create_classify_table(
754             key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
755         self.create_classify_session(
756             self.acl_tbl_idx.get(key),
757             self.build_ip_match(src_ip=self.pg1.remote_ip4))
758         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
759         self.acl_active_table = key
760
761         self.pg_enable_capture(self.pg_interfaces)
762         self.pg_start()
763
764         pkts = self.pg0.get_capture(len(pkts))
765         self.verify_capture(self.pg0, pkts)
766         self.pg1.assert_nothing_captured(remark="packets forwarded")
767         self.pg2.assert_nothing_captured(remark="packets forwarded")
768         self.pg3.assert_nothing_captured(remark="packets forwarded")
769
770
771 class TestClassifierMAC(TestClassifier):
772     """ Classifier MAC Test Case """
773
774     @classmethod
775     def setUpClass(cls):
776         super(TestClassifierMAC, cls).setUpClass()
777
778     @classmethod
779     def tearDownClass(cls):
780         super(TestClassifierMAC, cls).tearDownClass()
781
782     def test_acl_mac(self):
783         """ MAC ACL test
784
785         Test scenario for basic MAC ACL with source MAC
786             - Create IPv4 stream for pg0 -> pg2 interface.
787             - Create ACL with source MAC address.
788             - Send and verify received packets on pg2 interface.
789         """
790
791         # Basic iACL testing with source MAC
792         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
793         self.pg0.add_stream(pkts)
794
795         key = 'mac'
796         self.create_classify_table(
797             key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
798         self.create_classify_session(
799             self.acl_tbl_idx.get(key),
800             self.build_mac_match(src_mac=self.pg0.remote_mac))
801         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
802         self.acl_active_table = key
803
804         self.pg_enable_capture(self.pg_interfaces)
805         self.pg_start()
806
807         pkts = self.pg2.get_capture(len(pkts))
808         self.verify_capture(self.pg2, pkts)
809         self.pg0.assert_nothing_captured(remark="packets forwarded")
810         self.pg1.assert_nothing_captured(remark="packets forwarded")
811         self.pg3.assert_nothing_captured(remark="packets forwarded")
812
813
814 class TestClassifierPBR(TestClassifier):
815     """ Classifier PBR Test Case """
816
817     @classmethod
818     def setUpClass(cls):
819         super(TestClassifierPBR, cls).setUpClass()
820
821     @classmethod
822     def tearDownClass(cls):
823         super(TestClassifierPBR, cls).tearDownClass()
824
825     def test_acl_pbr(self):
826         """ IP PBR test
827
828         Test scenario for PBR with source IP
829             - Create IPv4 stream for pg0 -> pg3 interface.
830             - Configure PBR fib entry for packet forwarding.
831             - Send and verify received packets on pg3 interface.
832         """
833
834         # PBR testing with source IP
835         pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
836         self.pg0.add_stream(pkts)
837
838         key = 'pbr'
839         self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
840         pbr_option = 1
841         # this will create the VRF/table in which we will insert the route
842         self.create_classify_session(
843             self.acl_tbl_idx.get(key),
844             self.build_ip_match(src_ip=self.pg0.remote_ip4),
845             pbr_option, self.pbr_vrfid)
846         self.assertTrue(self.verify_vrf(self.pbr_vrfid))
847         self.config_pbr_fib_entry(self.pg3)
848         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
849
850         self.pg_enable_capture(self.pg_interfaces)
851         self.pg_start()
852
853         pkts = self.pg3.get_capture(len(pkts))
854         self.verify_capture(self.pg3, pkts)
855         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
856         self.pg0.assert_nothing_captured(remark="packets forwarded")
857         self.pg1.assert_nothing_captured(remark="packets forwarded")
858         self.pg2.assert_nothing_captured(remark="packets forwarded")
859
860         # remove the classify session and the route
861         self.config_pbr_fib_entry(self.pg3, is_add=0)
862         self.create_classify_session(
863             self.acl_tbl_idx.get(key),
864             self.build_ip_match(src_ip=self.pg0.remote_ip4),
865             pbr_option, self.pbr_vrfid, is_add=0)
866
867         # and the table should be gone.
868         self.assertFalse(self.verify_vrf(self.pbr_vrfid))
869
870 if __name__ == '__main__':
871     unittest.main(testRunner=VppTestRunner)