0522520d30950b9440d67575182cd506d48a5724
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from util import ppp
14
15
16 class TestClassifier(VppTestCase):
17     """ Classifier Test Case """
18
    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, then initialise the
        class-level ACL bookkeeping.

        ``acl_active_table`` names the classify table a test has attached to
        an interface; the empty string means that nothing is attached, so
        tearDown has nothing to detach.
        """
        super(TestClassifier, cls).setUpClass()
        # '' means "no ACL table attached"; tests overwrite it per instance
        cls.acl_active_table = ''
28
    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class; nothing extra to do."""
        super(TestClassifier, cls).tearDownClass()
32
33     def setUp(self):
34         """
35         Perform test setup before test case.
36
37         **Config:**
38             - create 4 pg interfaces
39                 - untagged pg0/pg1/pg2 interface
40                     pg0 -------> pg1 (IP ACL)
41                            \
42                             ---> pg2 (MAC ACL))
43             - setup interfaces:
44                 - put it into UP state
45                 - set IPv6 addresses
46                 - resolve neighbor address using NDP
47
48         :ivar list interfaces: pg interfaces.
49         :ivar list pg_if_packet_sizes: packet sizes in test.
50         :ivar dict acl_tbl_idx: ACL table index.
51         :ivar int pbr_vrfid: VRF id for PBR test.
52         """
53         self.reset_packet_infos()
54         super(TestClassifier, self).setUp()
55
56         # create 4 pg interfaces
57         self.create_pg_interfaces(range(3))
58
59         # packet sizes to test
60         self.pg_if_packet_sizes = [64, 9018]
61
62         self.interfaces = list(self.pg_interfaces)
63
64         # ACL vars
65         self.acl_tbl_idx = {}
66
67         # setup all interfaces
68         for intf in self.interfaces:
69             intf.admin_up()
70             intf.config_ip6()
71             intf.resolve_ndp()
72
73     def tearDown(self):
74         """Run standard test teardown and acl related log."""
75         if not self.vpp_dead:
76             if self.acl_active_table == 'ip6_out':
77                 self.output_acl_set_interface(
78                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79                 self.acl_active_table = ''
80             elif self.acl_active_table != '':
81                 self.input_acl_set_interface(
82                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83                 self.acl_active_table = ''
84             for intf in self.interfaces:
85                 intf.unconfig_ip6()
86                 intf.admin_down()
87
88         super(TestClassifier, self).tearDown()
89
90     def show_commands_at_teardown(self):
91         self.logger.info(self.vapi.ppcli("show inacl type ip6"))
92         self.logger.info(self.vapi.ppcli("show outacl type ip6"))
93         self.logger.info(self.vapi.cli("show classify table verbose"))
94         self.logger.info(self.vapi.cli("show ip fib"))
95
96     def create_stream(self, src_if, dst_if, packet_sizes,
97                       proto_l=UDP(sport=1234, dport=5678)):
98         """Create input packet stream for defined interfaces.
99
100         :param VppInterface src_if: Source Interface for packet stream.
101         :param VppInterface dst_if: Destination Interface for packet stream.
102         :param list packet_sizes: packet size to test.
103         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
104         """
105         pkts = []
106
107         for size in packet_sizes:
108             info = self.create_packet_info(src_if, dst_if)
109             payload = self.info_to_payload(info)
110             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
112                  proto_l /
113                  Raw(payload))
114             info.data = p.copy()
115             self.extend_packet(p, size)
116             pkts.append(p)
117         return pkts
118
119     def verify_capture(self, dst_if, capture, proto_l=UDP):
120         """Verify captured input packet stream for defined interface.
121
122         :param VppInterface dst_if: Interface to verify captured packet stream.
123         :param list capture: Captured packet stream.
124         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
125         """
126         self.logger.info("Verifying capture on interface %s" % dst_if.name)
127         last_info = dict()
128         for i in self.interfaces:
129             last_info[i.sw_if_index] = None
130         dst_sw_if_index = dst_if.sw_if_index
131         for packet in capture:
132             try:
133                 ip6_received = packet[IPv6]
134                 proto_received = packet[proto_l]
135                 payload_info = self.payload_to_info(packet[Raw])
136                 packet_index = payload_info.index
137                 self.assertEqual(payload_info.dst, dst_sw_if_index)
138                 self.logger.debug(
139                     "Got packet on port %s: src=%u (id=%u)" %
140                     (dst_if.name, payload_info.src, packet_index))
141                 next_info = self.get_next_packet_info_for_interface2(
142                     payload_info.src, dst_sw_if_index,
143                     last_info[payload_info.src])
144                 last_info[payload_info.src] = next_info
145                 self.assertTrue(next_info is not None)
146                 self.assertEqual(packet_index, next_info.index)
147                 saved_packet = next_info.data
148                 ip_saved = saved_packet[IPv6]
149                 proto_saved = saved_packet[proto_l]
150                 # Check standard fields
151                 self.assertEqual(ip6_received.src, ip_saved.src)
152                 self.assertEqual(ip6_received.dst, ip_saved.dst)
153                 self.assertEqual(proto_received.sport, proto_saved.sport)
154                 self.assertEqual(proto_received.dport, proto_saved.dport)
155             except:
156                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
157                 raise
158         for i in self.interfaces:
159             remaining_packet = self.get_next_packet_info_for_interface2(
160                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161             self.assertTrue(remaining_packet is None,
162                             "Interface %s: Packet expected from interface %s "
163                             "didn't arrive" % (dst_if.name, i.name))
164
165     @staticmethod
166     def build_ip6_mask(nh='', src_ip='', dst_ip='',
167                        src_port='', dst_port=''):
168         """Build IPv6 ACL mask data with hexstring format.
169
170         :param str nh: next header number <0-ff>
171         :param str src_ip: source ip address <0-ffffffff>
172         :param str dst_ip: destination ip address <0-ffffffff>
173         :param str src_port: source port number <0-ffff>
174         :param str dst_port: destination port number <0-ffff>
175         """
176
177         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
178             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
179
180     @staticmethod
181     def build_ip6_match(nh=0, src_ip='', dst_ip='',
182                         src_port=0, dst_port=0):
183         """Build IPv6 ACL match data with hexstring format.
184
185         :param int nh: next header number with valid option "x"
186         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
187         :param str dst_ip: destination ip6 address with format of
188             "xxx:xxxx::xxxx"
189         :param int src_port: source port number "x"
190         :param int dst_port: destination port number "x"
191         """
192         if src_ip:
193             if sys.version_info[0] == 2:
194                 src_ip = binascii.hexlify(socket.inet_pton(
195                     socket.AF_INET6, src_ip))
196             else:
197                 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
198
199         if dst_ip:
200             if sys.version_info[0] == 2:
201                 dst_ip = binascii.hexlify(socket.inet_pton(
202                     socket.AF_INET6, dst_ip))
203             else:
204                 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
205
206         return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
207             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
208             hex(dst_port)[2:])).rstrip('0')
209
210     @staticmethod
211     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
212         """Build MAC ACL mask data with hexstring format.
213
214         :param str dst_mac: source MAC address <0-ffffffffffff>
215         :param str src_mac: destination MAC address <0-ffffffffffff>
216         :param str ether_type: ethernet type <0-ffff>
217         """
218
219         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
220             dst_mac, src_mac, ether_type)).rstrip('0')
221
222     @staticmethod
223     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
224         """Build MAC ACL match data with hexstring format.
225
226         :param str dst_mac: source MAC address <x:x:x:x:x:x>
227         :param str src_mac: destination MAC address <x:x:x:x:x:x>
228         :param str ether_type: ethernet type <0-ffff>
229         """
230         if dst_mac:
231             dst_mac = dst_mac.replace(':', '')
232         if src_mac:
233             src_mac = src_mac.replace(':', '')
234
235         return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
236             dst_mac, src_mac, ether_type)).rstrip('0')
237
238     def create_classify_table(self, key, mask, data_offset=0):
239         """Create Classify Table
240
241         :param str key: key for classify table (ex, ACL name).
242         :param str mask: mask value for interested traffic.
243         :param int data_offset:
244         """
245         r = self.vapi.classify_add_del_table(
246             is_add=1,
247             mask=binascii.unhexlify(mask),
248             match_n_vectors=(len(mask) - 1) // 32 + 1,
249             miss_next_index=0,
250             current_data_flag=1,
251             current_data_offset=data_offset)
252         self.assertIsNotNone(r, 'No response msg for add_del_table')
253         self.acl_tbl_idx[key] = r.new_table_index
254
255     def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
256         """Create Classify Session
257
258         :param int table_index: table index to identify classify table.
259         :param str match: matched value for interested traffic.
260         :param int vrfid: VRF id.
261         :param int is_add: option to configure classify session.
262             - create(1) or delete(0)
263         """
264         r = self.vapi.classify_add_del_session(
265             is_add,
266             table_index,
267             binascii.unhexlify(match),
268             opaque_index=0,
269             metadata=vrfid)
270         self.assertIsNotNone(r, 'No response msg for add_del_session')
271
272     def input_acl_set_interface(self, intf, table_index, is_add=1):
273         """Configure Input ACL interface
274
275         :param VppInterface intf: Interface to apply Input ACL feature.
276         :param int table_index: table index to identify classify table.
277         :param int is_add: option to configure classify session.
278             - enable(1) or disable(0)
279         """
280         r = self.vapi.input_acl_set_interface(
281             is_add,
282             intf.sw_if_index,
283             ip6_table_index=table_index)
284         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
285
286     def output_acl_set_interface(self, intf, table_index, is_add=1):
287         """Configure Output ACL interface
288
289         :param VppInterface intf: Interface to apply Output ACL feature.
290         :param int table_index: table index to identify classify table.
291         :param int is_add: option to configure classify session.
292             - enable(1) or disable(0)
293         """
294         r = self.vapi.output_acl_set_interface(
295             is_add,
296             intf.sw_if_index,
297             ip6_table_index=table_index)
298         self.assertIsNotNone(r, 'No response msg for acl_set_interface')
299
300
class TestClassifierIP6(TestClassifier):
    """ Classifier IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6, cls).tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP6 iACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with source IP address.
            - Send and verify received packets on pg1 interface.
        """
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # input ACL on pg0 matching the full source address
        table = 'ip6_src'
        mask = self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=tx_if.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP6 iACL test

        Test scenario for basic IP ACL with destination IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with destination IP address.
            - Send and verify received packets on pg1 interface.
        """
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # input ACL on pg0 matching the full destination address
        table = 'ip6_dst'
        mask = self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(dst_ip=rx_if.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP6 iACL test

        Test scenario for basic IP ACL with source and destination IP
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with source and destination IP addresses.
            - Send and verify received packets on pg1 interface.
        """
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # input ACL on pg0 matching both full addresses
        table = 'ip6'
        mask = self.build_ip6_mask(
            src_ip='ffffffffffffffffffffffffffffffff',
            dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=tx_if.remote_ip6,
                                     dst_ip=rx_if.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
406
407
408 # Tests split to different test case classes because of issue reported in
409 # ticket VPP-1336
class TestClassifierIP6UDP(TestClassifier):
    """ Classifier IP6 UDP proto Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6UDP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6UDP, cls).tearDownClass()

    def test_iacl_proto_udp(self):
        """ IP6 UDP protocol iACL test

        Test scenario for basic protocol ACL with UDP protocol
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # input ACL on pg0 matching the next-header field only
        table = 'nh_udp'
        mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ IP6 UDP source port iACL test

        Test scenario for basic protocol ACL with UDP and sport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    UDP(sport=sport, dport=5678))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and source port
        table = 'nh_udp_sport'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ IP6 UDP destination port iACL test

        Test scenario for basic protocol ACL with UDP and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    UDP(sport=1234, dport=dport))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and destination port
        table = 'nh_udp_dport'
        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ IP6 UDP source and destination ports iACL test

        Test scenario for basic protocol ACL with UDP and sport and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with UDP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport, dport = 13720, 9080
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    UDP(sport=sport, dport=dport))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and both ports
        table = 'nh_udp_ports'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
                                     dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
