IGMP: proxy device
[vpp.git] / test / test_classifier.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import binascii
6 import sys
7
8 from framework import VppTestCase, VppTestRunner
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP, TCP
13 from util import ppp
14
15
16 class TestClassifier(VppTestCase):
17     """ Classifier Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         """
22         Perform standard class setup (defined by class method setUpClass in
23         class VppTestCase) before running the test case, set test case related
24         variables and configure VPP.
25         """
26         super(TestClassifier, cls).setUpClass()
27         cls.acl_active_table = ''
28
29     def setUp(self):
30         """
31         Perform test setup before test case.
32
33         **Config:**
34             - create 4 pg interfaces
35                 - untagged pg0/pg1/pg2 interface
36                     pg0 -------> pg1 (IP ACL)
37                            \
38                             ---> pg2 (MAC ACL))
39                              \
40                               -> pg3 (PBR)
41             - setup interfaces:
42                 - put it into UP state
43                 - set IPv4 addresses
44                 - resolve neighbor address using ARP
45
46         :ivar list interfaces: pg interfaces.
47         :ivar list pg_if_packet_sizes: packet sizes in test.
48         :ivar dict acl_tbl_idx: ACL table index.
49         :ivar int pbr_vrfid: VRF id for PBR test.
50         """
51         self.reset_packet_infos()
52         super(TestClassifier, self).setUp()
53
54         # create 4 pg interfaces
55         self.create_pg_interfaces(range(4))
56
57         # packet sizes to test
58         self.pg_if_packet_sizes = [64, 9018]
59
60         self.interfaces = list(self.pg_interfaces)
61
62         # ACL & PBR vars
63         self.acl_tbl_idx = {}
64         self.pbr_vrfid = 200
65
66         # setup all interfaces
67         for intf in self.interfaces:
68             intf.admin_up()
69             intf.config_ip4()
70             intf.resolve_arp()
71
72     def tearDown(self):
73         """Run standard test teardown and acl related log."""
74         if not self.vpp_dead:
75             self.logger.info(self.vapi.ppcli("show inacl type ip4"))
76             self.logger.info(self.vapi.ppcli("show outacl type ip4"))
77             self.logger.info(self.vapi.cli("show classify table verbose"))
78             self.logger.info(self.vapi.cli("show ip fib"))
79             if self.acl_active_table == 'ip_out':
80                 self.output_acl_set_interface(
81                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
82                 self.acl_active_table = ''
83             elif self.acl_active_table != '':
84                 self.input_acl_set_interface(
85                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
86                 self.acl_active_table = ''
87             for intf in self.interfaces:
88                 intf.unconfig_ip4()
89                 intf.admin_down()
90
91         super(TestClassifier, self).tearDown()
92
93     def config_pbr_fib_entry(self, intf, is_add=1):
94         """Configure fib entry to route traffic toward PBR VRF table
95
96         :param VppInterface intf: destination interface to be routed for PBR.
97
98         """
99         addr_len = 24
100         self.vapi.ip_add_del_route(intf.local_ip4n,
101                                    addr_len,
102                                    intf.remote_ip4n,
103                                    table_id=self.pbr_vrfid,
104                                    is_add=is_add)
105
106     def create_stream(self, src_if, dst_if, packet_sizes,
107                       proto_l=UDP(sport=1234, dport=5678)):
108         """Create input packet stream for defined interfaces.
109
110         :param VppInterface src_if: Source Interface for packet stream.
111         :param VppInterface dst_if: Destination Interface for packet stream.
112         :param list packet_sizes: packet size to test.
113         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
114         """
115         pkts = []
116
117         for size in packet_sizes:
118             info = self.create_packet_info(src_if, dst_if)
119             payload = self.info_to_payload(info)
120             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
121                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
122                  proto_l /
123                  Raw(payload))
124             info.data = p.copy()
125             self.extend_packet(p, size)
126             pkts.append(p)
127         return pkts
128
129     def verify_capture(self, dst_if, capture, proto_l=UDP):
130         """Verify captured input packet stream for defined interface.
131
132         :param VppInterface dst_if: Interface to verify captured packet stream.
133         :param list capture: Captured packet stream.
134         :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
135         """
136         self.logger.info("Verifying capture on interface %s" % dst_if.name)
137         last_info = dict()
138         for i in self.interfaces:
139             last_info[i.sw_if_index] = None
140         dst_sw_if_index = dst_if.sw_if_index
141         for packet in capture:
142             try:
143                 ip_received = packet[IP]
144                 proto_received = packet[proto_l]
145                 payload_info = self.payload_to_info(str(packet[Raw]))
146                 packet_index = payload_info.index
147                 self.assertEqual(payload_info.dst, dst_sw_if_index)
148                 self.logger.debug(
149                     "Got packet on port %s: src=%u (id=%u)" %
150                     (dst_if.name, payload_info.src, packet_index))
151                 next_info = self.get_next_packet_info_for_interface2(
152                     payload_info.src, dst_sw_if_index,
153                     last_info[payload_info.src])
154                 last_info[payload_info.src] = next_info
155                 self.assertTrue(next_info is not None)
156                 self.assertEqual(packet_index, next_info.index)
157                 saved_packet = next_info.data
158                 ip_saved = saved_packet[IP]
159                 proto_saved = saved_packet[proto_l]
160                 # Check standard fields
161                 self.assertEqual(ip_received.src, ip_saved.src)
162                 self.assertEqual(ip_received.dst, ip_saved.dst)
163                 self.assertEqual(proto_received.sport, proto_saved.sport)
164                 self.assertEqual(proto_received.dport, proto_saved.dport)
165             except:
166                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
167                 raise
168         for i in self.interfaces:
169             remaining_packet = self.get_next_packet_info_for_interface2(
170                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
171             self.assertTrue(remaining_packet is None,
172                             "Interface %s: Packet expected from interface %s "
173                             "didn't arrive" % (dst_if.name, i.name))
174
175     def verify_vrf(self, vrf_id):
176         """
177         Check if the FIB table / VRF ID is configured.
178
179         :param int vrf_id: The FIB table / VRF ID to be verified.
180         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
181         """
182         ip_fib_dump = self.vapi.ip_fib_dump()
183         vrf_count = 0
184         for ip_fib_details in ip_fib_dump:
185             if ip_fib_details[2] == vrf_id:
186                 vrf_count += 1
187         if vrf_count == 0:
188             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
189             return 0
190         else:
191             self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
192             return 1
193
194     @staticmethod
195     def build_ip_mask(proto='', src_ip='', dst_ip='',
196                       src_port='', dst_port=''):
197         """Build IP ACL mask data with hexstring format.
198
199         :param str proto: protocol number <0-ff>
200         :param str src_ip: source ip address <0-ffffffff>
201         :param str dst_ip: destination ip address <0-ffffffff>
202         :param str src_port: source port number <0-ffff>
203         :param str dst_port: destination port number <0-ffff>
204         """
205
206         return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
207             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
208
209     @staticmethod
210     def build_ip_match(proto=0, src_ip='', dst_ip='',
211                        src_port=0, dst_port=0):
212         """Build IP ACL match data with hexstring format.
213
214         :param int proto: protocol number with valid option "x"
215         :param str src_ip: source ip address with format of "x.x.x.x"
216         :param str dst_ip: destination ip address with format of "x.x.x.x"
217         :param int src_port: source port number "x"
218         :param int dst_port: destination port number "x"
219         """
220         if src_ip:
221             src_ip = socket.inet_aton(src_ip).encode('hex')
222         if dst_ip:
223             dst_ip = socket.inet_aton(dst_ip).encode('hex')
224
225         return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
226             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
227             hex(dst_port)[2:])).rstrip('0')
228
229     @staticmethod
230     def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
231         """Build MAC ACL mask data with hexstring format.
232
233         :param str dst_mac: source MAC address <0-ffffffffffff>
234         :param str src_mac: destination MAC address <0-ffffffffffff>
235         :param str ether_type: ethernet type <0-ffff>
236         """
237
238         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
239                                               ether_type)).rstrip('0')
240
241     @staticmethod
242     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
243         """Build MAC ACL match data with hexstring format.
244
245         :param str dst_mac: source MAC address <x:x:x:x:x:x>
246         :param str src_mac: destination MAC address <x:x:x:x:x:x>
247         :param str ether_type: ethernet type <0-ffff>
248         """
249         if dst_mac:
250             dst_mac = dst_mac.replace(':', '')
251         if src_mac:
252             src_mac = src_mac.replace(':', '')
253
254         return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
255                                               ether_type)).rstrip('0')
256
257     def create_classify_table(self, key, mask, data_offset=0):
258         """Create Classify Table
259
260         :param str key: key for classify table (ex, ACL name).
261         :param str mask: mask value for interested traffic.
262         :param int data_offset:
263         """
264         r = self.vapi.classify_add_del_table(
265             is_add=1,
266             mask=binascii.unhexlify(mask),
267             match_n_vectors=(len(mask) - 1) // 32 + 1,
268             miss_next_index=0,
269             current_data_flag=1,
270             current_data_offset=data_offset)
271         self.assertIsNotNone(r, msg='No response msg for add_del_table')
272         self.acl_tbl_idx[key] = r.new_table_index
273
274     def create_classify_session(self, table_index, match, pbr_option=0,
275                                 vrfid=0, is_add=1):
276         """Create Classify Session
277
278         :param int table_index: table index to identify classify table.
279         :param str match: matched value for interested traffic.
280         :param int pbr_option: enable/disable PBR feature.
281         :param int vrfid: VRF id.
282         :param int is_add: option to configure classify session.
283             - create(1) or delete(0)
284         """
285         r = self.vapi.classify_add_del_session(
286             is_add,
287             table_index,
288             binascii.unhexlify(match),
289             opaque_index=0,
290             action=pbr_option,
291             metadata=vrfid)
292         self.assertIsNotNone(r, msg='No response msg for add_del_session')
293
294     def input_acl_set_interface(self, intf, table_index, is_add=1):
295         """Configure Input ACL interface
296
297         :param VppInterface intf: Interface to apply Input ACL feature.
298         :param int table_index: table index to identify classify table.
299         :param int is_add: option to configure classify session.
300             - enable(1) or disable(0)
301         """
302         r = self.vapi.input_acl_set_interface(
303             is_add,
304             intf.sw_if_index,
305             ip4_table_index=table_index)
306         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
307
308     def output_acl_set_interface(self, intf, table_index, is_add=1):
309         """Configure Output ACL interface
310
311         :param VppInterface intf: Interface to apply Output ACL feature.
312         :param int table_index: table index to identify classify table.
313         :param int is_add: option to configure classify session.
314             - enable(1) or disable(0)
315         """
316         r = self.vapi.output_acl_set_interface(
317             is_add,
318             intf.sw_if_index,
319             ip4_table_index=table_index)
320         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
321
322
323 # Tests split to different test case classes because of issue reported in
324 # ticket VPP-1336
325 class TestClassifierIP(TestClassifier):
326     """ Classifier IP Test Case """
327
328     def test_iacl_src_ip(self):
329         """ Source IP iACL test
330
331         Test scenario for basic IP ACL with source IP
332             - Create IPv4 stream for pg0 -> pg1 interface.
333             - Create iACL with source IP address.
334             - Send and verify received packets on pg1 interface.
335         """
336
337         # Basic iACL testing with source IP
338         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
339         self.pg0.add_stream(pkts)
340
341         key = 'ip_src'
342         self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
343         self.create_classify_session(
344             self.acl_tbl_idx.get(key),
345             self.build_ip_match(src_ip=self.pg0.remote_ip4))
346         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
347         self.acl_active_table = key
348
349         self.pg_enable_capture(self.pg_interfaces)
350         self.pg_start()
351
352         pkts = self.pg1.get_capture(len(pkts))
353         self.verify_capture(self.pg1, pkts)
354         self.pg0.assert_nothing_captured(remark="packets forwarded")
355         self.pg2.assert_nothing_captured(remark="packets forwarded")
356         self.pg3.assert_nothing_captured(remark="packets forwarded")
357
358     def test_iacl_dst_ip(self):
359         """ Destination IP iACL test
360
361         Test scenario for basic IP ACL with destination IP
362             - Create IPv4 stream for pg0 -> pg1 interface.
363             - Create iACL with destination IP address.
364             - Send and verify received packets on pg1 interface.
365         """
366
367         # Basic iACL testing with destination IP
368         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
369         self.pg0.add_stream(pkts)
370
371         key = 'ip_dst'
372         self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
373         self.create_classify_session(
374             self.acl_tbl_idx.get(key),
375             self.build_ip_match(dst_ip=self.pg1.remote_ip4))
376         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
377         self.acl_active_table = key
378
379         self.pg_enable_capture(self.pg_interfaces)
380         self.pg_start()
381
382         pkts = self.pg1.get_capture(len(pkts))
383         self.verify_capture(self.pg1, pkts)
384         self.pg0.assert_nothing_captured(remark="packets forwarded")
385         self.pg2.assert_nothing_captured(remark="packets forwarded")
386         self.pg3.assert_nothing_captured(remark="packets forwarded")
387
388     def test_iacl_src_dst_ip(self):
389         """ Source and destination IP iACL test
390
391         Test scenario for basic IP ACL with source and destination IP
392             - Create IPv4 stream for pg0 -> pg1 interface.
393             - Create iACL with source and destination IP addresses.
394             - Send and verify received packets on pg1 interface.
395         """
396
397         # Basic iACL testing with source and destination IP
398         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
399         self.pg0.add_stream(pkts)
400
401         key = 'ip'
402         self.create_classify_table(
403             key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
404         self.create_classify_session(
405             self.acl_tbl_idx.get(key),
406             self.build_ip_match(src_ip=self.pg0.remote_ip4,
407                                 dst_ip=self.pg1.remote_ip4))
408         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409         self.acl_active_table = key
410
411         self.pg_enable_capture(self.pg_interfaces)
412         self.pg_start()
413
414         pkts = self.pg1.get_capture(len(pkts))
415         self.verify_capture(self.pg1, pkts)
416         self.pg0.assert_nothing_captured(remark="packets forwarded")
417         self.pg2.assert_nothing_captured(remark="packets forwarded")
418         self.pg3.assert_nothing_captured(remark="packets forwarded")
419
420
421 class TestClassifierUDP(TestClassifier):
422     """ Classifier UDP proto Test Case """
423
424     def test_iacl_proto_udp(self):
425         """ UDP protocol iACL test
426
427         Test scenario for basic protocol ACL with UDP protocol
428             - Create IPv4 stream for pg0 -> pg1 interface.
429             - Create iACL with UDP IP protocol.
430             - Send and verify received packets on pg1 interface.
431         """
432
433         # Basic iACL testing with UDP protocol
434         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
435         self.pg0.add_stream(pkts)
436
437         key = 'proto_udp'
438         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
439         self.create_classify_session(
440             self.acl_tbl_idx.get(key),
441             self.build_ip_match(proto=socket.IPPROTO_UDP))
442         self.input_acl_set_interface(
443             self.pg0, self.acl_tbl_idx.get(key))
444         self.acl_active_table = key
445
446         self.pg_enable_capture(self.pg_interfaces)
447         self.pg_start()
448
449         pkts = self.pg1.get_capture(len(pkts))
450         self.verify_capture(self.pg1, pkts)
451         self.pg0.assert_nothing_captured(remark="packets forwarded")
452         self.pg2.assert_nothing_captured(remark="packets forwarded")
453         self.pg3.assert_nothing_captured(remark="packets forwarded")
454
455     def test_iacl_proto_udp_sport(self):
456         """ UDP source port iACL test
457
458         Test scenario for basic protocol ACL with UDP and sport
459             - Create IPv4 stream for pg0 -> pg1 interface.
460             - Create iACL with UDP IP protocol and defined sport.
461             - Send and verify received packets on pg1 interface.
462         """
463
464         # Basic iACL testing with UDP and sport
465         sport = 38
466         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
467                                   UDP(sport=sport, dport=5678))
468         self.pg0.add_stream(pkts)
469
470         key = 'proto_udp_sport'
471         self.create_classify_table(
472             key, self.build_ip_mask(proto='ff', src_port='ffff'))
473         self.create_classify_session(
474             self.acl_tbl_idx.get(key),
475             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
476         self.input_acl_set_interface(
477             self.pg0, self.acl_tbl_idx.get(key))
478         self.acl_active_table = key
479
480         self.pg_enable_capture(self.pg_interfaces)
481         self.pg_start()
482
483         pkts = self.pg1.get_capture(len(pkts))
484         self.verify_capture(self.pg1, pkts)
485         self.pg0.assert_nothing_captured(remark="packets forwarded")
486         self.pg2.assert_nothing_captured(remark="packets forwarded")
487         self.pg3.assert_nothing_captured(remark="packets forwarded")
488
489     def test_iacl_proto_udp_dport(self):
490         """ UDP destination port iACL test
491
492         Test scenario for basic protocol ACL with UDP and dport
493             - Create IPv4 stream for pg0 -> pg1 interface.
494             - Create iACL with UDP IP protocol and defined dport.
495             - Send and verify received packets on pg1 interface.
496         """
497
498         # Basic iACL testing with UDP and dport
499         dport = 427
500         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
501                                   UDP(sport=1234, dport=dport))
502         self.pg0.add_stream(pkts)
503
504         key = 'proto_udp_dport'
505         self.create_classify_table(
506             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
507         self.create_classify_session(
508             self.acl_tbl_idx.get(key),
509             self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
510         self.input_acl_set_interface(
511             self.pg0, self.acl_tbl_idx.get(key))
512         self.acl_active_table = key
513
514         self.pg_enable_capture(self.pg_interfaces)
515         self.pg_start()
516
517         pkts = self.pg1.get_capture(len(pkts))
518         self.verify_capture(self.pg1, pkts)
519         self.pg0.assert_nothing_captured(remark="packets forwarded")
520         self.pg2.assert_nothing_captured(remark="packets forwarded")
521         self.pg3.assert_nothing_captured(remark="packets forwarded")
522
523     def test_iacl_proto_udp_sport_dport(self):
524         """ UDP source and destination ports iACL test
525
526         Test scenario for basic protocol ACL with UDP and sport and dport
527             - Create IPv4 stream for pg0 -> pg1 interface.
528             - Create iACL with UDP IP protocol and defined sport and dport.
529             - Send and verify received packets on pg1 interface.
530         """
531
532         # Basic iACL testing with UDP and sport and dport
533         sport = 13720
534         dport = 9080
535         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
536                                   UDP(sport=sport, dport=dport))
537         self.pg0.add_stream(pkts)
538
539         key = 'proto_udp_ports'
540         self.create_classify_table(
541             key,
542             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
543         self.create_classify_session(
544             self.acl_tbl_idx.get(key),
545             self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
546                                 dst_port=dport))
547         self.input_acl_set_interface(
548             self.pg0, self.acl_tbl_idx.get(key))
549         self.acl_active_table = key
550
551         self.pg_enable_capture(self.pg_interfaces)
552         self.pg_start()
553
554         pkts = self.pg1.get_capture(len(pkts))
555         self.verify_capture(self.pg1, pkts)
556         self.pg0.assert_nothing_captured(remark="packets forwarded")
557         self.pg2.assert_nothing_captured(remark="packets forwarded")
558         self.pg3.assert_nothing_captured(remark="packets forwarded")
559
560
561 class TestClassifierTCP(TestClassifier):
562     """ Classifier TCP proto Test Case """
563
564     def test_iacl_proto_tcp(self):
565         """ TCP protocol iACL test
566
567         Test scenario for basic protocol ACL with TCP protocol
568             - Create IPv4 stream for pg0 -> pg1 interface.
569             - Create iACL with TCP IP protocol.
570             - Send and verify received packets on pg1 interface.
571         """
572
573         # Basic iACL testing with TCP protocol
574         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
575                                   TCP(sport=1234, dport=5678))
576         self.pg0.add_stream(pkts)
577
578         key = 'proto_tcp'
579         self.create_classify_table(key, self.build_ip_mask(proto='ff'))
580         self.create_classify_session(
581             self.acl_tbl_idx.get(key),
582             self.build_ip_match(proto=socket.IPPROTO_TCP))
583         self.input_acl_set_interface(
584             self.pg0, self.acl_tbl_idx.get(key))
585         self.acl_active_table = key
586
587         self.pg_enable_capture(self.pg_interfaces)
588         self.pg_start()
589
590         pkts = self.pg1.get_capture(len(pkts))
591         self.verify_capture(self.pg1, pkts, TCP)
592         self.pg0.assert_nothing_captured(remark="packets forwarded")
593         self.pg2.assert_nothing_captured(remark="packets forwarded")
594         self.pg3.assert_nothing_captured(remark="packets forwarded")
595
596     def test_iacl_proto_tcp_sport(self):
597         """ TCP source port iACL test
598
599         Test scenario for basic protocol ACL with TCP and sport
600             - Create IPv4 stream for pg0 -> pg1 interface.
601             - Create iACL with TCP IP protocol and defined sport.
602             - Send and verify received packets on pg1 interface.
603         """
604
605         # Basic iACL testing with TCP and sport
606         sport = 38
607         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
608                                   TCP(sport=sport, dport=5678))
609         self.pg0.add_stream(pkts)
610
611         key = 'proto_tcp_sport'
612         self.create_classify_table(
613             key, self.build_ip_mask(proto='ff', src_port='ffff'))
614         self.create_classify_session(
615             self.acl_tbl_idx.get(key),
616             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
617         self.input_acl_set_interface(
618             self.pg0, self.acl_tbl_idx.get(key))
619         self.acl_active_table = key
620
621         self.pg_enable_capture(self.pg_interfaces)
622         self.pg_start()
623
624         pkts = self.pg1.get_capture(len(pkts))
625         self.verify_capture(self.pg1, pkts, TCP)
626         self.pg0.assert_nothing_captured(remark="packets forwarded")
627         self.pg2.assert_nothing_captured(remark="packets forwarded")
628         self.pg3.assert_nothing_captured(remark="packets forwarded")
629
630     def test_iacl_proto_tcp_dport(self):
631         """ TCP destination port iACL test
632
633         Test scenario for basic protocol ACL with TCP and dport
634             - Create IPv4 stream for pg0 -> pg1 interface.
635             - Create iACL with TCP IP protocol and defined dport.
636             - Send and verify received packets on pg1 interface.
637         """
638
639         # Basic iACL testing with TCP and dport
640         dport = 427
641         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
642                                   TCP(sport=1234, dport=dport))
643         self.pg0.add_stream(pkts)
644
645         key = 'proto_tcp_sport'
646         self.create_classify_table(
647             key, self.build_ip_mask(proto='ff', dst_port='ffff'))
648         self.create_classify_session(
649             self.acl_tbl_idx.get(key),
650             self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
651         self.input_acl_set_interface(
652             self.pg0, self.acl_tbl_idx.get(key))
653         self.acl_active_table = key
654
655         self.pg_enable_capture(self.pg_interfaces)
656         self.pg_start()
657
658         pkts = self.pg1.get_capture(len(pkts))
659         self.verify_capture(self.pg1, pkts, TCP)
660         self.pg0.assert_nothing_captured(remark="packets forwarded")
661         self.pg2.assert_nothing_captured(remark="packets forwarded")
662         self.pg3.assert_nothing_captured(remark="packets forwarded")
663
664     def test_iacl_proto_tcp_sport_dport(self):
665         """ TCP source and destination ports iACL test
666
667         Test scenario for basic protocol ACL with TCP and sport and dport
668             - Create IPv4 stream for pg0 -> pg1 interface.
669             - Create iACL with TCP IP protocol and defined sport and dport.
670             - Send and verify received packets on pg1 interface.
671         """
672
673         # Basic iACL testing with TCP and sport and dport
674         sport = 13720
675         dport = 9080
676         pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
677                                   TCP(sport=sport, dport=dport))
678         self.pg0.add_stream(pkts)
679
680         key = 'proto_tcp_ports'
681         self.create_classify_table(
682             key,
683             self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
684         self.create_classify_session(
685             self.acl_tbl_idx.get(key),
686             self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
687                                 dst_port=dport))
688         self.input_acl_set_interface(
689             self.pg0, self.acl_tbl_idx.get(key))
690         self.acl_active_table = key
691
692         self.pg_enable_capture(self.pg_interfaces)
693         self.pg_start()
694
695         pkts = self.pg1.get_capture(len(pkts))
696         self.verify_capture(self.pg1, pkts, TCP)
697         self.pg0.assert_nothing_captured(remark="packets forwarded")
698         self.pg2.assert_nothing_captured(remark="packets forwarded")
699         self.pg3.assert_nothing_captured(remark="packets forwarded")
700
701
702 class TestClassifierIPOut(TestClassifier):
703     """ Classifier output IP Test Case """
704
705     def test_acl_ip_out(self):
706         """ Output IP ACL test
707
708         Test scenario for basic IP ACL with source IP
709             - Create IPv4 stream for pg1 -> pg0 interface.
710             - Create ACL with source IP address.
711             - Send and verify received packets on pg0 interface.
712         """
713
714         # Basic oACL testing with source IP
715         pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
716         self.pg1.add_stream(pkts)
717
718         key = 'ip_out'
719         self.create_classify_table(
720             key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
721         self.create_classify_session(
722             self.acl_tbl_idx.get(key),
723             self.build_ip_match(src_ip=self.pg1.remote_ip4))
724         self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
725         self.acl_active_table = key
726
727         self.pg_enable_capture(self.pg_interfaces)
728         self.pg_start()
729
730         pkts = self.pg0.get_capture(len(pkts))
731         self.verify_capture(self.pg0, pkts)
732         self.pg1.assert_nothing_captured(remark="packets forwarded")
733         self.pg2.assert_nothing_captured(remark="packets forwarded")
734         self.pg3.assert_nothing_captured(remark="packets forwarded")
735
736
737 class TestClassifierMAC(TestClassifier):
738     """ Classifier MAC Test Case """
739
740     def test_acl_mac(self):
741         """ MAC ACL test
742
743         Test scenario for basic MAC ACL with source MAC
744             - Create IPv4 stream for pg0 -> pg2 interface.
745             - Create ACL with source MAC address.
746             - Send and verify received packets on pg2 interface.
747         """
748
749         # Basic iACL testing with source MAC
750         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
751         self.pg0.add_stream(pkts)
752
753         key = 'mac'
754         self.create_classify_table(
755             key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
756         self.create_classify_session(
757             self.acl_tbl_idx.get(key),
758             self.build_mac_match(src_mac=self.pg0.remote_mac))
759         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
760         self.acl_active_table = key
761
762         self.pg_enable_capture(self.pg_interfaces)
763         self.pg_start()
764
765         pkts = self.pg2.get_capture(len(pkts))
766         self.verify_capture(self.pg2, pkts)
767         self.pg0.assert_nothing_captured(remark="packets forwarded")
768         self.pg1.assert_nothing_captured(remark="packets forwarded")
769         self.pg3.assert_nothing_captured(remark="packets forwarded")
770
771
772 class TestClassifierPBR(TestClassifier):
773     """ Classifier PBR Test Case """
774
775     def test_acl_pbr(self):
776         """ IP PBR test
777
778         Test scenario for PBR with source IP
779             - Create IPv4 stream for pg0 -> pg3 interface.
780             - Configure PBR fib entry for packet forwarding.
781             - Send and verify received packets on pg3 interface.
782         """
783
784         # PBR testing with source IP
785         pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
786         self.pg0.add_stream(pkts)
787
788         key = 'pbr'
789         self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
790         pbr_option = 1
791         # this will create the VRF/table in which we will insert the route
792         self.create_classify_session(
793             self.acl_tbl_idx.get(key),
794             self.build_ip_match(src_ip=self.pg0.remote_ip4),
795             pbr_option, self.pbr_vrfid)
796         self.assertTrue(self.verify_vrf(self.pbr_vrfid))
797         self.config_pbr_fib_entry(self.pg3)
798         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
799
800         self.pg_enable_capture(self.pg_interfaces)
801         self.pg_start()
802
803         pkts = self.pg3.get_capture(len(pkts))
804         self.verify_capture(self.pg3, pkts)
805         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key), 0)
806         self.pg0.assert_nothing_captured(remark="packets forwarded")
807         self.pg1.assert_nothing_captured(remark="packets forwarded")
808         self.pg2.assert_nothing_captured(remark="packets forwarded")
809
810         # remove the classify session and the route
811         self.config_pbr_fib_entry(self.pg3, is_add=0)
812         self.create_classify_session(
813             self.acl_tbl_idx.get(key),
814             self.build_ip_match(src_ip=self.pg0.remote_ip4),
815             pbr_option, self.pbr_vrfid, is_add=0)
816
817         # and the table should be gone.
818         self.assertFalse(self.verify_vrf(self.pbr_vrfid))
819
820 if __name__ == '__main__':
821     unittest.main(testRunner=VppTestRunner)