Tests: Refactor tearDown show command logging, add lifecycle markers.
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python
2
3 import binascii
4 import socket
5 import unittest
6
7 from framework import VppTestCase, VppTestRunner
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP
12 from util import ppp
13
14
15 class TestClassifier(VppTestCase):
16     """ Classifier Test Case """
17
    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        Also initialises the class attribute ``acl_active_table`` to the
        empty string; tearDown treats the empty string as "no classify table
        is attached to pg0".
        """
        super(TestClassifier, cls).setUpClass()
        cls.acl_active_table = ''
27
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestClassifier, cls).tearDownClass()
31
32     def setUp(self):
33         """
34         Perform test setup before test case.
35
36         **Config:**
37             - create 4 pg interfaces
38                 - untagged pg0/pg1/pg2 interface
39                     pg0 -------> pg1 (IP ACL)
40                            \
41                             ---> pg2 (MAC ACL))
42                              \
43                               -> pg3 (PBR)
44             - setup interfaces:
45                 - put it into UP state
46                 - set IPv4 addresses
47                 - resolve neighbor address using ARP
48
49         :ivar list interfaces: pg interfaces.
50         :ivar list pg_if_packet_sizes: packet sizes in test.
51         :ivar dict acl_tbl_idx: ACL table index.
52         :ivar int pbr_vrfid: VRF id for PBR test.
53         """
54         self.reset_packet_infos()
55         super(TestClassifier, self).setUp()
56
57         # create 4 pg interfaces
58         self.create_pg_interfaces(range(4))
59
60         # packet sizes to test
61         self.pg_if_packet_sizes = [64, 9018]
62
63         self.interfaces = list(self.pg_interfaces)
64
65         # ACL & PBR vars
66         self.acl_tbl_idx = {}
67         self.pbr_vrfid = 200
68
69         # setup all interfaces
70         for intf in self.interfaces:
71             intf.admin_up()
72             intf.config_ip4()
73             intf.resolve_arp()
74
75     def tearDown(self):
76         """Run standard test teardown and acl related log."""
77         if not self.vpp_dead:
78             if self.acl_active_table == 'ip_out':
79                 self.output_acl_set_interface(
80                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
81                 self.acl_active_table = ''
82             elif self.acl_active_table != '':
83                 self.input_acl_set_interface(
84                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
85                 self.acl_active_table = ''
86             for intf in self.interfaces:
87                 intf.unconfig_ip4()
88                 intf.admin_down()
89
90         super(TestClassifier, self).tearDown()
91
92     def show_commands_at_teardown(self):
93         self.logger.info(self.vapi.ppcli("show inacl type ip4"))
94         self.logger.info(self.vapi.ppcli("show outacl type ip4"))
95         self.logger.info(self.vapi.cli("show classify table verbose"))
96         self.logger.info(self.vapi.cli("show ip fib"))
97
98     def config_pbr_fib_entry(self, intf, is_add=1):
99         """Configure fib entry to route traffic toward PBR VRF table
100
101         :param VppInterface intf: destination interface to be routed for PBR.
102
103         """
104         addr_len = 24
105         self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
106                                    dst_address_length=addr_len,
107                                    next_hop_address=intf.remote_ip4n,
108                                    table_id=self.pbr_vrfid, is_add=is_add)
109
110     def create_stream(self, src_if, dst_if, packet_sizes,
111                       proto_l=UDP(sport=1234, dport=5678)):
112         """Create input packet stream for defined interfaces.
113
114         :param VppInterface src_if: Source Interface for packet stream.
115         :param VppInterface dst_if: Destination Interface for packet stream.
116         :param list packet_sizes: packet size to test.
117         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
118         """
119         pkts = []
120
121         for size in packet_sizes:
122             info = self.create_packet_info(src_if, dst_if)
123             payload = self.info_to_payload(info)
124             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
125                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
126                  proto_l /
127                  Raw(payload))
128             info.data = p.copy()
129             self.extend_packet(p, size)
130             pkts.append(p)
131         return pkts
132
133     def verify_capture(self, dst_if, capture, proto_l=UDP):
134         """Verify captured input packet stream for defined interface.
135
136         :param VppInterface dst_if: Interface to verify captured packet stream.
137         :param list capture: Captured packet stream.
138         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
139         """
140         self.logger.info("Verifying capture on interface %s" % dst_if.name)
141         last_info = dict()
142         for i in self.interfaces:
143             last_info[i.sw_if_index] = None
144         dst_sw_if_index = dst_if.sw_if_index
145         for packet in capture:
146             try:
147                 ip_received = packet[IP]
148                 proto_received = packet[proto_l]
149                 payload_info = self.payload_to_info(packet[Raw])
150                 packet_index = payload_info.index
151                 self.assertEqual(payload_info.dst, dst_sw_if_index)
152                 self.logger.debug(
153                     "Got packet on port %s: src=%u (id=%u)" %
154                     (dst_if.name, payload_info.src, packet_index))
155                 next_info = self.get_next_packet_info_for_interface2(
156                     payload_info.src, dst_sw_if_index,
157                     last_info[payload_info.src])
158                 last_info[payload_info.src] = next_info
159                 self.assertTrue(next_info is not None)
160                 self.assertEqual(packet_index, next_info.index)
161                 saved_packet = next_info.data
162                 ip_saved = saved_packet[IP]
163                 proto_saved = saved_packet[proto_l]
164                 # Check standard fields
165                 self.assertEqual(ip_received.src, ip_saved.src)
166                 self.assertEqual(ip_received.dst, ip_saved.dst)
167                 self.assertEqual(proto_received.sport, proto_saved.sport)
168                 self.assertEqual(proto_received.dport, proto_saved.dport)
169             except:
170                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
171                 raise
172         for i in self.interfaces:
173             remaining_packet = self.get_next_packet_info_for_interface2(
174                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
175             self.assertTrue(remaining_packet is None,
176                             "Interface %s: Packet expected from interface %s "
177                             "didn't arrive" % (dst_if.name, i.name))
178
179     def verify_vrf(self, vrf_id):
180         """
181         Check if the FIB table / VRF ID is configured.
182
183         :param int vrf_id: The FIB table / VRF ID to be verified.
184         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
185         """
186         ip_fib_dump = self.vapi.ip_fib_dump()
187         vrf_count = 0
188         for ip_fib_details in ip_fib_dump:
189             if ip_fib_details[2] == vrf_id:
190                 vrf_count += 1
191         if vrf_count == 0:
192             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
193             return 0
194         else:
195             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
196             return 1
197
198     @staticmethod
199     def build_ip_mask(proto='', src_ip='', dst_ip='',
200                       src_port='', dst_port=''):
201         """Build IP ACL mask data with hexstring format.
202
203         :param str proto: protocol number <0-ff>
204         :param str src_ip: source ip address <0-ffffffff>
205         :param str dst_ip: destination ip address <0-ffffffff>
206         :param str src_port: source port number <0-ffff>
207         :param str dst_port: destination port number <0-ffff>
208         """
209
210         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
211             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
212
213     @staticmethod
214     def build_ip_match(proto=0, src_ip='', dst_ip='',
215                        src_port=0, dst_port=0):
216         """Build IP ACL match data with hexstring format.
217
218         :param int proto: protocol number with valid option "x"
219         :param str src_ip: source ip address with format of "x.x.x.x"
220         :param str dst_ip: destination ip address with format of "x.x.x.x"
221         :param int src_port: source port number "x"
222         :param int dst_port: destination port number "x"
223         """
224         if src_ip:
225             src_ip = binascii.hexlify(socket.inet_aton(src_ip))
226         if dst_ip:
227             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
228
229         return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
230             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
231             hex(dst_port)[2:])).rstrip('0')
232
233     @staticmethod
234     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
235         """Build MAC ACL mask data with hexstring format.
236
237         :param str dst_mac: source MAC address <0-ffffffffffff>
238         :param str src_mac: destination MAC address <0-ffffffffffff>
239         :param str ether_type: ethernet type <0-ffff>
240         """
241
242         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
243             dst_mac, src_mac, ether_type)).rstrip('0')
244
245     @staticmethod
246     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
247         """Build MAC ACL match data with hexstring format.
248
249         :param str dst_mac: source MAC address <x:x:x:x:x:x>
250         :param str src_mac: destination MAC address <x:x:x:x:x:x>
251         :param str ether_type: ethernet type <0-ffff>
252         """
253         if dst_mac:
254             dst_mac = dst_mac.replace(':', '')
255         if src_mac:
256             src_mac = src_mac.replace(':', '')
257
258         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
259             dst_mac, src_mac, ether_type)).rstrip('0')
260
261     def create_classify_table(self, key, mask, data_offset=0):
262         """Create Classify Table
263
264         :param str key: key for classify table (ex, ACL name).
265         :param str mask: mask value for interested traffic.
266         :param int data_offset:
267         """
268         r = self.vapi.classify_add_del_table(
269             is_add=1,
270             mask=binascii.unhexlify(mask),
271             match_n_vectors=(len(mask) - 1) // 32 + 1,
272             miss_next_index=0,
273             current_data_flag=1,
274             current_data_offset=data_offset)
275         self.assertIsNotNone(r, 'No response msg for add_del_table')
276         self.acl_tbl_idx[key] = r.new_table_index
277
278     def create_classify_session(self, table_index, match, pbr_option=0,
279                                 vrfid=0, is_add=1):
280         """Create Classify Session
281
282         :param int table_index: table index to identify classify table.
283         :param str match: matched value for interested traffic.
284         :param int pbr_option: enable/disable PBR feature.
285         :param int vrfid: VRF id.
286         :param int is_add: option to configure classify session.
287             - create(1) or delete(0)
288         """
289         r = self.vapi.classify_add_del_session(
290             is_add,
291             table_index,
292             binascii.unhexlify(match),
293             opaque_index=0,
294             action=pbr_option,
295             metadata=vrfid)
296         self.assertIsNotNone(r, 'No response msg for add_del_session')
297
298     def input_acl_set_interface(self, intf, table_index, is_add=1):
299         """Configure Input ACL interface
300
301         :param VppInterface intf: Interface to apply Input ACL feature.
302         :param int table_index: table index to identify classify table.
303         :param int is_add: option to configure classify session.
304             - enable(1) or disable(0)
305         """
306         r = self.vapi.input_acl_set_interface(
307             is_add,
308             intf.sw_if_index,
309             ip4_table_index=table_index)
310         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
311
312     def output_acl_set_interface(self, intf, table_index, is_add=1):
313         """Configure Output ACL interface
314
315         :param VppInterface intf: Interface to apply Output ACL feature.
316         :param int table_index: table index to identify classify table.
317         :param int is_add: option to configure classify session.
318             - enable(1) or disable(0)
319         """
320         r = self.vapi.output_acl_set_interface(
321             is_add,
322             intf.sw_if_index,
323             ip4_table_index=table_index)
324         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
325
326
327 # Tests split to different test case classes because of issue reported in
328 # ticket VPP-1336
329 class TestClassifierIP(TestClassifier):
330     """ Classifier IP Test Case """
331
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to TestClassifier."""
        super(TestClassifierIP, cls).setUpClass()