551
552
class TestClassifierIP6TCP(TestClassifier):
    """ Classifier IP6 TCP proto Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6TCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6TCP, cls).tearDownClass()

    def test_iacl_proto_tcp(self):
        """ IP6 TCP protocol iACL test

        Test scenario for basic protocol ACL with TCP protocol
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol.
            - Send and verify received packets on pg1 interface.
        """
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    TCP(sport=1234, dport=5678))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching the next-header field only
        table = 'nh_tcp'
        mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received, TCP)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ IP6 TCP source port iACL test

        Test scenario for basic protocol ACL with TCP and sport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport.
            - Send and verify received packets on pg1 interface.
        """
        sport = 38
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    TCP(sport=sport, dport=5678))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and source port
        table = 'nh_tcp_sport'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received, TCP)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ IP6 TCP destination port iACL test

        Test scenario for basic protocol ACL with TCP and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined dport.
            - Send and verify received packets on pg1 interface.
        """
        dport = 427
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    TCP(sport=1234, dport=dport))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and destination port
        table = 'nh_tcp_dport'
        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received, TCP)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ IP6 TCP source and destination ports iACL test

        Test scenario for basic protocol ACL with TCP and sport and dport
            - Create IPv6 stream for pg0 -> pg1 interface.
            - Create iACL with TCP IP protocol and defined sport and dport.
            - Send and verify received packets on pg1 interface.
        """
        sport, dport = 13720, 9080
        tx_if, rx_if = self.pg0, self.pg1
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes,
                                    TCP(sport=sport, dport=dport))
        tx_if.add_stream(stream)

        # input ACL on pg0 matching next header and both ports
        table = 'nh_tcp_ports'
        mask = self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table, mask)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
                                     dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received, TCP)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
