IGMP: proxy device
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from util import ppp
14
15
16 class TestClassifier(VppTestCase):
17     """ Classifier Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         """
22         Perform standard class setup (defined by class method setUpClass in
23         class VppTestCase) before running the test case, set test case related
24         variables and configure VPP.
25         """
26         super(TestClassifier, cls).setUpClass()
27         cls.acl_active_table = ''
28
29     def setUp(self):
30         """
31         Perform test setup before test case.
32
33         **Config:**
34             - create 4 pg interfaces
35                 - untagged pg0/pg1/pg2 interface
36                     pg0 -------> pg1 (IP ACL)
37                            \
38                             ---> pg2 (MAC ACL))
39             - setup interfaces:
40                 - put it into UP state
41                 - set IPv6 addresses
42                 - resolve neighbor address using NDP
43
44         :ivar list interfaces: pg interfaces.
45         :ivar list pg_if_packet_sizes: packet sizes in test.
46         :ivar dict acl_tbl_idx: ACL table index.
47         :ivar int pbr_vrfid: VRF id for PBR test.
48         """
49         self.reset_packet_infos()
50         super(TestClassifier, self).setUp()
51
52         # create 4 pg interfaces
53         self.create_pg_interfaces(range(3))
54
55         # packet sizes to test
56         self.pg_if_packet_sizes = [64, 9018]
57
58         self.interfaces = list(self.pg_interfaces)
59
60         # ACL vars
61         self.acl_tbl_idx = {}
62
63         # setup all interfaces
64         for intf in self.interfaces:
65             intf.admin_up()
66             intf.config_ip6()
67             intf.resolve_ndp()
68
69     def tearDown(self):
70         """Run standard test teardown and acl related log."""
71         if not self.vpp_dead:
72             self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73             self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74             self.logger.info(self.vapi.cli("show classify table verbose"))
75             self.logger.info(self.vapi.cli("show ip fib"))
76             if self.acl_active_table == 'ip6_out':
77                 self.output_acl_set_interface(
78                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79                 self.acl_active_table = ''
80             elif self.acl_active_table != '':
81                 self.input_acl_set_interface(
82                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83                 self.acl_active_table = ''
84             for intf in self.interfaces:
85                 intf.unconfig_ip6()
86                 intf.admin_down()
87
88         super(TestClassifier, self).tearDown()
89
90     def create_stream(self, src_if, dst_if, packet_sizes,
91                       proto_l=UDP(sport=1234, dport=5678)):
92         """Create input packet stream for defined interfaces.
93
94         :param VppInterface src_if: Source Interface for packet stream.
95         :param VppInterface dst_if: Destination Interface for packet stream.
96         :param list packet_sizes: packet size to test.
97         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
98         """
99         pkts = []
100
101         for size in packet_sizes:
102             info = self.create_packet_info(src_if, dst_if)
103             payload = self.info_to_payload(info)
104             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
106                  proto_l /
107                  Raw(payload))
108             info.data = p.copy()
109             self.extend_packet(p, size)
110             pkts.append(p)
111         return pkts
112
113     def verify_capture(self, dst_if, capture, proto_l=UDP):
114         """Verify captured input packet stream for defined interface.
115
116         :param VppInterface dst_if: Interface to verify captured packet stream.
117         :param list capture: Captured packet stream.
118         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
119         """
120         self.logger.info("Verifying capture on interface %s" % dst_if.name)
121         last_info = dict()
122         for i in self.interfaces:
123             last_info[i.sw_if_index] = None
124         dst_sw_if_index = dst_if.sw_if_index
125         for packet in capture:
126             try:
127                 ip6_received = packet[IPv6]
128                 proto_received = packet[proto_l]
129                 payload_info = self.payload_to_info(str(packet[Raw]))
130                 packet_index = payload_info.index
131                 self.assertEqual(payload_info.dst, dst_sw_if_index)
132                 self.logger.debug(
133                     "Got packet on port %s: src=%u (id=%u)" %
134                     (dst_if.name, payload_info.src, packet_index))
135                 next_info = self.get_next_packet_info_for_interface2(
136                     payload_info.src, dst_sw_if_index,
137                     last_info[payload_info.src])
138                 last_info[payload_info.src] = next_info
139                 self.assertTrue(next_info is not None)
140                 self.assertEqual(packet_index, next_info.index)
141                 saved_packet = next_info.data
142                 ip_saved = saved_packet[IPv6]
143                 proto_saved = saved_packet[proto_l]
144                 # Check standard fields
145                 self.assertEqual(ip6_received.src, ip_saved.src)
146                 self.assertEqual(ip6_received.dst, ip_saved.dst)
147                 self.assertEqual(proto_received.sport, proto_saved.sport)
148                 self.assertEqual(proto_received.dport, proto_saved.dport)
149             except:
150                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
151                 raise
152         for i in self.interfaces:
153             remaining_packet = self.get_next_packet_info_for_interface2(
154                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155             self.assertTrue(remaining_packet is None,
156                             "Interface %s: Packet expected from interface %s "
157                             "didn't arrive" % (dst_if.name, i.name))
158
159     @staticmethod
160     def build_ip6_mask(nh='', src_ip='', dst_ip='',
161                        src_port='', dst_port=''):
162         """Build IPv6 ACL mask data with hexstring format.
163
164         :param str nh: next header number <0-ff>
165         :param str src_ip: source ip address <0-ffffffff>
166         :param str dst_ip: destination ip address <0-ffffffff>
167         :param str src_port: source port number <0-ffff>
168         :param str dst_port: destination port number <0-ffff>
169         """
170
171         return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
172             nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
173
174     @staticmethod
175     def build_ip6_match(nh=0, src_ip='', dst_ip='',
176                         src_port=0, dst_port=0):
177         """Build IPv6 ACL match data with hexstring format.
178
179         :param int nh: next header number with valid option "x"
180         :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
181         :param str dst_ip: destination ip6 address with format of
182             "xxx:xxxx::xxxx"
183         :param int src_port: source port number "x"
184         :param int dst_port: destination port number "x"
185         """
186         if src_ip:
187             src_ip = socket.inet_pton(socket.AF_INET6, src_ip).encode('hex')
188         if dst_ip:
189             dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).encode('hex')
190
191         return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
192             hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
193             hex(dst_port)[2:])).rstrip('0')
194
195     @staticmethod
196     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
197         """Build MAC ACL mask data with hexstring format.
198
199         :param str dst_mac: source MAC address <0-ffffffffffff>
200         :param str src_mac: destination MAC address <0-ffffffffffff>
201         :param str ether_type: ethernet type <0-ffff>
202         """
203
204         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
205                                               ether_type)).rstrip('0')
206
207     @staticmethod
208     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
209         """Build MAC ACL match data with hexstring format.
210
211         :param str dst_mac: source MAC address <x:x:x:x:x:x>
212         :param str src_mac: destination MAC address <x:x:x:x:x:x>
213         :param str ether_type: ethernet type <0-ffff>
214         """
215         if dst_mac:
216             dst_mac = dst_mac.replace(':', '')
217         if src_mac:
218             src_mac = src_mac.replace(':', '')
219
220         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
221                                               ether_type)).rstrip('0')
222
223     def create_classify_table(self, key, mask, data_offset=0):
224         """Create Classify Table
225
226         :param str key: key for classify table (ex, ACL name).
227         :param str mask: mask value for interested traffic.
228         :param int data_offset:
229         """
230         r = self.vapi.classify_add_del_table(
231             is_add=1,
232             mask=binascii.unhexlify(mask),
233             match_n_vectors=(len(mask) - 1) // 32 + 1,
234             miss_next_index=0,
235             current_data_flag=1,
236             current_data_offset=data_offset)
237         self.assertIsNotNone(r, msg='No response msg for add_del_table')
238         self.acl_tbl_idx[key] = r.new_table_index
239
240     def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
241         """Create Classify Session
242
243         :param int table_index: table index to identify classify table.
244         :param str match: matched value for interested traffic.
245         :param int vrfid: VRF id.
246         :param int is_add: option to configure classify session.
247             - create(1) or delete(0)
248         """
249         r = self.vapi.classify_add_del_session(
250             is_add,
251             table_index,
252             binascii.unhexlify(match),
253             opaque_index=0,
254             metadata=vrfid)
255         self.assertIsNotNone(r, msg='No response msg for add_del_session')
256
257     def input_acl_set_interface(self, intf, table_index, is_add=1):
258         """Configure Input ACL interface
259
260         :param VppInterface intf: Interface to apply Input ACL feature.
261         :param int table_index: table index to identify classify table.
262         :param int is_add: option to configure classify session.
263             - enable(1) or disable(0)
264         """
265         r = self.vapi.input_acl_set_interface(
266             is_add,
267             intf.sw_if_index,
268             ip6_table_index=table_index)
269         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
270
271     def output_acl_set_interface(self, intf, table_index, is_add=1):
272         """Configure Output ACL interface
273
274         :param VppInterface intf: Interface to apply Output ACL feature.
275         :param int table_index: table index to identify classify table.
276         :param int is_add: option to configure classify session.
277             - enable(1) or disable(0)
278         """
279         r = self.vapi.output_acl_set_interface(
280             is_add,
281             intf.sw_if_index,
282             ip6_table_index=table_index)
283         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
284
285
286 class TestClassifierIP6(TestClassifier):
287     """ Classifier IP6 Test Case """
288
289     def test_iacl_src_ip(self):
290         """ Source IP6 iACL test
291
292         Test scenario for basic IP ACL with source IP
293             - Create IPv6 stream for pg0 -> pg1 interface.
294             - Create iACL with source IP address.
295             - Send and verify received packets on pg1 interface.
296         """
297
298         # Basic iACL testing with source IP
299         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
300         self.pg0.add_stream(pkts)
301
302         key = 'ip6_src'
303         self.create_classify_table(
304             key,
305             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
306         self.create_classify_session(
307             self.acl_tbl_idx.get(key),
308             self.build_ip6_match(src_ip=self.pg0.remote_ip6))
309         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
310         self.acl_active_table = key
311
312         self.pg_enable_capture(self.pg_interfaces)
313         self.pg_start()
314
315         pkts = self.pg1.get_capture(len(pkts))
316         self.verify_capture(self.pg1, pkts)
317         self.pg0.assert_nothing_captured(remark="packets forwarded")
318         self.pg2.assert_nothing_captured(remark="packets forwarded")
319
320     def test_iacl_dst_ip(self):
321         """ Destination IP6 iACL test
322
323         Test scenario for basic IP ACL with destination IP
324             - Create IPv6 stream for pg0 -> pg1 interface.
325             - Create iACL with destination IP address.
326             - Send and verify received packets on pg1 interface.
327         """
328
329         # Basic iACL testing with destination IP
330         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
331         self.pg0.add_stream(pkts)
332
333         key = 'ip6_dst'
334         self.create_classify_table(
335             key,
336             self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
337         self.create_classify_session(
338             self.acl_tbl_idx.get(key),
339             self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
340         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
341         self.acl_active_table = key
342
343         self.pg_enable_capture(self.pg_interfaces)
344         self.pg_start()
345
346         pkts = self.pg1.get_capture(len(pkts))
347         self.verify_capture(self.pg1, pkts)
348         self.pg0.assert_nothing_captured(remark="packets forwarded")
349         self.pg2.assert_nothing_captured(remark="packets forwarded")
350
351     def test_iacl_src_dst_ip(self):
352         """ Source and destination IP6 iACL test
353
354         Test scenario for basic IP ACL with source and destination IP
355             - Create IPv4 stream for pg0 -> pg1 interface.
356             - Create iACL with source and destination IP addresses.
357             - Send and verify received packets on pg1 interface.
358         """
359
360         # Basic iACL testing with source and destination IP
361         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
362         self.pg0.add_stream(pkts)
363
364         key = 'ip6'
365         self.create_classify_table(
366             key,
367             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
368                                 dst_ip='ffffffffffffffffffffffffffffffff'))
369         self.create_classify_session(
370             self.acl_tbl_idx.get(key),
371             self.build_ip6_match(src_ip=self.pg0.remote_ip6,
372                                  dst_ip=self.pg1.remote_ip6))
373         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
374         self.acl_active_table = key
375
376         self.pg_enable_capture(self.pg_interfaces)
377         self.pg_start()
378
379         pkts = self.pg1.get_capture(len(pkts))
380         self.verify_capture(self.pg1, pkts)
381         self.pg0.assert_nothing_captured(remark="packets forwarded")
382         self.pg2.assert_nothing_captured(remark="packets forwarded")
383
384
385 # Tests split to different test case classes because of issue reported in
386 # ticket VPP-1336
387 class TestClassifierIP6UDP(TestClassifier):
388     """ Classifier IP6 UDP proto Test Case """
389
390     def test_iacl_proto_udp(self):
391         """ IP6 UDP protocol iACL test
392
393         Test scenario for basic protocol ACL with UDP protocol
394             - Create IPv6 stream for pg0 -> pg1 interface.
395             - Create iACL with UDP IP protocol.
396             - Send and verify received packets on pg1 interface.
397         """
398
399         # Basic iACL testing with UDP protocol
400         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
401         self.pg0.add_stream(pkts)
402
403         key = 'nh_udp'
404         self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
405         self.create_classify_session(
406             self.acl_tbl_idx.get(key),
407             self.build_ip6_match(nh=socket.IPPROTO_UDP))
408         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409         self.acl_active_table = key
410
411         self.pg_enable_capture(self.pg_interfaces)
412         self.pg_start()
413
414         pkts = self.pg1.get_capture(len(pkts))
415         self.verify_capture(self.pg1, pkts)
416         self.pg0.assert_nothing_captured(remark="packets forwarded")
417         self.pg2.assert_nothing_captured(remark="packets forwarded")
418
419     def test_iacl_proto_udp_sport(self):
420         """ IP6 UDP source port iACL test
421
422         Test scenario for basic protocol ACL with UDP and sport
423             - Create IPv6 stream for pg0 -> pg1 interface.
424             - Create iACL with UDP IP protocol and defined sport.
425             - Send and verify received packets on pg1 interface.
426         """
427
428         # Basic iACL testing with UDP and sport
429         sport = 38
430         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
431                                   UDP(sport=sport, dport=5678))
432         self.pg0.add_stream(pkts)
433
434         key = 'nh_udp_sport'
435         self.create_classify_table(
436             key, self.build_ip6_mask(nh='ff', src_port='ffff'))
437         self.create_classify_session(
438             self.acl_tbl_idx.get(key),
439             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
440         self.input_acl_set_interface(
441             self.pg0, self.acl_tbl_idx.get(key))
442         self.acl_active_table = key
443
444         self.pg_enable_capture(self.pg_interfaces)
445         self.pg_start()
446
447         pkts = self.pg1.get_capture(len(pkts))
448         self.verify_capture(self.pg1, pkts)
449         self.pg0.assert_nothing_captured(remark="packets forwarded")
450         self.pg2.assert_nothing_captured(remark="packets forwarded")
451
452     def test_iacl_proto_udp_dport(self):
453         """ IP6 UDP destination port iACL test
454
455         Test scenario for basic protocol ACL with UDP and dport
456             - Create IPv6 stream for pg0 -> pg1 interface.
457             - Create iACL with UDP IP protocol and defined dport.
458             - Send and verify received packets on pg1 interface.
459         """
460
461         # Basic iACL testing with UDP and dport
462         dport = 427
463         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
464                                   UDP(sport=1234, dport=dport))
465         self.pg0.add_stream(pkts)
466
467         key = 'nh_udp_dport'
468         self.create_classify_table(
469             key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
470         self.create_classify_session(
471             self.acl_tbl_idx.get(key),
472             self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
473         self.input_acl_set_interface(
474             self.pg0, self.acl_tbl_idx.get(key))
475         self.acl_active_table = key
476
477         self.pg_enable_capture(self.pg_interfaces)
478         self.pg_start()
479
480         pkts = self.pg1.get_capture(len(pkts))
481         self.verify_capture(self.pg1, pkts)
482         self.pg0.assert_nothing_captured(remark="packets forwarded")
483         self.pg2.assert_nothing_captured(remark="packets forwarded")
484
485     def test_iacl_proto_udp_sport_dport(self):
486         """ IP6 UDP source and destination ports iACL test
487
488         Test scenario for basic protocol ACL with UDP and sport and dport
489             - Create IPv6 stream for pg0 -> pg1 interface.
490             - Create iACL with UDP IP protocol and defined sport and dport.
491             - Send and verify received packets on pg1 interface.
492         """
493
494         # Basic iACL testing with UDP and sport and dport
495         sport = 13720
496         dport = 9080
497         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
498                                   UDP(sport=sport, dport=dport))
499         self.pg0.add_stream(pkts)
500
501         key = 'nh_udp_ports'
502         self.create_classify_table(
503             key,
504             self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
505         self.create_classify_session(
506             self.acl_tbl_idx.get(key),
507             self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
508                                  dst_port=dport))
509         self.input_acl_set_interface(
510             self.pg0, self.acl_tbl_idx.get(key))
511         self.acl_active_table = key
512
513         self.pg_enable_capture(self.pg_interfaces)
514         self.pg_start()
515
516         pkts = self.pg1.get_capture(len(pkts))
517         self.verify_capture(self.pg1, pkts)
518         self.pg0.assert_nothing_captured(remark="packets forwarded")
519         self.pg2.assert_nothing_captured(remark="packets forwarded")
520
521
522 class TestClassifierIP6TCP(TestClassifier):
523     """ Classifier IP6 TCP proto Test Case """
524
525     def test_iacl_proto_tcp(self):
526         """ IP6 TCP protocol iACL test
527
528         Test scenario for basic protocol ACL with TCP protocol
529             - Create IPv6 stream for pg0 -> pg1 interface.
530             - Create iACL with TCP IP protocol.
531             - Send and verify received packets on pg1 interface.
532         """
533
534         # Basic iACL testing with TCP protocol
535         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
536                                   TCP(sport=1234, dport=5678))
537         self.pg0.add_stream(pkts)
538
539         key = 'nh_tcp'
540         self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
541         self.create_classify_session(
542             self.acl_tbl_idx.get(key),
543             self.build_ip6_match(nh=socket.IPPROTO_TCP))
544         self.input_acl_set_interface(
545             self.pg0, self.acl_tbl_idx.get(key))
546         self.acl_active_table = key
547
548         self.pg_enable_capture(self.pg_interfaces)
549         self.pg_start()
550
551         pkts = self.pg1.get_capture(len(pkts))
552         self.verify_capture(self.pg1, pkts, TCP)
553         self.pg0.assert_nothing_captured(remark="packets forwarded")
554         self.pg2.assert_nothing_captured(remark="packets forwarded")
555
556     def test_iacl_proto_tcp_sport(self):
557         """ IP6 TCP source port iACL test
558
559         Test scenario for basic protocol ACL with TCP and sport
560             - Create IPv6 stream for pg0 -> pg1 interface.
561             - Create iACL with TCP IP protocol and defined sport.
562             - Send and verify received packets on pg1 interface.
563         """
564
565         # Basic iACL testing with TCP and sport
566         sport = 38
567         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568                                   TCP(sport=sport, dport=5678))
569         self.pg0.add_stream(pkts)
570
571         key = 'nh_tcp_sport'
572         self.create_classify_table(
573             key, self.build_ip6_mask(nh='ff', src_port='ffff'))
574         self.create_classify_session(
575             self.acl_tbl_idx.get(key),
576             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
577         self.input_acl_set_interface(
578             self.pg0, self.acl_tbl_idx.get(key))
579         self.acl_active_table = key
580
581         self.pg_enable_capture(self.pg_interfaces)
582         self.pg_start()
583
584         pkts = self.pg1.get_capture(len(pkts))
585         self.verify_capture(self.pg1, pkts, TCP)
586         self.pg0.assert_nothing_captured(remark="packets forwarded")
587         self.pg2.assert_nothing_captured(remark="packets forwarded")
588
589     def test_iacl_proto_tcp_dport(self):
590         """ IP6 TCP destination port iACL test
591
592         Test scenario for basic protocol ACL with TCP and dport
593             - Create IPv6 stream for pg0 -> pg1 interface.
594             - Create iACL with TCP IP protocol and defined dport.
595             - Send and verify received packets on pg1 interface.
596         """
597
598         # Basic iACL testing with TCP and dport
599         dport = 427
600         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
601                                   TCP(sport=1234, dport=dport))
602         self.pg0.add_stream(pkts)
603
604         key = 'nh_tcp_dport'
605         self.create_classify_table(
606             key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
607         self.create_classify_session(
608             self.acl_tbl_idx.get(key),
609             self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
610         self.input_acl_set_interface(
611             self.pg0, self.acl_tbl_idx.get(key))
612         self.acl_active_table = key
613
614         self.pg_enable_capture(self.pg_interfaces)
615         self.pg_start()
616
617         pkts = self.pg1.get_capture(len(pkts))
618         self.verify_capture(self.pg1, pkts, TCP)
619         self.pg0.assert_nothing_captured(remark="packets forwarded")
620         self.pg2.assert_nothing_captured(remark="packets forwarded")
621
622     def test_iacl_proto_tcp_sport_dport(self):
623         """ IP6 TCP source and destination ports iACL test
624
625         Test scenario for basic protocol ACL with TCP and sport and dport
626             - Create IPv6 stream for pg0 -> pg1 interface.
627             - Create iACL with TCP IP protocol and defined sport and dport.
628             - Send and verify received packets on pg1 interface.
629         """
630
631         # Basic iACL testing with TCP and sport and dport
632         sport = 13720
633         dport = 9080
634         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
635                                   TCP(sport=sport, dport=dport))
636         self.pg0.add_stream(pkts)
637
638         key = 'nh_tcp_ports'
639         self.create_classify_table(
640             key,
641             self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
642         self.create_classify_session(
643             self.acl_tbl_idx.get(key),
644             self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
645                                  dst_port=dport))
646         self.input_acl_set_interface(
647             self.pg0, self.acl_tbl_idx.get(key))
648         self.acl_active_table = key
649
650         self.pg_enable_capture(self.pg_interfaces)
651         self.pg_start()
652
653         pkts = self.pg1.get_capture(len(pkts))
654         self.verify_capture(self.pg1, pkts, TCP)
655         self.pg0.assert_nothing_captured(remark="packets forwarded")
656         self.pg2.assert_nothing_captured(remark="packets forwarded")
657
658
659 class TestClassifierIP6Out(TestClassifier):
660     """ Classifier output IP6 Test Case """
661
662     def test_acl_ip_out(self):
663         """ Output IP6 ACL test
664
665         Test scenario for basic IP ACL with source IP
666             - Create IPv6 stream for pg1 -> pg0 interface.
667             - Create ACL with source IP address.
668             - Send and verify received packets on pg0 interface.
669         """
670
671         # Basic oACL testing with source IP
672         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
673         self.pg1.add_stream(pkts)
674
675         key = 'ip6_out'
676         self.create_classify_table(
677             key,
678             self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
679             data_offset=0)
680         self.create_classify_session(
681             self.acl_tbl_idx.get(key),
682             self.build_ip6_match(src_ip=self.pg1.remote_ip6))
683         self.output_acl_set_interface(
684             self.pg0, self.acl_tbl_idx.get(key))
685         self.acl_active_table = key
686
687         self.pg_enable_capture(self.pg_interfaces)
688         self.pg_start()
689
690         pkts = self.pg0.get_capture(len(pkts))
691         self.verify_capture(self.pg0, pkts)
692         self.pg1.assert_nothing_captured(remark="packets forwarded")
693         self.pg2.assert_nothing_captured(remark="packets forwarded")
694
695
696 class TestClassifierIP6MAC(TestClassifier):
697     """ Classifier IP6 MAC Test Case """
698
699     def test_acl_mac(self):
700         """ IP6 MAC iACL test
701
702         Test scenario for basic MAC ACL with source MAC
703             - Create IPv6 stream for pg0 -> pg2 interface.
704             - Create ACL with source MAC address.
705             - Send and verify received packets on pg2 interface.
706         """
707
708         # Basic iACL testing with source MAC
709         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
710         self.pg0.add_stream(pkts)
711
712         key = 'mac'
713         self.create_classify_table(
714             key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
715         self.create_classify_session(
716             self.acl_tbl_idx.get(key),
717             self.build_mac_match(src_mac=self.pg0.remote_mac))
718         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
719         self.acl_active_table = key
720
721         self.pg_enable_capture(self.pg_interfaces)
722         self.pg_start()
723
724         pkts = self.pg2.get_capture(len(pkts))
725         self.verify_capture(self.pg2, pkts)
726         self.pg0.assert_nothing_captured(remark="packets forwarded")
727         self.pg1.assert_nothing_captured(remark="packets forwarded")
728
729
730 if __name__ == '__main__':
731     unittest.main(testRunner=VppTestRunner)