335
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to TestClassifier."""
        super(TestClassifierIP, cls).tearDownClass()
339
340     def test_iacl_src_ip(self):
341         """ Source IP iACL test
342
343         Test scenario for basic IP ACL with source IP
344             - Create IPv4 stream for pg0 -> pg1 interface.
345             - Create iACL with source IP address.
346             - Send and verify received packets on pg1 interface.
347         """
348
349         # Basic iACL testing with source IP
350         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
351         self.pg0.add_stream(pkts)
352
353         key = 'ip_src'
354         self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
355         self.create_classify_session(
356             self.acl_tbl_idx.get(key),
357             self.build_ip_match(src_ip=self.pg0.remote_ip4))
358         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
359         self.acl_active_table = key
360
361         self.pg_enable_capture(self.pg_interfaces)
362         self.pg_start()
363
364         pkts = self.pg1.get_capture(len(pkts))
365         self.verify_capture(self.pg1, pkts)
366         self.pg0.assert_nothing_captured(remark="packets forwarded")
367         self.pg2.assert_nothing_captured(remark="packets forwarded")
368         self.pg3.assert_nothing_captured(remark="packets forwarded")
369
370     def test_iacl_dst_ip(self):
371         """ Destination IP iACL test
372
373         Test scenario for basic IP ACL with destination IP
374             - Create IPv4 stream for pg0 -> pg1 interface.
375             - Create iACL with destination IP address.
376             - Send and verify received packets on pg1 interface.
377         """
378
379         # Basic iACL testing with destination IP
380         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
381         self.pg0.add_stream(pkts)
382
383         key = 'ip_dst'
384         self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
385         self.create_classify_session(
386             self.acl_tbl_idx.get(key),
387             self.build_ip_match(dst_ip=self.pg1.remote_ip4))
388         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
389         self.acl_active_table = key
390
391         self.pg_enable_capture(self.pg_interfaces)
392         self.pg_start()
393
394         pkts = self.pg1.get_capture(len(pkts))
395         self.verify_capture(self.pg1, pkts)
396         self.pg0.assert_nothing_captured(remark="packets forwarded")
397         self.pg2.assert_nothing_captured(remark="packets forwarded")
398         self.pg3.assert_nothing_captured(remark="packets forwarded")
399
400     def test_iacl_src_dst_ip(self):
401         """ Source and destination IP iACL test
402
403         Test scenario for basic IP ACL with source and destination IP
404             - Create IPv4 stream for pg0 -> pg1 interface.
405             - Create iACL with source and destination IP addresses.
406             - Send and verify received packets on pg1 interface.
407         """
408
409         # Basic iACL testing with source and destination IP
410         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
411         self.pg0.add_stream(pkts)
412
413         key = 'ip'
414         self.create_classify_table(
415             key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
416         self.create_classify_session(
417             self.acl_tbl_idx.get(key),
418             self.build_ip_match(src_ip=self.pg0.remote_ip4,
419                                 dst_ip=self.pg1.remote_ip4))
420         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
421         self.acl_active_table = key
422
423         self.pg_enable_capture(self.pg_interfaces)
424         self.pg_start()
425
426         pkts = self.pg1.get_capture(len(pkts))
427         self.verify_capture(self.pg1, pkts)
428         self.pg0.assert_nothing_captured(remark="packets forwarded")
429         self.pg2.assert_nothing_captured(remark="packets forwarded")
430         self.pg3.assert_nothing_captured(remark="packets forwarded")
431
432
433 class TestClassifierUDP(TestClassifier):
434     """ Classifier UDP proto Test Case """
435
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to TestClassifier."""
        super(TestClassifierUDP, cls).setUpClass()
