Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_classifier_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet6 import IPv6, UDP, TCP
13 from util import ppp
14
15
class TestClassifier(VppTestCase):
    """ Classifier Test Case """

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case and initialise the
        ACL bookkeeping.

        ``acl_active_table`` holds the key of the classify table a test has
        attached to an interface ('' when none); tearDown() reads it to
        detach that table again.
        """
        super(TestClassifier, cls).setUpClass()
        cls.acl_active_table = ''

    @classmethod
    def tearDownClass(cls):
        """Run the standard class teardown of the parent class."""
        super(TestClassifier, cls).tearDownClass()

    def setUp(self):
        """
        Perform test setup before test case.

        **Config:**
            - create 3 pg interfaces
                - untagged pg0/pg1/pg2 interface
                    pg0 ---+---> pg1 (IP ACL)
                           |
                           +---> pg2 (MAC ACL)
            - setup interfaces:
                - put it into UP state
                - set IPv6 addresses
                - resolve neighbor address using NDP

        :ivar list interfaces: pg interfaces.
        :ivar list pg_if_packet_sizes: packet sizes in test.
        :ivar dict acl_tbl_idx: classify table index, keyed by table name.
        """
        self.reset_packet_infos()
        super(TestClassifier, self).setUp()

        # create 3 pg interfaces (pg0, pg1, pg2)
        self.create_pg_interfaces(range(3))

        # packet sizes to test
        self.pg_if_packet_sizes = [64, 9018]

        self.interfaces = list(self.pg_interfaces)

        # ACL vars
        self.acl_tbl_idx = {}

        # setup all interfaces
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run standard test teardown and acl related log.

        While VPP is still alive, the ACL/classifier state is logged, the
        table recorded in ``acl_active_table`` is detached from pg0 and all
        interfaces are unconfigured and brought down.
        """
        if not self.vpp_dead:
            self.logger.info(self.vapi.ppcli("show inacl type ip6"))
            self.logger.info(self.vapi.ppcli("show outacl type ip6"))
            self.logger.info(self.vapi.cli("show classify table verbose"))
            self.logger.info(self.vapi.cli("show ip fib"))
            # 'ip6_out' is the only key applied as an output ACL; every
            # other key was applied as an input ACL.
            if self.acl_active_table == 'ip6_out':
                self.output_acl_set_interface(
                    self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
                self.acl_active_table = ''
            elif self.acl_active_table != '':
                self.input_acl_set_interface(
                    self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
                self.acl_active_table = ''
            for intf in self.interfaces:
                intf.unconfig_ip6()
                intf.admin_down()

        super(TestClassifier, self).tearDown()

    def create_stream(self, src_if, dst_if, packet_sizes,
                      proto_l=UDP(sport=1234, dport=5678)):
        """Create input packet stream for defined interfaces.

        One packet is built per entry of ``packet_sizes``.

        :param VppInterface src_if: Source Interface for packet stream.
        :param VppInterface dst_if: Destination Interface for packet stream.
        :param list packet_sizes: packet size to test.
        :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
        :returns: list of the created packets.
        """
        pkts = []

        for size in packet_sizes:
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
                 proto_l /
                 Raw(payload))
            # Remember the packet as built, before it is extended to the
            # requested size; verify_capture() compares against this copy.
            info.data = p.copy()
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, dst_if, capture, proto_l=UDP):
        """Verify captured input packet stream for defined interface.

        Each captured packet is matched against the next expected packet
        info for its source interface; addresses and ports must be equal.
        Afterwards no expected packet may remain outstanding.

        :param VppInterface dst_if: Interface to verify captured packet stream.
        :param list capture: Captured packet stream.
        :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = dst_if.sw_if_index
        for packet in capture:
            try:
                ip6_received = packet[IPv6]
                proto_received = packet[proto_l]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)" %
                    (dst_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                ip_saved = saved_packet[IPv6]
                proto_saved = saved_packet[proto_l]
                # Check standard fields
                self.assertEqual(ip6_received.src, ip_saved.src)
                self.assertEqual(ip6_received.dst, ip_saved.dst)
                self.assertEqual(proto_received.sport, proto_saved.sport)
                self.assertEqual(proto_received.dport, proto_saved.dport)
            except:
                # Log the offending packet, then let the failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(remaining_packet is None,
                            "Interface %s: Packet expected from interface %s "
                            "didn't arrive" % (dst_if.name, i.name))

    @staticmethod
    def build_ip6_mask(nh='', src_ip='', dst_ip='',
                       src_port='', dst_port=''):
        """Build IPv6 ACL mask data with hexstring format.

        Trailing zero bytes are dropped; the result always has an even
        number of hex digits so that it can be passed to
        ``binascii.unhexlify``.

        :param str nh: next header mask, hex digits <0-ff>
        :param str src_ip: source ip6 address mask, up to 32 hex digits
        :param str dst_ip: destination ip6 address mask, up to 32 hex digits
        :param str src_port: source port mask, hex digits <0-ffff>
        :param str dst_port: destination port mask, hex digits <0-ffff>
        :returns: mask as a hex string.
        """
        hexstr = ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
            nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
        # rstrip() may cut in the middle of a byte (e.g. '...f0' -> '...f');
        # give the low nibble back so the string stays byte aligned.
        return hexstr + '0' * (len(hexstr) % 2)

    @staticmethod
    def build_ip6_match(nh=0, src_ip='', dst_ip='',
                        src_port=0, dst_port=0):
        """Build IPv6 ACL match data with hexstring format.

        Trailing zero bytes are dropped; the result always has an even
        number of hex digits so that it can be passed to
        ``binascii.unhexlify``.

        :param int nh: next header number with valid option "x"
        :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
        :param str dst_ip: destination ip6 address with format of
            "xxx:xxxx::xxxx"
        :param int src_port: source port number "x"
        :param int dst_port: destination port number "x"
        :returns: match key as a hex string.
        """
        # hexlify() returns bytes on Python 3; decode so that '{!s}' below
        # formats the hex digits rather than the "b'...'" representation.
        if src_ip:
            src_ip = binascii.hexlify(socket.inet_pton(
                socket.AF_INET6, src_ip)).decode('ascii')
        if dst_ip:
            dst_ip = binascii.hexlify(socket.inet_pton(
                socket.AF_INET6, dst_ip)).decode('ascii')

        hexstr = ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
            hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
            hex(dst_port)[2:])).rstrip('0')
        # Keep the key byte aligned (see build_ip6_mask).
        return hexstr + '0' * (len(hexstr) % 2)

    @staticmethod
    def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
        """Build MAC ACL mask data with hexstring format.

        Trailing zero bytes are dropped; the result always has an even
        number of hex digits.

        :param str dst_mac: destination MAC address <0-ffffffffffff>
        :param str src_mac: source MAC address <0-ffffffffffff>
        :param str ether_type: ethernet type <0-ffff>
        :returns: mask as a hex string.
        """
        hexstr = ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
            dst_mac, src_mac, ether_type)).rstrip('0')
        # Restore a low nibble that rstrip() may have removed.
        return hexstr + '0' * (len(hexstr) % 2)

    @staticmethod
    def build_mac_match(dst_mac='', src_mac='', ether_type=''):
        """Build MAC ACL match data with hexstring format.

        Trailing zero bytes are dropped; the result always has an even
        number of hex digits.

        :param str dst_mac: destination MAC address <x:x:x:x:x:x>
        :param str src_mac: source MAC address <x:x:x:x:x:x>
        :param str ether_type: ethernet type <0-ffff>
        :returns: match key as a hex string.
        """
        if dst_mac:
            dst_mac = dst_mac.replace(':', '')
        if src_mac:
            src_mac = src_mac.replace(':', '')

        hexstr = ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
            dst_mac, src_mac, ether_type)).rstrip('0')
        # Restore a low nibble that rstrip() may have removed.
        return hexstr + '0' * (len(hexstr) % 2)

    def create_classify_table(self, key, mask, data_offset=0):
        """Create Classify Table

        The index of the new table is stored in ``acl_tbl_idx[key]``.

        :param str key: key for classify table (ex, ACL name).
        :param str mask: mask value for interested traffic (hex string).
        :param int data_offset: forwarded as ``current_data_offset``.
        """
        r = self.vapi.classify_add_del_table(
            is_add=1,
            mask=binascii.unhexlify(mask),
            # 32 hex digits (16 bytes) of mask per match vector, rounded up
            match_n_vectors=(len(mask) - 1) // 32 + 1,
            miss_next_index=0,
            current_data_flag=1,
            current_data_offset=data_offset)
        self.assertIsNotNone(r, 'No response msg for add_del_table')
        self.acl_tbl_idx[key] = r.new_table_index

    def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
        """Create Classify Session

        :param int table_index: table index to identify classify table.
        :param str match: matched value for interested traffic (hex string).
        :param int vrfid: VRF id, forwarded as the session ``metadata``.
        :param int is_add: option to configure classify session.
            - create(1) or delete(0)
        """
        r = self.vapi.classify_add_del_session(
            is_add,
            table_index,
            binascii.unhexlify(match),
            opaque_index=0,
            metadata=vrfid)
        self.assertIsNotNone(r, 'No response msg for add_del_session')

    def input_acl_set_interface(self, intf, table_index, is_add=1):
        """Configure Input ACL interface

        :param VppInterface intf: Interface to apply Input ACL feature.
        :param int table_index: table index to identify classify table.
        :param int is_add: option to configure classify session.
            - enable(1) or disable(0)
        """
        r = self.vapi.input_acl_set_interface(
            is_add,
            intf.sw_if_index,
            ip6_table_index=table_index)
        self.assertIsNotNone(r, 'No response msg for acl_set_interface')

    def output_acl_set_interface(self, intf, table_index, is_add=1):
        """Configure Output ACL interface

        :param VppInterface intf: Interface to apply Output ACL feature.
        :param int table_index: table index to identify classify table.
        :param int is_add: option to configure classify session.
            - enable(1) or disable(0)
        """
        r = self.vapi.output_acl_set_interface(
            is_add,
            intf.sw_if_index,
            ip6_table_index=table_index)
        self.assertIsNotNone(r, 'No response msg for acl_set_interface')
290
291
class TestClassifierIP6(TestClassifier):
    """ Classifier IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6, cls).tearDownClass()

    def test_iacl_src_ip(self):
        """ Source IP6 iACL test

        Input ACL on pg0 matching the source IPv6 address:
            - queue an IPv6 stream pg0 -> pg1,
            - classify on the full source address of the stream,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'ip6_src'

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip6_match(src_ip=self.pg0.remote_ip6))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """ Destination IP6 iACL test

        Input ACL on pg0 matching the destination IPv6 address:
            - queue an IPv6 stream pg0 -> pg1,
            - classify on the full destination address of the stream,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'ip6_dst'

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """ Source and destination IP6 iACL test

        Input ACL on pg0 matching both IPv6 addresses:
            - queue an IPv6 stream pg0 -> pg1,
            - classify on the full source and destination addresses,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'ip6'

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(
            src_ip='ffffffffffffffffffffffffffffffff',
            dst_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(src_ip=self.pg0.remote_ip6,
                                     dst_ip=self.pg1.remote_ip6)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")
397
398
399 # Tests split to different test case classes because of issue reported in
400 # ticket VPP-1336
class TestClassifierIP6UDP(TestClassifier):
    """ Classifier IP6 UDP proto Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6UDP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6UDP, cls).tearDownClass()

    def test_iacl_proto_udp(self):
        """ IP6 UDP protocol iACL test

        Input ACL on pg0 matching the next-header field:
            - queue an IPv6/UDP stream pg0 -> pg1,
            - classify on next header == UDP,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_udp'

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        self.create_classify_table(table_key, self.build_ip6_mask(nh='ff'))
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip6_match(nh=socket.IPPROTO_UDP))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """ IP6 UDP source port iACL test

        Input ACL on pg0 matching next header and source port:
            - queue an IPv6/UDP stream pg0 -> pg1 with a fixed sport,
            - classify on next header == UDP and that sport,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_udp_sport'
        src_port = 38

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=UDP(sport=src_port, dport=5678))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                     src_port=src_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """ IP6 UDP destination port iACL test

        Input ACL on pg0 matching next header and destination port:
            - queue an IPv6/UDP stream pg0 -> pg1 with a fixed dport,
            - classify on next header == UDP and that dport,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_udp_dport'
        dst_port = 427

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=UDP(sport=1234, dport=dst_port))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """ IP6 UDP source and destination ports iACL test

        Input ACL on pg0 matching next header and both ports:
            - queue an IPv6/UDP stream pg0 -> pg1 with fixed sport/dport,
            - classify on next header == UDP and both ports,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_udp_ports'
        src_port = 13720
        dst_port = 9080

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=UDP(sport=src_port, dport=dst_port))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(
            nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_UDP,
                                     src_port=src_port,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")
542
543
class TestClassifierIP6TCP(TestClassifier):
    """ Classifier IP6 TCP proto Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6TCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6TCP, cls).tearDownClass()

    def test_iacl_proto_tcp(self):
        """ IP6 TCP protocol iACL test

        Input ACL on pg0 matching the next-header field:
            - queue an IPv6/TCP stream pg0 -> pg1,
            - classify on next header == TCP,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_tcp'

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=TCP(sport=1234, dport=5678))
        self.pg0.add_stream(stream)

        self.create_classify_table(table_key, self.build_ip6_mask(nh='ff'))
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip6_match(nh=socket.IPPROTO_TCP))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, proto_l=TCP)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """ IP6 TCP source port iACL test

        Input ACL on pg0 matching next header and source port:
            - queue an IPv6/TCP stream pg0 -> pg1 with a fixed sport,
            - classify on next header == TCP and that sport,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_tcp_sport'
        src_port = 38

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=TCP(sport=src_port, dport=5678))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(nh='ff', src_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                     src_port=src_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, proto_l=TCP)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """ IP6 TCP destination port iACL test

        Input ACL on pg0 matching next header and destination port:
            - queue an IPv6/TCP stream pg0 -> pg1 with a fixed dport,
            - classify on next header == TCP and that dport,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_tcp_dport'
        dst_port = 427

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=TCP(sport=1234, dport=dst_port))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(nh='ff', dst_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, proto_l=TCP)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """ IP6 TCP source and destination ports iACL test

        Input ACL on pg0 matching next header and both ports:
            - queue an IPv6/TCP stream pg0 -> pg1 with fixed sport/dport,
            - classify on next header == TCP and both ports,
            - expect every packet on pg1 and nothing elsewhere.
        """
        table_key = 'nh_tcp_ports'
        src_port = 13720
        dst_port = 9080

        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes,
            proto_l=TCP(sport=src_port, dport=dst_port))
        self.pg0.add_stream(stream)

        mask = self.build_ip6_mask(
            nh='ff', src_port='ffff', dst_port='ffff')
        self.create_classify_table(table_key, mask)
        table_index = self.acl_tbl_idx.get(table_key)
        match = self.build_ip6_match(nh=socket.IPPROTO_TCP,
                                     src_port=src_port,
                                     dst_port=dst_port)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, proto_l=TCP)
        for idle_if in (self.pg0, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")
687
688
class TestClassifierIP6Out(TestClassifier):
    """ Classifier output IP6 Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6Out, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6Out, cls).tearDownClass()

    def test_acl_ip_out(self):
        """ Output IP6 ACL test

        Output ACL on pg0 matching the source IPv6 address:
            - queue an IPv6 stream pg1 -> pg0,
            - classify on the full source address of the stream,
            - expect every packet on pg0 and nothing elsewhere.
        """
        table_key = 'ip6_out'

        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(stream)

        mask = self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff')
        self.create_classify_table(table_key, mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_ip6_match(src_ip=self.pg1.remote_ip6))
        self.output_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, captured)
        for idle_if in (self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")
732
733
class TestClassifierIP6MAC(TestClassifier):
    """ Classifier IP6 MAC Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestClassifierIP6MAC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestClassifierIP6MAC, cls).tearDownClass()

    def test_acl_mac(self):
        """ IP6 MAC iACL test

        Input ACL on pg0 matching the source MAC address:
            - queue an IPv6 stream pg0 -> pg2,
            - classify on the full source MAC of the stream,
            - expect every packet on pg2 and nothing elsewhere.
        """
        table_key = 'mac'

        stream = self.create_stream(
            self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        mask = self.build_mac_mask(src_mac='ffffffffffff')
        self.create_classify_table(table_key, mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(table_key)
        self.create_classify_session(
            table_index, self.build_mac_match(src_mac=self.pg0.remote_mac))
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = table_key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg2.get_capture(len(stream))
        self.verify_capture(self.pg2, captured)
        for idle_if in (self.pg0, self.pg1):
            idle_if.assert_nothing_captured(remark="packets forwarded")
774
775
if __name__ == '__main__':
    # Executed directly: run this module's tests with the VPP test runner.
    runner = VppTestRunner
    unittest.main(testRunner=runner)