MAP: Convert from DPO to input feature.
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from util import ppp
14
15
16 class TestClassifier(VppTestCase):
17     """ Classifier Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         """
22         Perform standard class setup (defined by class method setUpClass in
23         class VppTestCase) before running the test case, set test case related
24         variables and configure VPP.
25         """
26         super(TestClassifier, cls).setUpClass()
27         cls.acl_active_table = ''
28
29     def setUp(self):
30         """
31         Perform test setup before test case.
32
33         **Config:**
34             - create 4 pg interfaces
35                 - untagged pg0/pg1/pg2 interface
36                     pg0 -------> pg1 (IP ACL)
37                            \
38                             ---> pg2 (MAC ACL))
39             - setup interfaces:
40                 - put it into UP state
41                 - set IPv6 addresses
42                 - resolve neighbor address using NDP
43
44         :ivar list interfaces: pg interfaces.
45         :ivar list pg_if_packet_sizes: packet sizes in test.
46         :ivar dict acl_tbl_idx: ACL table index.
47         :ivar int pbr_vrfid: VRF id for PBR test.
48         """
49         self.reset_packet_infos()
50         super(TestClassifier, self).setUp()
51
52         # create 4 pg interfaces
53         self.create_pg_interfaces(range(3))
54
55         # packet sizes to test
56         self.pg_if_packet_sizes = [64, 9018]
57
58         self.interfaces = list(self.pg_interfaces)
59
60         # ACL vars
61         self.acl_tbl_idx = {}
62
63         # setup all interfaces
64         for intf in self.interfaces:
65             intf.admin_up()
66             intf.config_ip6()
67             intf.resolve_ndp()
68
69     def tearDown(self):
70         """Run standard test teardown and acl related log."""
71         if not self.vpp_dead:
72             self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73             self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74             self.logger.info(self.vapi.cli("show classify table verbose"))
75             self.logger.info(self.vapi.cli("show ip fib"))
76             if self.acl_active_table == 'ip6_out':
77                 self.output_acl_set_interface(
78                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79                 self.acl_active_table = ''
80             elif self.acl_active_table != '':
81                 self.input_acl_set_interface(
82                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83                 self.acl_active_table = ''
84             for intf in self.interfaces:
85                 intf.unconfig_ip6()
86                 intf.admin_down()
87
88         super(TestClassifier, self).tearDown()
89
90     def create_stream(self, src_if, dst_if, packet_sizes,
91                       proto_l=UDP(sport=1234, dport=5678)):
92         """Create input packet stream for defined interfaces.
93
94         :param VppInterface src_if: Source Interface for packet stream.
95         :param VppInterface dst_if: Destination Interface for packet stream.
96         :param list packet_sizes: packet size to test.
97         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
98         """
99         pkts = []
100
101         for size in packet_sizes:
102             info = self.create_packet_info(src_if, dst_if)
103             payload = self.info_to_payload(info)
104             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
106                  proto_l /
107                  Raw(payload))
108             info.data = p.copy()
109             self.extend_packet(p, size)
110             pkts.append(p)
111         return pkts
112
113     def verify_capture(self, dst_if, capture, proto_l=UDP):
114         """Verify captured input packet stream for defined interface.
115
116         :param VppInterface dst_if: Interface to verify captured packet stream.
117         :param list capture: Captured packet stream.
118         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
119         """
120         self.logger.info("Verifying capture on interface %s" % dst_if.name)
121         last_info = dict()
122         for i in self.interfaces:
123             last_info[i.sw_if_index] = None
124         dst_sw_if_index = dst_if.sw_if_index
125         for packet in capture:
126             try:
127                 ip6_received = packet[IPv6]
128                 proto_received = packet[proto_l]
129                 payload_info = self.payload_to_info(str(packet[Raw]))
130                 packet_index = payload_info.index
131                 self.assertEqual(payload_info.dst, dst_sw_if_index)
132                 self.logger.debug(
133                     "Got packet on port %s: src=%u (id=%u)" %
134                     (dst_if.name, payload_info.src, packet_index))
135                 next_info = self.get_next_packet_info_for_interface2(
136                     payload_info.src, dst_sw_if_index,
137                     last_info[payload_info.src])
138                 last_info[payload_info.src] = next_info
139                 self.assertTrue(next_info is not None)
140                 self.assertEqual(packet_index, next_info.index)
141                 saved_packet = next_info.data
142                 ip_saved = saved_packet[IPv6]
143                 proto_saved = saved_packet[proto_l]
144                 # Check standard fields
145                 self.assertEqual(ip6_received.src, ip_saved.src)
146                 self.assertEqual(ip6_received.dst, ip_saved.dst)
147                 self.assertEqual(proto_received.sport, proto_saved.sport)
148                 self.assertEqual(proto_received.dport, proto_saved.dport)
149             except:
150                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
151                 raise
152         for i in self.interfaces:
153             remaining_packet = self.get_next_packet_info_for_interface2(
154                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155             self.assertTrue(remaining_packet is None,
156                             "Interface %s: Packet expected from interface %s "
157                             "didn't arrive" % (dst_if.name, i.name))
158
159     @staticmethod
160     def build_ip6_mask(nh='', src_ip='', dst_ip='',
161                        src_port='', dst_port=''):
162         """Build IPv6 ACL mask data with hexstring format.
163
164         :param str nh: next header number <0-ff>
165         :param str src_ip: source ip address <0-ffffffff>
166         :param str dst_ip: destination ip address <0-ffffffff>
167         :param str src_port: source port number <0-ffff>
168         :param str dst_port: destination port number <0-ffff>
169         """
170
171         return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
172             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
173
174     @staticmethod
175     def build_ip6_match(nh=0, src_ip='', dst_ip='',
176                         src_port=0, dst_port=0):
177         """Build IPv6 ACL match data with hexstring format.
178
179         :param int nh: next header number with valid option "x"
180         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
181         :param str dst_ip: destination ip6 address with format of
182             "xxx:xxxx::xxxx"
183         :param int src_port: source port number "x"
184         :param int dst_port: destination port number "x"
185         """
186         if src_ip:
187             src_ip = binascii.hexlify(socket.inet_pton(
188                 socket.AF_INET6, src_ip))
189         if dst_ip:
190             dst_ip = binascii.hexlify(socket.inet_pton(
191                 socket.AF_INET6, dst_ip))
192
193         return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
194             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
195             hex(dst_port)[2:])).rstrip('0')
196
197     @staticmethod
198     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
199         """Build MAC ACL mask data with hexstring format.
200
201         :param str dst_mac: source MAC address <0-ffffffffffff>
202         :param str src_mac: destination MAC address <0-ffffffffffff>
203         :param str ether_type: ethernet type <0-ffff>
204         """
205
206         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
207                                               ether_type)).rstrip('0')
208
209     @staticmethod
210     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
211         """Build MAC ACL match data with hexstring format.
212
213         :param str dst_mac: source MAC address <x:x:x:x:x:x>
214         :param str src_mac: destination MAC address <x:x:x:x:x:x>
215         :param str ether_type: ethernet type <0-ffff>
216         """
217         if dst_mac:
218             dst_mac = dst_mac.replace(':', '')
219         if src_mac:
220             src_mac = src_mac.replace(':', '')
221
222         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
223                                               ether_type)).rstrip('0')
224
225     def create_classify_table(self, key, mask, data_offset=0):
226         """Create Classify Table
227
228         :param str key: key for classify table (ex, ACL name).
229         :param str mask: mask value for interested traffic.
230         :param int data_offset:
231         """
232         r = self.vapi.classify_add_del_table(
233             is_add=1,
234             mask=binascii.unhexlify(mask),
235             match_n_vectors=(len(mask) - 1) // 32 + 1,
236             miss_next_index=0,
237             current_data_flag=1,
238             current_data_offset=data_offset)
239         self.assertIsNotNone(r, msg='No response msg for add_del_table')
240         self.acl_tbl_idx[key] = r.new_table_index
241
242     def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
243         """Create Classify Session
244
245         :param int table_index: table index to identify classify table.
246         :param str match: matched value for interested traffic.
247         :param int vrfid: VRF id.
248         :param int is_add: option to configure classify session.
249             - create(1) or delete(0)
250         """
251         r = self.vapi.classify_add_del_session(
252             is_add,
253             table_index,
254             binascii.unhexlify(match),
255             opaque_index=0,
256             metadata=vrfid)
257         self.assertIsNotNone(r, msg='No response msg for add_del_session')
258
259     def input_acl_set_interface(self, intf, table_index, is_add=1):
260         """Configure Input ACL interface
261
262         :param VppInterface intf: Interface to apply Input ACL feature.
263         :param int table_index: table index to identify classify table.
264         :param int is_add: option to configure classify session.
265             - enable(1) or disable(0)
266         """
267         r = self.vapi.input_acl_set_interface(
268             is_add,
269             intf.sw_if_index,
270             ip6_table_index=table_index)
271         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
272
273     def output_acl_set_interface(self, intf, table_index, is_add=1):
274         """Configure Output ACL interface
275
276         :param VppInterface intf: Interface to apply Output ACL feature.
277         :param int table_index: table index to identify classify table.
278         :param int is_add: option to configure classify session.
279             - enable(1) or disable(0)
280         """
281         r = self.vapi.output_acl_set_interface(
282             is_add,
283             intf.sw_if_index,
284             ip6_table_index=table_index)
285         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
286
287
class TestClassifierIP6(TestClassifier):
    """ Classifier IP6 Test Case """

    def test_iacl_src_ip(self):
        """ Source IP6 iACL test

        Input ACL on pg0 keyed on the source IP
            - Build an IPv6 stream for pg0 -> pg1.
            - Install an iACL matching the full source IP address.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'ip6_src'
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        src_mask = self.build_ip6_mask(
            src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, src_mask)
        src_match = self.build_ip6_match(src_ip=self.pg0.remote_ip6)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), src_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP6 iACL test

        Input ACL on pg0 keyed on the destination IP
            - Build an IPv6 stream for pg0 -> pg1.
            - Install an iACL matching the full destination IP address.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'ip6_dst'
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        dst_mask = self.build_ip6_mask(
            dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, dst_mask)
        dst_match = self.build_ip6_match(dst_ip=self.pg1.remote_ip6)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), dst_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP6 iACL test

        Input ACL on pg0 keyed on both source and destination IP
            - Build an IPv6 stream for pg0 -> pg1.
            - Install an iACL matching the full source and destination
              IP addresses.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'ip6'
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        both_mask = self.build_ip6_mask(
            src_ip='ffffffffffffffffffffffffffffffff',
            dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, both_mask)
        both_match = self.build_ip6_match(src_ip=self.pg0.remote_ip6,
                                          dst_ip=self.pg1.remote_ip6)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), both_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