439
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to TestClassifier."""
        super(TestClassifierUDP, cls).tearDownClass()
443
444     def test_iacl_proto_udp(self):
445         """ UDP protocol iACL test
446
447         Test scenario for basic protocol ACL with UDP protocol
448             - Create IPv4 stream for pg0 -> pg1 interface.
449             - Create iACL with UDP IP protocol.
450             - Send and verify received packets on pg1 interface.
451         """
452
453         # Basic iACL testing with UDP protocol
454         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
455         self.pg0.add_stream(pkts)
456
457         key = 'proto_udp'
458         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
459         self.create_classify_session(
460             self.acl_tbl_idx.get(key),
461             self.build_ip_match(proto=socket.IPPROTO_UDP))
462         self.input_acl_set_interface(
463             self.pg0, self.acl_tbl_idx.get(key))
464         self.acl_active_table = key
465
466         self.pg_enable_capture(self.pg_interfaces)
467         self.pg_start()
468
469         pkts = self.pg1.get_capture(len(pkts))
470         self.verify_capture(self.pg1, pkts)
471         self.pg0.assert_nothing_captured(remark="packets forwarded")
472         self.pg2.assert_nothing_captured(remark="packets forwarded")
473         self.pg3.assert_nothing_captured(remark="packets forwarded")
474
475     def test_iacl_proto_udp_sport(self):
476         """ UDP source port iACL test
477
478         Test scenario for basic protocol ACL with UDP and sport
479             - Create IPv4 stream for pg0 -> pg1 interface.
480             - Create iACL with UDP IP protocol and defined sport.
481             - Send and verify received packets on pg1 interface.
482         """
483
484         # Basic iACL testing with UDP and sport
485         sport = 38
486         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
487                                   UDP(sport=sport, dport=5678))
488         self.pg0.add_stream(pkts)
489
490         key = 'proto_udp_sport'
491         self.create_classify_table(
492             key, self.build_ip_mask(proto='ff', src_port='ffff'))
493         self.create_classify_session(
494             self.acl_tbl_idx.get(key),
495             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
496         self.input_acl_set_interface(
497             self.pg0, self.acl_tbl_idx.get(key))
498         self.acl_active_table = key
499
500         self.pg_enable_capture(self.pg_interfaces)
501         self.pg_start()
502
503         pkts = self.pg1.get_capture(len(pkts))
504         self.verify_capture(self.pg1, pkts)
505         self.pg0.assert_nothing_captured(remark="packets forwarded")
506         self.pg2.assert_nothing_captured(remark="packets forwarded")
507         self.pg3.assert_nothing_captured(remark="packets forwarded")
508
509     def test_iacl_proto_udp_dport(self):
510         """ UDP destination port iACL test
511
512         Test scenario for basic protocol ACL with UDP and dport
513             - Create IPv4 stream for pg0 -> pg1 interface.
514             - Create iACL with UDP IP protocol and defined dport.
515             - Send and verify received packets on pg1 interface.
516         """
517
518         # Basic iACL testing with UDP and dport
519         dport = 427
520         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
521                                   UDP(sport=1234, dport=dport))
522         self.pg0.add_stream(pkts)
523
524         key = 'proto_udp_dport'
525         self.create_classify_table(
526             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
527         self.create_classify_session(
528             self.acl_tbl_idx.get(key),
529             self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
530         self.input_acl_set_interface(
531             self.pg0, self.acl_tbl_idx.get(key))
532         self.acl_active_table = key
533
534         self.pg_enable_capture(self.pg_interfaces)
535         self.pg_start()
536
537         pkts = self.pg1.get_capture(len(pkts))
538         self.verify_capture(self.pg1, pkts)
539         self.pg0.assert_nothing_captured(remark="packets forwarded")
540         self.pg2.assert_nothing_captured(remark="packets forwarded")
541         self.pg3.assert_nothing_captured(remark="packets forwarded")
542
543     def test_iacl_proto_udp_sport_dport(self):
544         """ UDP source and destination ports iACL test
545
546         Test scenario for basic protocol ACL with UDP and sport and dport
547             - Create IPv4 stream for pg0 -> pg1 interface.
548             - Create iACL with UDP IP protocol and defined sport and dport.
549             - Send and verify received packets on pg1 interface.
550         """
551
552         # Basic iACL testing with UDP and sport and dport
553         sport = 13720
554         dport = 9080
555         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
556                                   UDP(sport=sport, dport=dport))
557         self.pg0.add_stream(pkts)
558
559         key = 'proto_udp_ports'
560         self.create_classify_table(
561             key,
562             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
563         self.create_classify_session(
564             self.acl_tbl_idx.get(key),
565             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
566                                 dst_port=dport))
567         self.input_acl_set_interface(
568             self.pg0, self.acl_tbl_idx.get(key))
569         self.acl_active_table = key
570
571         self.pg_enable_capture(self.pg_interfaces)
572         self.pg_start()
573
574         pkts = self.pg1.get_capture(len(pkts))
575         self.verify_capture(self.pg1, pkts)
576         self.pg0.assert_nothing_captured(remark="packets forwarded")
577         self.pg2.assert_nothing_captured(remark="packets forwarded")
578         self.pg3.assert_nothing_captured(remark="packets forwarded")
579
580
581 class TestClassifierTCP(TestClassifier):
582     """ Classifier TCP proto Test Case """
583
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to TestClassifier."""
        super(TestClassifierTCP, cls).setUpClass()