696
697
class TestClassifierIP6Out(TestClassifier):
    """ Classifier output IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6Out, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6Out, cls).tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP6 ACL test

        Test scenario for basic IP ACL with source IP
            - Create IPv6 stream for pg1 -> pg0 interface.
            - Create ACL with source IP address.
            - Send and verify received packets on pg0 interface.
        """
        tx_if, rx_if = self.pg1, self.pg0
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # output ACL on pg0 matching the full source address; tearDown
        # recognises the 'ip6_out' key and detaches it as an output ACL
        table = 'ip6_out'
        mask = self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table, mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_ip6_match(src_ip=tx_if.remote_ip6)
        self.create_classify_session(table_index, match)
        self.output_acl_set_interface(rx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
741
742
class TestClassifierIP6MAC(TestClassifier):
    """ Classifier IP6 MAC Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6MAC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6MAC, cls).tearDownClass()

    def test_acl_mac(self):
        """ IP6 MAC iACL test

        Test scenario for basic MAC ACL with source MAC
            - Create IPv6 stream for pg0 -> pg2 interface.
            - Create ACL with source MAC address.
            - Send and verify received packets on pg2 interface.
        """
        tx_if, rx_if = self.pg0, self.pg2
        stream = self.create_stream(tx_if, rx_if, self.pg_if_packet_sizes)
        tx_if.add_stream(stream)

        # input ACL on pg0 matching the source MAC; the table is created
        # with a data offset of -14
        table = 'mac'
        mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(table, mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table)
        match = self.build_mac_match(src_mac=tx_if.remote_mac)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(tx_if, table_index)
        self.acl_active_table = table

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = rx_if.get_capture(len(stream))
        self.verify_capture(rx_if, received)
        tx_if.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
783
784
# Allow running this file directly; the tests are driven by VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)