385
386
387 # Tests split to different test case classes because of issue reported in
388 # ticket VPP-1336
class TestClassifierIP6UDP(TestClassifier):
    """ Classifier IP6 UDP proto Test Case """

    def test_iacl_proto_udp(self):
        """ IP6 UDP protocol iACL test

        Input ACL on pg0 keyed on the next-header value (UDP)
            - Build an IPv6 stream for pg0 -> pg1.
            - Install an iACL matching the UDP protocol number.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_udp'
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        proto_mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table_key, proto_mask)
        proto_match = self.build_ip6_match(nh=socket.IPPROTO_UDP)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), proto_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ IP6 UDP source port iACL test

        Input ACL on pg0 keyed on UDP plus the source port
            - Build an IPv6 stream for pg0 -> pg1 with a fixed sport.
            - Install an iACL matching the UDP protocol and that sport.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_udp_sport'
        sport = 38
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=5678))
        self.pg0.add_stream(sent)

        port_mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table_key, port_mask)
        port_match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                          src_port=sport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), port_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ IP6 UDP destination port iACL test

        Input ACL on pg0 keyed on UDP plus the destination port
            - Build an IPv6 stream for pg0 -> pg1 with a fixed dport.
            - Install an iACL matching the UDP protocol and that dport.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_udp_dport'
        dport = 427
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=1234, dport=dport))
        self.pg0.add_stream(sent)

        port_mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table_key, port_mask)
        port_match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                          dst_port=dport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), port_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ IP6 UDP source and destination ports iACL test

        Input ACL on pg0 keyed on UDP plus both ports
            - Build an IPv6 stream for pg0 -> pg1 with fixed sport/dport.
            - Install an iACL matching the UDP protocol and both ports.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_udp_ports'
        sport = 13720
        dport = 9080
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  UDP(sport=sport, dport=dport))
        self.pg0.add_stream(sent)

        ports_mask = self.build_ip6_mask(nh='ff', src_port='ffff',
                                         dst_port='ffff')
        self.create_classify_table(table_key, ports_mask)
        ports_match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                           src_port=sport, dst_port=dport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), ports_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
522
523
class TestClassifierIP6TCP(TestClassifier):
    """ Classifier IP6 TCP proto Test Case """

    def test_iacl_proto_tcp(self):
        """ IP6 TCP protocol iACL test

        Input ACL on pg0 keyed on the next-header value (TCP)
            - Build an IPv6 TCP stream for pg0 -> pg1.
            - Install an iACL matching the TCP protocol number.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_tcp'
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=5678))
        self.pg0.add_stream(sent)

        proto_mask = self.build_ip6_mask(nh='ff')
        self.create_classify_table(table_key, proto_mask)
        proto_match = self.build_ip6_match(nh=socket.IPPROTO_TCP)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), proto_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ IP6 TCP source port iACL test

        Input ACL on pg0 keyed on TCP plus the source port
            - Build an IPv6 TCP stream for pg0 -> pg1 with a fixed sport.
            - Install an iACL matching the TCP protocol and that sport.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_tcp_sport'
        sport = 38
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=5678))
        self.pg0.add_stream(sent)

        port_mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table_key, port_mask)
        port_match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                          src_port=sport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), port_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ IP6 TCP destination port iACL test

        Input ACL on pg0 keyed on TCP plus the destination port
            - Build an IPv6 TCP stream for pg0 -> pg1 with a fixed dport.
            - Install an iACL matching the TCP protocol and that dport.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_tcp_dport'
        dport = 427
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=1234, dport=dport))
        self.pg0.add_stream(sent)

        port_mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table_key, port_mask)
        port_match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                          dst_port=dport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), port_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ IP6 TCP source and destination ports iACL test

        Input ACL on pg0 keyed on TCP plus both ports
            - Build an IPv6 TCP stream for pg0 -> pg1 with fixed sport/dport.
            - Install an iACL matching the TCP protocol and both ports.
            - Send it; all packets must arrive on pg1, none on pg0/pg2.
        """
        table_key = 'nh_tcp_ports'
        sport = 13720
        dport = 9080
        sent = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
                                  TCP(sport=sport, dport=dport))
        self.pg0.add_stream(sent)

        ports_mask = self.build_ip6_mask(nh='ff', src_port='ffff',
                                         dst_port='ffff')
        self.create_classify_table(table_key, ports_mask)
        ports_match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                           src_port=sport, dst_port=dport)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), ports_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg1.get_capture(len(sent))
        self.verify_capture(self.pg1, received, TCP)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
659
660
class TestClassifierIP6Out(TestClassifier):
    """ Classifier output IP6 Test Case """

    def test_acl_ip_out(self):
        """ Output IP6 ACL test

        Output ACL on pg0 keyed on the source IP
            - Build an IPv6 stream for pg1 -> pg0.
            - Install an output ACL matching the full source IP address.
            - Send it; all packets must arrive on pg0, none on pg1/pg2.
        """
        table_key = 'ip6_out'
        sent = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(sent)

        src_mask = self.build_ip6_mask(
            src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, src_mask, data_offset=0)
        src_match = self.build_ip6_match(src_ip=self.pg1.remote_ip6)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), src_match)
        self.output_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # tearDown detaches the output ACL based on this key
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg0.get_capture(len(sent))
        self.verify_capture(self.pg0, received)
        self.pg1.assert_nothing_captured(remark="packets forwarded")
        self.pg2.assert_nothing_captured(remark="packets forwarded")
696
697
class TestClassifierIP6MAC(TestClassifier):
    """ Classifier IP6 MAC Test Case """

    def test_acl_mac(self):
        """ IP6 MAC iACL test

        Input ACL on pg0 keyed on the source MAC
            - Build an IPv6 stream for pg0 -> pg2.
            - Install an iACL matching the full source MAC address; the
              table is created with data_offset=-14.
            - Send it; all packets must arrive on pg2, none on pg0/pg1.
        """
        table_key = 'mac'
        sent = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(sent)

        mac_mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(table_key, mac_mask, data_offset=-14)
        mac_match = self.build_mac_match(src_mac=self.pg0.remote_mac)
        self.create_classify_session(
            self.acl_tbl_idx.get(table_key), mac_match)
        self.input_acl_set_interface(
            self.pg0, self.acl_tbl_idx.get(table_key))
        # remembered so tearDown can detach the table again
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        received = self.pg2.get_capture(len(sent))
        self.verify_capture(self.pg2, received)
        self.pg0.assert_nothing_captured(remark="packets forwarded")
        self.pg1.assert_nothing_captured(remark="packets forwarded")
730
731
# Script entry point: when the file is executed directly, run the test cases
# defined above with VppTestRunner as the unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)