587
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to TestClassifier."""
        super(TestClassifierTCP, cls).tearDownClass()
591
592     def test_iacl_proto_tcp(self):
593         """ TCP protocol iACL test
594
595         Test scenario for basic protocol ACL with TCP protocol
596             - Create IPv4 stream for pg0 -> pg1 interface.
597             - Create iACL with TCP IP protocol.
598             - Send and verify received packets on pg1 interface.
599         """
600
601         # Basic iACL testing with TCP protocol
602         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
603                                   TCP(sport=1234, dport=5678))
604         self.pg0.add_stream(pkts)
605
606         key = 'proto_tcp'
607         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
608         self.create_classify_session(
609             self.acl_tbl_idx.get(key),
610             self.build_ip_match(proto=socket.IPPROTO_TCP))
611         self.input_acl_set_interface(
612             self.pg0, self.acl_tbl_idx.get(key))
613         self.acl_active_table = key
614
615         self.pg_enable_capture(self.pg_interfaces)
616         self.pg_start()
617
618         pkts = self.pg1.get_capture(len(pkts))
619         self.verify_capture(self.pg1, pkts, TCP)
620         self.pg0.assert_nothing_captured(remark="packets forwarded")
621         self.pg2.assert_nothing_captured(remark="packets forwarded")
622         self.pg3.assert_nothing_captured(remark="packets forwarded")
623
624     def test_iacl_proto_tcp_sport(self):
625         """ TCP source port iACL test
626
627         Test scenario for basic protocol ACL with TCP and sport
628             - Create IPv4 stream for pg0 -> pg1 interface.
629             - Create iACL with TCP IP protocol and defined sport.
630             - Send and verify received packets on pg1 interface.
631         """
632
633         # Basic iACL testing with TCP and sport
634         sport = 38
635         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
636                                   TCP(sport=sport, dport=5678))
637         self.pg0.add_stream(pkts)
638
639         key = 'proto_tcp_sport'
640         self.create_classify_table(
641             key, self.build_ip_mask(proto='ff', src_port='ffff'))
642         self.create_classify_session(
643             self.acl_tbl_idx.get(key),
644             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
645         self.input_acl_set_interface(
646             self.pg0, self.acl_tbl_idx.get(key))
647         self.acl_active_table = key
648
649         self.pg_enable_capture(self.pg_interfaces)
650         self.pg_start()
651
652         pkts = self.pg1.get_capture(len(pkts))
653         self.verify_capture(self.pg1, pkts, TCP)
654         self.pg0.assert_nothing_captured(remark="packets forwarded")
655         self.pg2.assert_nothing_captured(remark="packets forwarded")
656         self.pg3.assert_nothing_captured(remark="packets forwarded")
657
658     def test_iacl_proto_tcp_dport(self):
659         """ TCP destination port iACL test
660
661         Test scenario for basic protocol ACL with TCP and dport
662             - Create IPv4 stream for pg0 -> pg1 interface.
663             - Create iACL with TCP IP protocol and defined dport.
664             - Send and verify received packets on pg1 interface.
665         """
666
667         # Basic iACL testing with TCP and dport
668         dport = 427
669         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
670                                   TCP(sport=1234, dport=dport))
671         self.pg0.add_stream(pkts)
672
673         key = 'proto_tcp_sport'
674         self.create_classify_table(
675             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
676         self.create_classify_session(
677             self.acl_tbl_idx.get(key),
678             self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
679         self.input_acl_set_interface(
680             self.pg0, self.acl_tbl_idx.get(key))
681         self.acl_active_table = key
682
683         self.pg_enable_capture(self.pg_interfaces)
684         self.pg_start()
685
686         pkts = self.pg1.get_capture(len(pkts))
687         self.verify_capture(self.pg1, pkts, TCP)
688         self.pg0.assert_nothing_captured(remark="packets forwarded")
689         self.pg2.assert_nothing_captured(remark="packets forwarded")
690         self.pg3.assert_nothing_captured(remark="packets forwarded")
691
692     def test_iacl_proto_tcp_sport_dport(self):
693         """ TCP source and destination ports iACL test
694
695         Test scenario for basic protocol ACL with TCP and sport and dport
696             - Create IPv4 stream for pg0 -> pg1 interface.
697             - Create iACL with TCP IP protocol and defined sport and dport.
698             - Send and verify received packets on pg1 interface.
699         """
700
701         # Basic iACL testing with TCP and sport and dport
702         sport = 13720
703         dport = 9080
704         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
705                                   TCP(sport=sport, dport=dport))
706         self.pg0.add_stream(pkts)
707
708         key = 'proto_tcp_ports'
709         self.create_classify_table(
710             key,
711             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
712         self.create_classify_session(
713             self.acl_tbl_idx.get(key),
714             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
715                                 dst_port=dport))
716         self.input_acl_set_interface(
717             self.pg0, self.acl_tbl_idx.get(key))
718         self.acl_active_table = key
719
720         self.pg_enable_capture(self.pg_interfaces)
721         self.pg_start()
722
723         pkts = self.pg1.get_capture(len(pkts))
724         self.verify_capture(self.pg1, pkts, TCP)
725         self.pg0.assert_nothing_captured(remark="packets forwarded")
726         self.pg2.assert_nothing_captured(remark="packets forwarded")
727         self.pg3.assert_nothing_captured(remark="packets forwarded")
728
729
730 class TestClassifierIPOut(TestClassifier):
731     """ Classifier output IP Test Case """
732
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to TestClassifier."""
        super(TestClassifierIPOut, cls).setUpClass()
