Tests: Refactor tearDown show command logging, add lifecycle markers.
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from util import ppp
14
15
16 class TestClassifier(VppTestCase):
17     """ Classifier Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         """
22         Perform standard class setup (defined by class method setUpClass in
23         class VppTestCase) before running the test case, set test case related
24         variables and configure VPP.
25         """
26         super(TestClassifier, cls).setUpClass()
27         cls.acl_active_table = ''
28
29     @classmethod
30     def tearDownClass(cls):
31         super(TestClassifier, cls).tearDownClass()
32
33     def setUp(self):
34         """
35         Perform test setup before test case.
36
37         **Config:**
38             - create 4 pg interfaces
39                 - untagged pg0/pg1/pg2 interface
40                     pg0 -------> pg1 (IP ACL)
41                            \
42                             ---> pg2 (MAC ACL))
43             - setup interfaces:
44                 - put it into UP state
45                 - set IPv6 addresses
46                 - resolve neighbor address using NDP
47
48         :ivar list interfaces: pg interfaces.
49         :ivar list pg_if_packet_sizes: packet sizes in test.
50         :ivar dict acl_tbl_idx: ACL table index.
51         :ivar int pbr_vrfid: VRF id for PBR test.
52         """
53         self.reset_packet_infos()
54         super(TestClassifier, self).setUp()
55
56         # create 4 pg interfaces
57         self.create_pg_interfaces(range(3))
58
59         # packet sizes to test
60         self.pg_if_packet_sizes = [64, 9018]
61
62         self.interfaces = list(self.pg_interfaces)
63
64         # ACL vars
65         self.acl_tbl_idx = {}
66
67         # setup all interfaces
68         for intf in self.interfaces:
69             intf.admin_up()
70             intf.config_ip6()
71             intf.resolve_ndp()
72
73     def tearDown(self):
74         """Run standard test teardown and acl related log."""
75         if not self.vpp_dead:
76             if self.acl_active_table == 'ip6_out':
77                 self.output_acl_set_interface(
78                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79                 self.acl_active_table = ''
80             elif self.acl_active_table != '':
81                 self.input_acl_set_interface(
82                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83                 self.acl_active_table = ''
84             for intf in self.interfaces:
85                 intf.unconfig_ip6()
86                 intf.admin_down()
87
88         super(TestClassifier, self).tearDown()
89
90     def show_commands_at_teardown(self):
91         self.logger.info(self.vapi.ppcli("show inacl type ip6"))
92         self.logger.info(self.vapi.ppcli("show outacl type ip6"))
93         self.logger.info(self.vapi.cli("show classify table verbose"))
94         self.logger.info(self.vapi.cli("show ip fib"))
95
96     def create_stream(self, src_if, dst_if, packet_sizes,
97                       proto_l=UDP(sport=1234, dport=5678)):
98         """Create input packet stream for defined interfaces.
99
100         :param VppInterface src_if: Source Interface for packet stream.
101         :param VppInterface dst_if: Destination Interface for packet stream.
102         :param list packet_sizes: packet size to test.
103         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
104         """
105         pkts = []
106
107         for size in packet_sizes:
108             info = self.create_packet_info(src_if, dst_if)
109             payload = self.info_to_payload(info)
110             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
112                  proto_l /
113                  Raw(payload))
114             info.data = p.copy()
115             self.extend_packet(p, size)
116             pkts.append(p)
117         return pkts
118
119     def verify_capture(self, dst_if, capture, proto_l=UDP):
120         """Verify captured input packet stream for defined interface.
121
122         :param VppInterface dst_if: Interface to verify captured packet stream.
123         :param list capture: Captured packet stream.
124         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
125         """
126         self.logger.info("Verifying capture on interface %s" % dst_if.name)
127         last_info = dict()
128         for i in self.interfaces:
129             last_info[i.sw_if_index] = None
130         dst_sw_if_index = dst_if.sw_if_index
131         for packet in capture:
132             try:
133                 ip6_received = packet[IPv6]
134                 proto_received = packet[proto_l]
135                 payload_info = self.payload_to_info(packet[Raw])
136                 packet_index = payload_info.index
137                 self.assertEqual(payload_info.dst, dst_sw_if_index)
138                 self.logger.debug(
139                     "Got packet on port %s: src=%u (id=%u)" %
140                     (dst_if.name, payload_info.src, packet_index))
141                 next_info = self.get_next_packet_info_for_interface2(
142                     payload_info.src, dst_sw_if_index,
143                     last_info[payload_info.src])
144                 last_info[payload_info.src] = next_info
145                 self.assertTrue(next_info is not None)
146                 self.assertEqual(packet_index, next_info.index)
147                 saved_packet = next_info.data
148                 ip_saved = saved_packet[IPv6]
149                 proto_saved = saved_packet[proto_l]
150                 # Check standard fields
151                 self.assertEqual(ip6_received.src, ip_saved.src)
152                 self.assertEqual(ip6_received.dst, ip_saved.dst)
153                 self.assertEqual(proto_received.sport, proto_saved.sport)
154                 self.assertEqual(proto_received.dport, proto_saved.dport)
155             except:
156                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
157                 raise
158         for i in self.interfaces:
159             remaining_packet = self.get_next_packet_info_for_interface2(
160                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161             self.assertTrue(remaining_packet is None,
162                             "Interface %s: Packet expected from interface %s "
163                             "didn't arrive" % (dst_if.name, i.name))
164
165     @staticmethod
166     def build_ip6_mask(nh='', src_ip='', dst_ip='',
167                        src_port='', dst_port=''):
168         """Build IPv6 ACL mask data with hexstring format.
169
170         :param str nh: next header number <0-ff>
171         :param str src_ip: source ip address <0-ffffffff>
172         :param str dst_ip: destination ip address <0-ffffffff>
173         :param str src_port: source port number <0-ffff>
174         :param str dst_port: destination port number <0-ffff>
175         """
176
177         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
178             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
179
180     @staticmethod
181     def build_ip6_match(nh=0, src_ip='', dst_ip='',
182                         src_port=0, dst_port=0):
183         """Build IPv6 ACL match data with hexstring format.
184
185         :param int nh: next header number with valid option "x"
186         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
187         :param str dst_ip: destination ip6 address with format of
188             "xxx:xxxx::xxxx"
189         :param int src_port: source port number "x"
190         :param int dst_port: destination port number "x"
191         """
192         if src_ip:
193             src_ip = binascii.hexlify(socket.inet_pton(
194                 socket.AF_INET6, src_ip))
195         if dst_ip:
196             dst_ip = binascii.hexlify(socket.inet_pton(
197                 socket.AF_INET6, dst_ip))
198
199         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
200             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
201             hex(dst_port)[2:])).rstrip('0')
202
203     @staticmethod
204     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
205         """Build MAC ACL mask data with hexstring format.
206
207         :param str dst_mac: source MAC address <0-ffffffffffff>
208         :param str src_mac: destination MAC address <0-ffffffffffff>
209         :param str ether_type: ethernet type <0-ffff>
210         """
211
212         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
213             dst_mac, src_mac, ether_type)).rstrip('0')
214
215     @staticmethod
216     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
217         """Build MAC ACL match data with hexstring format.
218
219         :param str dst_mac: source MAC address <x:x:x:x:x:x>
220         :param str src_mac: destination MAC address <x:x:x:x:x:x>
221         :param str ether_type: ethernet type <0-ffff>
222         """
223         if dst_mac:
224             dst_mac = dst_mac.replace(':', '')
225         if src_mac:
226             src_mac = src_mac.replace(':', '')
227
228         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
229             dst_mac, src_mac, ether_type)).rstrip('0')
230
231     def create_classify_table(self, key, mask, data_offset=0):
232         """Create Classify Table
233
234         :param str key: key for classify table (ex, ACL name).
235         :param str mask: mask value for interested traffic.
236         :param int data_offset:
237         """
238         r = self.vapi.classify_add_del_table(
239             is_add=1,
240             mask=binascii.unhexlify(mask),
241             match_n_vectors=(len(mask) - 1) // 32 + 1,
242             miss_next_index=0,
243             current_data_flag=1,
244             current_data_offset=data_offset)
245         self.assertIsNotNone(r, 'No response msg for add_del_table')
246         self.acl_tbl_idx[key] = r.new_table_index
247
248     def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
249         """Create Classify Session
250
251         :param int table_index: table index to identify classify table.
252         :param str match: matched value for interested traffic.
253         :param int vrfid: VRF id.
254         :param int is_add: option to configure classify session.
255             - create(1) or delete(0)
256         """
257         r = self.vapi.classify_add_del_session(
258             is_add,
259             table_index,
260             binascii.unhexlify(match),
261             opaque_index=0,
262             metadata=vrfid)
263         self.assertIsNotNone(r, 'No response msg for add_del_session')
264
265     def input_acl_set_interface(self, intf, table_index, is_add=1):
266         """Configure Input ACL interface
267
268         :param VppInterface intf: Interface to apply Input ACL feature.
269         :param int table_index: table index to identify classify table.
270         :param int is_add: option to configure classify session.
271             - enable(1) or disable(0)
272         """
273         r = self.vapi.input_acl_set_interface(
274             is_add,
275             intf.sw_if_index,
276             ip6_table_index=table_index)
277         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
278
279     def output_acl_set_interface(self, intf, table_index, is_add=1):
280         """Configure Output ACL interface
281
282         :param VppInterface intf: Interface to apply Output ACL feature.
283         :param int table_index: table index to identify classify table.
284         :param int is_add: option to configure classify session.
285             - enable(1) or disable(0)
286         """
287         r = self.vapi.output_acl_set_interface(
288             is_add,
289             intf.sw_if_index,
290             ip6_table_index=table_index)
291         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
292
293
294 class TestClassifierIP6(TestClassifier):
295     """ Classifier IP6 Test Case """
296
297     @classmethod
298     def setUpClass(cls):
299         super(TestClassifierIP6, cls).setUpClass()
300
301     @classmethod
302     def tearDownClass(cls):
303         super(TestClassifierIP6, cls).tearDownClass()
304
305     def test_iacl_src_ip(self):
306         """ Source IP6 iACL test
307
308         Test scenario for basic IP ACL with source IP
309             - Create IPv6 stream for pg0 -> pg1 interface.
310             - Create iACL with source IP address.
311             - Send and verify received packets on pg1 interface.
312         """
313
314         # Basic iACL testing with source IP
315         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
316         self.pg0.add_stream(pkts)
317
318         key = 'ip6_src'
319         self.create_classify_table(
320             key,
321             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
322         self.create_classify_session(
323             self.acl_tbl_idx.get(key),
324             self.build_ip6_match(src_ip=self.pg0.remote_ip6))
325         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
326         self.acl_active_table = key
327
328         self.pg_enable_capture(self.pg_interfaces)
329         self.pg_start()
330
331         pkts = self.pg1.get_capture(len(pkts))
332         self.verify_capture(self.pg1, pkts)
333         self.pg0.assert_nothing_captured(remark="packets forwarded")
334         self.pg2.assert_nothing_captured(remark="packets forwarded")
335
336     def test_iacl_dst_ip(self):
337         """ Destination IP6 iACL test
338
339         Test scenario for basic IP ACL with destination IP
340             - Create IPv6 stream for pg0 -> pg1 interface.
341             - Create iACL with destination IP address.
342             - Send and verify received packets on pg1 interface.
343         """
344
345         # Basic iACL testing with destination IP
346         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
347         self.pg0.add_stream(pkts)
348
349         key = 'ip6_dst'
350         self.create_classify_table(
351             key,
352             self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
353         self.create_classify_session(
354             self.acl_tbl_idx.get(key),
355             self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
356         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
357         self.acl_active_table = key
358
359         self.pg_enable_capture(self.pg_interfaces)
360         self.pg_start()
361
362         pkts = self.pg1.get_capture(len(pkts))
363         self.verify_capture(self.pg1, pkts)
364         self.pg0.assert_nothing_captured(remark="packets forwarded")
365         self.pg2.assert_nothing_captured(remark="packets forwarded")
366
367     def test_iacl_src_dst_ip(self):
368         """ Source and destination IP6 iACL test
369
370         Test scenario for basic IP ACL with source and destination IP
371             - Create IPv4 stream for pg0 -> pg1 interface.
372             - Create iACL with source and destination IP addresses.
373             - Send and verify received packets on pg1 interface.
374         """
375
376         # Basic iACL testing with source and destination IP
377         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
378         self.pg0.add_stream(pkts)
379
380         key = 'ip6'
381         self.create_classify_table(
382             key,
383             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
384                                 dst_ip='ffffffffffffffffffffffffffffffff'))
385         self.create_classify_session(
386             self.acl_tbl_idx.get(key),
387             self.build_ip6_match(src_ip=self.pg0.remote_ip6,
388                                  dst_ip=self.pg1.remote_ip6))
389         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
390         self.acl_active_table = key
391
392         self.pg_enable_capture(self.pg_interfaces)
393         self.pg_start()
394
395         pkts = self.pg1.get_capture(len(pkts))
396         self.verify_capture(self.pg1, pkts)
397         self.pg0.assert_nothing_captured(remark="packets forwarded")
398         self.pg2.assert_nothing_captured(remark="packets forwarded")
399
400
401 # Tests split to different test case classes because of issue reported in
402 # ticket VPP-1336
403 class TestClassifierIP6UDP(TestClassifier):
404     """ Classifier IP6 UDP proto Test Case """
405
406     @classmethod
407     def setUpClass(cls):
408         super(TestClassifierIP6UDP, cls).setUpClass()
409
410     @classmethod
411     def tearDownClass(cls):
412         super(TestClassifierIP6UDP, cls).tearDownClass()
413
414     def test_iacl_proto_udp(self):
415         """ IP6 UDP protocol iACL test
416
417         Test scenario for basic protocol ACL with UDP protocol
418             - Create IPv6 stream for pg0 -> pg1 interface.
419             - Create iACL with UDP IP protocol.
420             - Send and verify received packets on pg1 interface.
421         """
422
423         # Basic iACL testing with UDP protocol
424         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
425         self.pg0.add_stream(pkts)
426
427         key = 'nh_udp'
428         self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
429         self.create_classify_session(
430             self.acl_tbl_idx.get(key),
431             self.build_ip6_match(nh=socket.IPPROTO_UDP))
432         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
433         self.acl_active_table = key
434
435         self.pg_enable_capture(self.pg_interfaces)
436         self.pg_start()
437
438         pkts = self.pg1.get_capture(len(pkts))
439         self.verify_capture(self.pg1, pkts)
440         self.pg0.assert_nothing_captured(remark="packets forwarded")
441         self.pg2.assert_nothing_captured(remark="packets forwarded")
442
443     def test_iacl_proto_udp_sport(self):
444         """ IP6 UDP source port iACL test
445
446         Test scenario for basic protocol ACL with UDP and sport
447             - Create IPv6 stream for pg0 -> pg1 interface.
448             - Create iACL with UDP IP protocol and defined sport.
449             - Send and verify received packets on pg1 interface.
450         """
451
452         # Basic iACL testing with UDP and sport
453         sport = 38
454         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
455                                   UDP(sport=sport, dport=5678))
456         self.pg0.add_stream(pkts)
457
458         key = 'nh_udp_sport'
459         self.create_classify_table(
460             key, self.build_ip6_mask(nh='ff', src_port='ffff'))
461         self.create_classify_session(
462             self.acl_tbl_idx.get(key),
463             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
464         self.input_acl_set_interface(
465             self.pg0, self.acl_tbl_idx.get(key))
466         self.acl_active_table = key
467
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470
471         pkts = self.pg1.get_capture(len(pkts))
472         self.verify_capture(self.pg1, pkts)
473         self.pg0.assert_nothing_captured(remark="packets forwarded")
474         self.pg2.assert_nothing_captured(remark="packets forwarded")
475
476     def test_iacl_proto_udp_dport(self):
477         """ IP6 UDP destination port iACL test
478
479         Test scenario for basic protocol ACL with UDP and dport
480             - Create IPv6 stream for pg0 -> pg1 interface.
481             - Create iACL with UDP IP protocol and defined dport.
482             - Send and verify received packets on pg1 interface.
483         """
484
485         # Basic iACL testing with UDP and dport
486         dport = 427
487         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
488                                   UDP(sport=1234, dport=dport))
489         self.pg0.add_stream(pkts)
490
491         key = 'nh_udp_dport'
492         self.create_classify_table(
493             key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
494         self.create_classify_session(
495             self.acl_tbl_idx.get(key),
496             self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
497         self.input_acl_set_interface(
498             self.pg0, self.acl_tbl_idx.get(key))
499         self.acl_active_table = key
500
501         self.pg_enable_capture(self.pg_interfaces)
502         self.pg_start()
503
504         pkts = self.pg1.get_capture(len(pkts))
505         self.verify_capture(self.pg1, pkts)
506         self.pg0.assert_nothing_captured(remark="packets forwarded")
507         self.pg2.assert_nothing_captured(remark="packets forwarded")
508
509     def test_iacl_proto_udp_sport_dport(self):
510         """ IP6 UDP source and destination ports iACL test
511
512         Test scenario for basic protocol ACL with UDP and sport and dport
513             - Create IPv6 stream for pg0 -> pg1 interface.
514             - Create iACL with UDP IP protocol and defined sport and dport.
515             - Send and verify received packets on pg1 interface.
516         """
517
518         # Basic iACL testing with UDP and sport and dport
519         sport = 13720
520         dport = 9080
521         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
522                                   UDP(sport=sport, dport=dport))
523         self.pg0.add_stream(pkts)
524
525         key = 'nh_udp_ports'
526         self.create_classify_table(
527             key,
528             self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
529         self.create_classify_session(
530             self.acl_tbl_idx.get(key),
531             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
532                                  dst_port=dport))
533         self.input_acl_set_interface(
534             self.pg0, self.acl_tbl_idx.get(key))
535         self.acl_active_table = key
536
537         self.pg_enable_capture(self.pg_interfaces)
538         self.pg_start()
539
540         pkts = self.pg1.get_capture(len(pkts))
541         self.verify_capture(self.pg1, pkts)
542         self.pg0.assert_nothing_captured(remark="packets forwarded")
543         self.pg2.assert_nothing_captured(remark="packets forwarded")
544
545
546 class TestClassifierIP6TCP(TestClassifier):
547     """ Classifier IP6 TCP proto Test Case """
548
549     @classmethod
550     def setUpClass(cls):
551         super(TestClassifierIP6TCP, cls).setUpClass()
552
553     @classmethod
554     def tearDownClass(cls):
555         super(TestClassifierIP6TCP, cls).tearDownClass()
556
557     def test_iacl_proto_tcp(self):
558         """ IP6 TCP protocol iACL test
559
560         Test scenario for basic protocol ACL with TCP protocol
561             - Create IPv6 stream for pg0 -> pg1 interface.
562             - Create iACL with TCP IP protocol.
563             - Send and verify received packets on pg1 interface.
564         """
565
566         # Basic iACL testing with TCP protocol
567         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568                                   TCP(sport=1234, dport=5678))
569         self.pg0.add_stream(pkts)
570
571         key = 'nh_tcp'
572         self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
573         self.create_classify_session(
574             self.acl_tbl_idx.get(key),
575             self.build_ip6_match(nh=socket.IPPROTO_TCP))
576         self.input_acl_set_interface(
577             self.pg0, self.acl_tbl_idx.get(key))
578         self.acl_active_table = key
579
580         self.pg_enable_capture(self.pg_interfaces)
581         self.pg_start()
582
583         pkts = self.pg1.get_capture(len(pkts))
584         self.verify_capture(self.pg1, pkts, TCP)
585         self.pg0.assert_nothing_captured(remark="packets forwarded")
586         self.pg2.assert_nothing_captured(remark="packets forwarded")
587
588     def test_iacl_proto_tcp_sport(self):
589         """ IP6 TCP source port iACL test
590
591         Test scenario for basic protocol ACL with TCP and sport
592             - Create IPv6 stream for pg0 -> pg1 interface.
593             - Create iACL with TCP IP protocol and defined sport.
594             - Send and verify received packets on pg1 interface.
595         """
596
597         # Basic iACL testing with TCP and sport
598         sport = 38
599         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
600                                   TCP(sport=sport, dport=5678))
601         self.pg0.add_stream(pkts)
602
603         key = 'nh_tcp_sport'
604         self.create_classify_table(
605             key, self.build_ip6_mask(nh='ff', src_port='ffff'))
606         self.create_classify_session(
607             self.acl_tbl_idx.get(key),
608             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
609         self.input_acl_set_interface(
610             self.pg0, self.acl_tbl_idx.get(key))
611         self.acl_active_table = key
612
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615
616         pkts = self.pg1.get_capture(len(pkts))
617         self.verify_capture(self.pg1, pkts, TCP)
618         self.pg0.assert_nothing_captured(remark="packets forwarded")
619         self.pg2.assert_nothing_captured(remark="packets forwarded")
620
621     def test_iacl_proto_tcp_dport(self):
622         """ IP6 TCP destination port iACL test
623
624         Test scenario for basic protocol ACL with TCP and dport
625             - Create IPv6 stream for pg0 -> pg1 interface.
626             - Create iACL with TCP IP protocol and defined dport.
627             - Send and verify received packets on pg1 interface.
628         """
629
630         # Basic iACL testing with TCP and dport
631         dport = 427
632         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
633                                   TCP(sport=1234, dport=dport))
634         self.pg0.add_stream(pkts)
635
636         key = 'nh_tcp_dport'
637         self.create_classify_table(
638             key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
639         self.create_classify_session(
640             self.acl_tbl_idx.get(key),
641             self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
642         self.input_acl_set_interface(
643             self.pg0, self.acl_tbl_idx.get(key))
644         self.acl_active_table = key
645
646         self.pg_enable_capture(self.pg_interfaces)
647         self.pg_start()
648
649         pkts = self.pg1.get_capture(len(pkts))
650         self.verify_capture(self.pg1, pkts, TCP)
651         self.pg0.assert_nothing_captured(remark="packets forwarded")
652         self.pg2.assert_nothing_captured(remark="packets forwarded")
653
654     def test_iacl_proto_tcp_sport_dport(self):
655         """ IP6 TCP source and destination ports iACL test
656
657         Test scenario for basic protocol ACL with TCP and sport and dport
658             - Create IPv6 stream for pg0 -> pg1 interface.
659             - Create iACL with TCP IP protocol and defined sport and dport.
660             - Send and verify received packets on pg1 interface.
661         """
662
663         # Basic iACL testing with TCP and sport and dport
664         sport = 13720
665         dport = 9080
666         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
667                                   TCP(sport=sport, dport=dport))
668         self.pg0.add_stream(pkts)
669
670         key = 'nh_tcp_ports'
671         self.create_classify_table(
672             key,
673             self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
674         self.create_classify_session(
675             self.acl_tbl_idx.get(key),
676             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
677                                  dst_port=dport))
678         self.input_acl_set_interface(
679             self.pg0, self.acl_tbl_idx.get(key))
680         self.acl_active_table = key
681
682         self.pg_enable_capture(self.pg_interfaces)
683         self.pg_start()
684
685         pkts = self.pg1.get_capture(len(pkts))
686         self.verify_capture(self.pg1, pkts, TCP)
687         self.pg0.assert_nothing_captured(remark="packets forwarded")
688         self.pg2.assert_nothing_captured(remark="packets forwarded")
689
690
691 class TestClassifierIP6Out(TestClassifier):
692     """ Classifier output IP6 Test Case """
693
694     @classmethod
695     def setUpClass(cls):
696         super(TestClassifierIP6Out, cls).setUpClass()
697
698     @classmethod
699     def tearDownClass(cls):
700         super(TestClassifierIP6Out, cls).tearDownClass()
701
702     def test_acl_ip_out(self):
703         """ Output IP6 ACL test
704
705         Test scenario for basic IP ACL with source IP
706             - Create IPv6 stream for pg1 -> pg0 interface.
707             - Create ACL with source IP address.
708             - Send and verify received packets on pg0 interface.
709         """
710
711         # Basic oACL testing with source IP
712         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
713         self.pg1.add_stream(pkts)
714
715         key = 'ip6_out'
716         self.create_classify_table(
717             key,
718             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
719             data_offset=0)
720         self.create_classify_session(
721             self.acl_tbl_idx.get(key),
722             self.build_ip6_match(src_ip=self.pg1.remote_ip6))
723         self.output_acl_set_interface(
724             self.pg0, self.acl_tbl_idx.get(key))
725         self.acl_active_table = key
726
727         self.pg_enable_capture(self.pg_interfaces)
728         self.pg_start()
729
730         pkts = self.pg0.get_capture(len(pkts))
731         self.verify_capture(self.pg0, pkts)
732         self.pg1.assert_nothing_captured(remark="packets forwarded")
733         self.pg2.assert_nothing_captured(remark="packets forwarded")
734
735
736 class TestClassifierIP6MAC(TestClassifier):
737     """ Classifier IP6 MAC Test Case """
738
739     @classmethod
740     def setUpClass(cls):
741         super(TestClassifierIP6MAC, cls).setUpClass()
742
743     @classmethod
744     def tearDownClass(cls):
745         super(TestClassifierIP6MAC, cls).tearDownClass()
746
747     def test_acl_mac(self):
748         """ IP6 MAC iACL test
749
750         Test scenario for basic MAC ACL with source MAC
751             - Create IPv6 stream for pg0 -> pg2 interface.
752             - Create ACL with source MAC address.
753             - Send and verify received packets on pg2 interface.
754         """
755
756         # Basic iACL testing with source MAC
757         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
758         self.pg0.add_stream(pkts)
759
760         key = 'mac'
761         self.create_classify_table(
762             key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
763         self.create_classify_session(
764             self.acl_tbl_idx.get(key),
765             self.build_mac_match(src_mac=self.pg0.remote_mac))
766         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
767         self.acl_active_table = key
768
769         self.pg_enable_capture(self.pg_interfaces)
770         self.pg_start()
771
772         pkts = self.pg2.get_capture(len(pkts))
773         self.verify_capture(self.pg2, pkts)
774         self.pg0.assert_nothing_captured(remark="packets forwarded")
775         self.pg1.assert_nothing_captured(remark="packets forwarded")
776
777
778 if __name__ == '__main__':
779     unittest.main(testRunner=VppTestRunner)