736
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to TestClassifier."""
        super(TestClassifierIPOut, cls).tearDownClass()
740
741     def test_acl_ip_out(self):
742         """ Output IP ACL test
743
744         Test scenario for basic IP ACL with source IP
745             - Create IPv4 stream for pg1 -> pg0 interface.
746             - Create ACL with source IP address.
747             - Send and verify received packets on pg0 interface.
748         """
749
750         # Basic oACL testing with source IP
751         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
752         self.pg1.add_stream(pkts)
753
754         key = 'ip_out'
755         self.create_classify_table(
756             key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
757         self.create_classify_session(
758             self.acl_tbl_idx.get(key),
759             self.build_ip_match(src_ip=self.pg1.remote_ip4))
760         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
761         self.acl_active_table = key
762
763         self.pg_enable_capture(self.pg_interfaces)
764         self.pg_start()
765
766         pkts = self.pg0.get_capture(len(pkts))
767         self.verify_capture(self.pg0, pkts)
768         self.pg1.assert_nothing_captured(remark="packets forwarded")
769         self.pg2.assert_nothing_captured(remark="packets forwarded")
770         self.pg3.assert_nothing_captured(remark="packets forwarded")
771
772
class TestClassifierMAC(TestClassifier):
    """ Classifier MAC Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the class-level setup inherited from the parent class. """
        super(TestClassifierMAC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Run the class-level teardown inherited from the parent class. """
        super(TestClassifierMAC, cls).tearDownClass()

    def test_acl_mac(self):
        """ MAC ACL test

        Scenario: basic input MAC ACL keyed on the source MAC
            - Build an IPv4 stream going pg0 -> pg2.
            - Create a classify table/session matching the source MAC and
              attach it to pg0 as an input ACL.
            - Send the stream and verify it is received on pg2 only.
        """

        # Traffic under test: pg0 -> pg2
        stream = self.create_stream(
            self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify table masking the full source MAC; the table is created
        # with data_offset=-14
        key = 'mac'
        src_mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(key, src_mac_mask, data_offset=-14)

        # Session matching the source MAC used on pg0
        session_table = self.acl_tbl_idx.get(key)
        src_mac_match = self.build_mac_match(src_mac=self.pg0.remote_mac)
        self.create_classify_session(session_table, src_mac_match)

        # Attach the table to pg0 as input ACL and record it as active
        acl_table = self.acl_tbl_idx.get(key)
        self.input_acl_set_interface(self.pg0, acl_table)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on pg2 ...
        captured = self.pg2.get_capture(len(stream))
        self.verify_capture(self.pg2, captured)

        # ... and nothing on any other interface
        for silent_if in (self.pg0, self.pg1, self.pg3):
            silent_if.assert_nothing_captured(remark="packets forwarded")
814
815
class TestClassifierPBR(TestClassifier):
    """ Classifier PBR Test Case """

    @classmethod
    def setUpClass(cls):
        """ Run the class-level setup inherited from the parent class. """
        super(TestClassifierPBR, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Run the class-level teardown inherited from the parent class. """
        super(TestClassifierPBR, cls).tearDownClass()

    def test_acl_pbr(self):
        """ IP PBR test

        Scenario: PBR keyed on the source IP
            - Build an IPv4 stream going pg0 -> pg3.
            - Add a classify session with the PBR option and configure the
              PBR fib entry used for forwarding.
            - Send the stream and verify it is received on pg3 only.
            - Remove the route and the session, then check the VRF is gone.
        """

        # Traffic under test: pg0 -> pg3
        stream = self.create_stream(
            self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        # Classify table masking the full source IP
        key = 'pbr'
        src_ip_mask = self.build_ip_mask(src_ip='ffffffff')
        self.create_classify_table(key, src_ip_mask)
        pbr_option = 1

        # this will create the VRF/table in which we will insert the route
        add_table = self.acl_tbl_idx.get(key)
        add_match = self.build_ip_match(src_ip=self.pg0.remote_ip4)
        self.create_classify_session(
            add_table, add_match, pbr_option, self.pbr_vrfid)
        vrf_present = self.verify_vrf(self.pbr_vrfid)
        self.assertTrue(vrf_present)

        # Install the route, then attach the table to pg0 as input ACL
        self.config_pbr_fib_entry(self.pg3)
        attach_table = self.acl_tbl_idx.get(key)
        self.input_acl_set_interface(self.pg0, attach_table)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Everything sent must show up on pg3
        captured = self.pg3.get_capture(len(stream))
        self.verify_capture(self.pg3, captured)

        # Detach the ACL from pg0 here, inside the test itself
        detach_table = self.acl_tbl_idx.get(key)
        self.input_acl_set_interface(self.pg0, detach_table, 0)

        # No other interface may have received anything
        for silent_if in (self.pg0, self.pg1, self.pg2):
            silent_if.assert_nothing_captured(remark="packets forwarded")

        # remove the classify session and the route
        self.config_pbr_fib_entry(self.pg3, is_add=0)
        del_table = self.acl_tbl_idx.get(key)
        del_match = self.build_ip_match(src_ip=self.pg0.remote_ip4)
        self.create_classify_session(
            del_table, del_match, pbr_option, self.pbr_vrfid, is_add=0)

        # and the table should be gone.
        vrf_present = self.verify_vrf(self.pbr_vrfid)
        self.assertFalse(vrf_present)
871
# Script entry point: when this module is executed directly, hand the
# test cases defined above to unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)