IGMP: proxy device
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types; also used as the row index into ``proto`` below
    IP = 0
    ICMP = 1

    # IP version; create_stream() picks the version at random for any
    # value other than IPV4/IPV6 (e.g. IPRANDOM)
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types, stored as 'is_permit' by create_rule()
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][column]: the IP row is indexed with TCP/UDP, the
    # ICMP row with ICMPv4/ICMPv6; proto_map gives the layer name that
    # belongs to each protocol number
    proto = [[6, 17], [1, 58]]
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # PORTS_ALL / PORTS_RANGE / PORTS_RANGE_2 are the selectors understood
    # by create_rule(); the *_from/*_to pairs below are the bounds used for
    # PORTS_RANGE
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # bounds used for PORTS_RANGE_2
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code matched by PORTS_RANGE rules and put into the ICMP
    # packets built by create_stream()
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range matched by PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # bridge domain that setUpClass() puts the pg interfaces into
    bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125             count = 16
126             start = 0
127             n_int = len(cls.pg_interfaces)
128             macs_per_if = count / n_int
129             i = -1
130             for pg_if in cls.pg_interfaces:
131                 i += 1
132                 start_nr = macs_per_if * i + start
133                 end_nr = count + start if i == (n_int - 1) \
134                     else macs_per_if * (i + 1) + start
135                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136                 for j in range(start_nr, end_nr):
137                     host = Host(
138                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141                     hosts.append(host)
142
143         except Exception:
144             super(TestACLplugin, cls).tearDownClass()
145             raise
146
    def setUp(self):
        """
        Run the base class setUp and reset the packet infos before each test.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
150
151     def tearDown(self):
152         """
153         Show various debug prints after each test.
154         """
155         super(TestACLplugin, self).tearDown()
156         if not self.vpp_dead:
157             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
158             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
159             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
160             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
161             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
162                                              % self.bd_id))
163
164     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
165                     s_prefix=0, s_ip='\x00\x00\x00\x00',
166                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
167         if proto == -1:
168             return
169         if ports == self.PORTS_ALL:
170             sport_from = 0
171             dport_from = 0
172             sport_to = 65535 if proto != 1 and proto != 58 else 255
173             dport_to = sport_to
174         elif ports == self.PORTS_RANGE:
175             if proto == 1:
176                 sport_from = self.icmp4_type
177                 sport_to = self.icmp4_type
178                 dport_from = self.icmp4_code
179                 dport_to = self.icmp4_code
180             elif proto == 58:
181                 sport_from = self.icmp6_type
182                 sport_to = self.icmp6_type
183                 dport_from = self.icmp6_code
184                 dport_to = self.icmp6_code
185             elif proto == self.proto[self.IP][self.TCP]:
186                 sport_from = self.tcp_sport_from
187                 sport_to = self.tcp_sport_to
188                 dport_from = self.tcp_dport_from
189                 dport_to = self.tcp_dport_to
190             elif proto == self.proto[self.IP][self.UDP]:
191                 sport_from = self.udp_sport_from
192                 sport_to = self.udp_sport_to
193                 dport_from = self.udp_dport_from
194                 dport_to = self.udp_dport_to
195         elif ports == self.PORTS_RANGE_2:
196             if proto == 1:
197                 sport_from = self.icmp4_type_2
198                 sport_to = self.icmp4_type_2
199                 dport_from = self.icmp4_code_from_2
200                 dport_to = self.icmp4_code_to_2
201             elif proto == 58:
202                 sport_from = self.icmp6_type_2
203                 sport_to = self.icmp6_type_2
204                 dport_from = self.icmp6_code_from_2
205                 dport_to = self.icmp6_code_to_2
206             elif proto == self.proto[self.IP][self.TCP]:
207                 sport_from = self.tcp_sport_from_2
208                 sport_to = self.tcp_sport_to_2
209                 dport_from = self.tcp_dport_from_2
210                 dport_to = self.tcp_dport_to_2
211             elif proto == self.proto[self.IP][self.UDP]:
212                 sport_from = self.udp_sport_from_2
213                 sport_to = self.udp_sport_to_2
214                 dport_from = self.udp_dport_from_2
215                 dport_to = self.udp_dport_to_2
216         else:
217             sport_from = ports
218             sport_to = ports
219             dport_from = ports
220             dport_to = ports
221
222         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
223                  'srcport_or_icmptype_first': sport_from,
224                  'srcport_or_icmptype_last': sport_to,
225                  'src_ip_prefix_len': s_prefix,
226                  'src_ip_addr': s_ip,
227                  'dstport_or_icmpcode_first': dport_from,
228                  'dstport_or_icmpcode_last': dport_to,
229                  'dst_ip_prefix_len': d_prefix,
230                  'dst_ip_addr': d_ip})
231         return rule
232
233     def apply_rules(self, rules, tag=''):
234         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
235                                           tag=tag)
236         self.logger.info("Dumped ACL: " + str(
237             self.vapi.acl_dump(reply.acl_index)))
238         # Apply a ACL on the interface as inbound
239         for i in self.pg_interfaces:
240             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
241                                                  n_input=1,
242                                                  acls=[reply.acl_index])
243         return
244
245     def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
246         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
247                                           tag=tag)
248         self.logger.info("Dumped ACL: " + str(
249             self.vapi.acl_dump(reply.acl_index)))
250         # Apply a ACL on the interface as inbound
251         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
252                                              n_input=1,
253                                              acls=[reply.acl_index])
254         return
255
256     def etype_whitelist(self, whitelist, n_input):
257         # Apply whitelists on all the interfaces
258         for i in self.pg_interfaces:
259             # checkstyle can't read long names. Help them.
260             fun = self.vapi.acl_interface_set_etype_whitelist
261             fun(sw_if_index=i.sw_if_index, n_input=n_input,
262                 whitelist=whitelist)
263         return
264
265     def create_upper_layer(self, packet_index, proto, ports=0):
266         p = self.proto_map[proto]
267         if p == 'UDP':
268             if ports == 0:
269                 return UDP(sport=random.randint(self.udp_sport_from,
270                                                 self.udp_sport_to),
271                            dport=random.randint(self.udp_dport_from,
272                                                 self.udp_dport_to))
273             else:
274                 return UDP(sport=ports, dport=ports)
275         elif p == 'TCP':
276             if ports == 0:
277                 return TCP(sport=random.randint(self.tcp_sport_from,
278                                                 self.tcp_sport_to),
279                            dport=random.randint(self.tcp_dport_from,
280                                                 self.tcp_dport_to))
281             else:
282                 return TCP(sport=ports, dport=ports)
283         return ''
284
285     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
286                       proto=-1, ports=0, fragments=False,
287                       pkt_raw=True, etype=-1):
288         """
289         Create input packet stream for defined interface using hosts or
290         deleted_hosts list.
291
292         :param object src_if: Interface to create packet stream for.
293         :param list packet_sizes: List of required packet sizes.
294         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
295         :return: Stream of packets.
296         """
297         pkts = []
298         if self.flows.__contains__(src_if):
299             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
300             for dst_if in self.flows[src_if]:
301                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
302                 n_int = len(dst_hosts) * len(src_hosts)
303                 for i in range(0, n_int):
304                     dst_host = dst_hosts[i / len(src_hosts)]
305                     src_host = src_hosts[i % len(src_hosts)]
306                     pkt_info = self.create_packet_info(src_if, dst_if)
307                     if ipv6 == 1:
308                         pkt_info.ip = 1
309                     elif ipv6 == 0:
310                         pkt_info.ip = 0
311                     else:
312                         pkt_info.ip = random.choice([0, 1])
313                     if proto == -1:
314                         pkt_info.proto = random.choice(self.proto[self.IP])
315                     else:
316                         pkt_info.proto = proto
317                     payload = self.info_to_payload(pkt_info)
318                     p = Ether(dst=dst_host.mac, src=src_host.mac)
319                     if etype > 0:
320                         p = Ether(dst=dst_host.mac,
321                                   src=src_host.mac,
322                                   type=etype)
323                     if pkt_info.ip:
324                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
325                         if fragments:
326                             p /= IPv6ExtHdrFragment(offset=64, m=1)
327                     else:
328                         if fragments:
329                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
330                                     flags=1, frag=64)
331                         else:
332                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
333                     if traffic_type == self.ICMP:
334                         if pkt_info.ip:
335                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
336                                                    code=self.icmp6_code)
337                         else:
338                             p /= ICMP(type=self.icmp4_type,
339                                       code=self.icmp4_code)
340                     else:
341                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
342                     if pkt_raw:
343                         p /= Raw(payload)
344                         pkt_info.data = p.copy()
345                     if pkt_raw:
346                         size = random.choice(packet_sizes)
347                         self.extend_packet(p, size)
348                     pkts.append(p)
349         return pkts
350
351     def verify_capture(self, pg_if, capture,
352                        traffic_type=0, ip_type=0, etype=-1):
353         """
354         Verify captured input packet stream for defined interface.
355
356         :param object pg_if: Interface to verify captured packet stream for.
357         :param list capture: Captured packet stream.
358         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
359         """
360         last_info = dict()
361         for i in self.pg_interfaces:
362             last_info[i.sw_if_index] = None
363         dst_sw_if_index = pg_if.sw_if_index
364         for packet in capture:
365             if etype > 0:
366                 if packet[Ether].type != etype:
367                     self.logger.error(ppp("Unexpected ethertype in packet:",
368                                           packet))
369                 else:
370                     continue
371             try:
372                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
373                 if traffic_type == self.ICMP and ip_type == self.IPV6:
374                     payload_info = self.payload_to_info(
375                         packet[ICMPv6EchoRequest].data)
376                     payload = packet[ICMPv6EchoRequest]
377                 else:
378                     payload_info = self.payload_to_info(str(packet[Raw]))
379                     payload = packet[self.proto_map[payload_info.proto]]
380             except:
381                 self.logger.error(ppp("Unexpected or invalid packet "
382                                       "(outside network):", packet))
383                 raise
384
385             if ip_type != 0:
386                 self.assertEqual(payload_info.ip, ip_type)
387             if traffic_type == self.ICMP:
388                 try:
389                     if payload_info.ip == 0:
390                         self.assertEqual(payload.type, self.icmp4_type)
391                         self.assertEqual(payload.code, self.icmp4_code)
392                     else:
393                         self.assertEqual(payload.type, self.icmp6_type)
394                         self.assertEqual(payload.code, self.icmp6_code)
395                 except:
396                     self.logger.error(ppp("Unexpected or invalid packet "
397                                           "(outside network):", packet))
398                     raise
399             else:
400                 try:
401                     ip_version = IPv6 if payload_info.ip == 1 else IP
402
403                     ip = packet[ip_version]
404                     packet_index = payload_info.index
405
406                     self.assertEqual(payload_info.dst, dst_sw_if_index)
407                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
408                                       (pg_if.name, payload_info.src,
409                                        packet_index))
410                     next_info = self.get_next_packet_info_for_interface2(
411                         payload_info.src, dst_sw_if_index,
412                         last_info[payload_info.src])
413                     last_info[payload_info.src] = next_info
414                     self.assertTrue(next_info is not None)
415                     self.assertEqual(packet_index, next_info.index)
416                     saved_packet = next_info.data
417                     # Check standard fields
418                     self.assertEqual(ip.src, saved_packet[ip_version].src)
419                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
420                     p = self.proto_map[payload_info.proto]
421                     if p == 'TCP':
422                         tcp = packet[TCP]
423                         self.assertEqual(tcp.sport, saved_packet[
424                             TCP].sport)
425                         self.assertEqual(tcp.dport, saved_packet[
426                             TCP].dport)
427                     elif p == 'UDP':
428                         udp = packet[UDP]
429                         self.assertEqual(udp.sport, saved_packet[
430                             UDP].sport)
431                         self.assertEqual(udp.dport, saved_packet[
432                             UDP].dport)
433                 except:
434                     self.logger.error(ppp("Unexpected or invalid packet:",
435                                           packet))
436                     raise
437         for i in self.pg_interfaces:
438             remaining_packet = self.get_next_packet_info_for_interface2(
439                 i, dst_sw_if_index, last_info[i.sw_if_index])
440             self.assertTrue(
441                 remaining_packet is None,
442                 "Port %u: Packet expected from source %u didn't arrive" %
443                 (dst_sw_if_index, i.sw_if_index))
444
445     def run_traffic_no_check(self):
446         # Test
447         # Create incoming packet streams for packet-generator interfaces
448         for i in self.pg_interfaces:
449             if self.flows.__contains__(i):
450                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
451                 if len(pkts) > 0:
452                     i.add_stream(pkts)
453
454         # Enable packet capture and start packet sending
455         self.pg_enable_capture(self.pg_interfaces)
456         self.pg_start()
457
458     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
459                         frags=False, pkt_raw=True, etype=-1):
460         # Test
461         # Create incoming packet streams for packet-generator interfaces
462         pkts_cnt = 0
463         for i in self.pg_interfaces:
464             if self.flows.__contains__(i):
465                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
466                                           traffic_type, ip_type, proto, ports,
467                                           frags, pkt_raw, etype)
468                 if len(pkts) > 0:
469                     i.add_stream(pkts)
470                     pkts_cnt += len(pkts)
471
472         # Enable packet capture and start packet sendingself.IPV
473         self.pg_enable_capture(self.pg_interfaces)
474         self.pg_start()
475         self.logger.info("sent packets count: %d" % pkts_cnt)
476
477         # Verify
478         # Verify outgoing packet streams per packet-generator interface
479         for src_if in self.pg_interfaces:
480             if self.flows.__contains__(src_if):
481                 for dst_if in self.flows[src_if]:
482                     capture = dst_if.get_capture(pkts_cnt)
483                     self.logger.info("Verifying capture on interface %s" %
484                                      dst_if.name)
485                     self.verify_capture(dst_if, capture,
486                                         traffic_type, ip_type, etype)
487
488     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
489                               ports=0, frags=False, etype=-1):
490         # Test
491         pkts_cnt = 0
492         self.reset_packet_infos()
493         for i in self.pg_interfaces:
494             if self.flows.__contains__(i):
495                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
496                                           traffic_type, ip_type, proto, ports,
497                                           frags, True, etype)
498                 if len(pkts) > 0:
499                     i.add_stream(pkts)
500                     pkts_cnt += len(pkts)
501
502         # Enable packet capture and start packet sending
503         self.pg_enable_capture(self.pg_interfaces)
504         self.pg_start()
505         self.logger.info("sent packets count: %d" % pkts_cnt)
506
507         # Verify
508         # Verify outgoing packet streams per packet-generator interface
509         for src_if in self.pg_interfaces:
510             if self.flows.__contains__(src_if):
511                 for dst_if in self.flows[src_if]:
512                     self.logger.info("Verifying capture on interface %s" %
513                                      dst_if.name)
514                     capture = dst_if.get_capture(0)
515                     self.assertEqual(len(capture), 0)
516
517     def test_0000_warmup_test(self):
518         """ ACL plugin version check; learn MACs
519         """
520         reply = self.vapi.papi.acl_plugin_get_version()
521         self.assertEqual(reply.major, 1)
522         self.logger.info("Working with ACL plugin version: %d.%d" % (
523             reply.major, reply.minor))
524         # minor version changes are non breaking
525         # self.assertEqual(reply.minor, 0)
526
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """
        # Walks through the ACL life cycle: create two ACLs, try to replace
        # a nonexistent one, check that an ACL applied to pg0 cannot be
        # deleted until it is unapplied, and finally check that a deleted
        # ACL cannot be applied any more.

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                          tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        first_acl = reply.acl_index
        rr = self.vapi.acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): r holds a single rule, so range(0, len(r) - 1) is
        # empty and the comparison below never runs - confirm whether the
        # upper bound was meant to be len(r).
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}]

        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                          tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)
        second_acl = reply.acl_index

        # Test 2: try to modify a nonexistent ACL
        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                          tag="FFFF:FFFF", expected_retval=-6)
        self.assertEqual(reply.retval, -6)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)
        # apply an ACL on an interface inbound, try to delete ACL, must fail
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
        # Unapply an ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

        # apply an ACL on an interface outbound, try to delete ACL, must fail
        # (presumably n_input=0 makes every listed ACL an output ACL -
        # confirm against the API definition)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[second_acl])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
        # Unapply the ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

        # try to apply a nonexistent ACL - must fail
        # (first_acl was deleted above, so its index no longer exists)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl],
                                             expected_retval=-6)

        self.logger.info("ACLP_TEST_FINISH_0001")
624
625     def test_0002_acl_permit_apply(self):
626         """ permit ACL apply test
627         """
628         self.logger.info("ACLP_TEST_START_0002")
629
630         rules = []
631         rules.append(self.create_rule(self.IPV4, self.PERMIT,
632                      0, self.proto[self.IP][self.UDP]))
633         rules.append(self.create_rule(self.IPV4, self.PERMIT,
634                      0, self.proto[self.IP][self.TCP]))
635
636         # Apply rules
637         self.apply_rules(rules, "permit per-flow")
638
639         # Traffic should still pass
640         self.run_verify_test(self.IP, self.IPV4, -1)
641         self.logger.info("ACLP_TEST_FINISH_0002")
642
643     def test_0003_acl_deny_apply(self):
644         """ deny ACL apply test
645         """
646         self.logger.info("ACLP_TEST_START_0003")
647         # Add a deny-flows ACL
648         rules = []
649         rules.append(self.create_rule(self.IPV4, self.DENY,
650                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
651         # Permit ip any any in the end
652         rules.append(self.create_rule(self.IPV4, self.PERMIT,
653                                       self.PORTS_ALL, 0))
654
655         # Apply rules
656         self.apply_rules(rules, "deny per-flow;permit all")
657
658         # Traffic should not pass
659         self.run_verify_negat_test(self.IP, self.IPV4,
660                                    self.proto[self.IP][self.UDP])
661         self.logger.info("ACLP_TEST_FINISH_0003")
662         # self.assertEqual(1, 0)
663
664     def test_0004_vpp624_permit_icmpv4(self):
665         """ VPP_624 permit ICMPv4
666         """
667         self.logger.info("ACLP_TEST_START_0004")
668
669         # Add an ACL
670         rules = []
671         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
672                                       self.proto[self.ICMP][self.ICMPv4]))
673         # deny ip any any in the end
674         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
675
676         # Apply rules
677         self.apply_rules(rules, "permit icmpv4")
678
679         # Traffic should still pass
680         self.run_verify_test(self.ICMP, self.IPV4,
681                              self.proto[self.ICMP][self.ICMPv4])
682
683         self.logger.info("ACLP_TEST_FINISH_0004")
684
685     def test_0005_vpp624_permit_icmpv6(self):
686         """ VPP_624 permit ICMPv6
687         """
688         self.logger.info("ACLP_TEST_START_0005")
689
690         # Add an ACL
691         rules = []
692         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
693                                       self.proto[self.ICMP][self.ICMPv6]))
694         # deny ip any any in the end
695         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
696
697         # Apply rules
698         self.apply_rules(rules, "permit icmpv6")
699
700         # Traffic should still pass
701         self.run_verify_test(self.ICMP, self.IPV6,
702                              self.proto[self.ICMP][self.ICMPv6])
703
704         self.logger.info("ACLP_TEST_FINISH_0005")
705
706     def test_0006_vpp624_deny_icmpv4(self):
707         """ VPP_624 deny ICMPv4
708         """
709         self.logger.info("ACLP_TEST_START_0006")
710         # Add an ACL
711         rules = []
712         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
713                                       self.proto[self.ICMP][self.ICMPv4]))
714         # permit ip any any in the end
715         rules.append(self.create_rule(self.IPV4, self.PERMIT,
716                                       self.PORTS_ALL, 0))
717
718         # Apply rules
719         self.apply_rules(rules, "deny icmpv4")
720
721         # Traffic should not pass
722         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
723
724         self.logger.info("ACLP_TEST_FINISH_0006")
725
726     def test_0007_vpp624_deny_icmpv6(self):
727         """ VPP_624 deny ICMPv6
728         """
729         self.logger.info("ACLP_TEST_START_0007")
730         # Add an ACL
731         rules = []
732         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
733                                       self.proto[self.ICMP][self.ICMPv6]))
734         # deny ip any any in the end
735         rules.append(self.create_rule(self.IPV6, self.PERMIT,
736                                       self.PORTS_ALL, 0))
737
738         # Apply rules
739         self.apply_rules(rules, "deny icmpv6")
740
741         # Traffic should not pass
742         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
743
744         self.logger.info("ACLP_TEST_FINISH_0007")
745
746     def test_0008_tcp_permit_v4(self):
747         """ permit TCPv4
748         """
749         self.logger.info("ACLP_TEST_START_0008")
750
751         # Add an ACL
752         rules = []
753         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
754                      self.proto[self.IP][self.TCP]))
755         # deny ip any any in the end
756         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
757
758         # Apply rules
759         self.apply_rules(rules, "permit ipv4 tcp")
760
761         # Traffic should still pass
762         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
763
764         self.logger.info("ACLP_TEST_FINISH_0008")
765
766     def test_0009_tcp_permit_v6(self):
767         """ permit TCPv6
768         """
769         self.logger.info("ACLP_TEST_START_0009")
770
771         # Add an ACL
772         rules = []
773         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
774                                       self.proto[self.IP][self.TCP]))
775         # deny ip any any in the end
776         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
777
778         # Apply rules
779         self.apply_rules(rules, "permit ip6 tcp")
780
781         # Traffic should still pass
782         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
783
784         self.logger.info("ACLP_TEST_FINISH_0008")
785
786     def test_0010_udp_permit_v4(self):
787         """ permit UDPv4
788         """
789         self.logger.info("ACLP_TEST_START_0010")
790
791         # Add an ACL
792         rules = []
793         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
794                                       self.proto[self.IP][self.UDP]))
795         # deny ip any any in the end
796         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
797
798         # Apply rules
799         self.apply_rules(rules, "permit ipv udp")
800
801         # Traffic should still pass
802         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
803
804         self.logger.info("ACLP_TEST_FINISH_0010")
805
806     def test_0011_udp_permit_v6(self):
807         """ permit UDPv6
808         """
809         self.logger.info("ACLP_TEST_START_0011")
810
811         # Add an ACL
812         rules = []
813         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
814                                       self.proto[self.IP][self.UDP]))
815         # deny ip any any in the end
816         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
817
818         # Apply rules
819         self.apply_rules(rules, "permit ip6 udp")
820
821         # Traffic should still pass
822         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
823
824         self.logger.info("ACLP_TEST_FINISH_0011")
825
826     def test_0012_tcp_deny(self):
827         """ deny TCPv4/v6
828         """
829         self.logger.info("ACLP_TEST_START_0012")
830
831         # Add an ACL
832         rules = []
833         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
834                                       self.proto[self.IP][self.TCP]))
835         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
836                                       self.proto[self.IP][self.TCP]))
837         # permit ip any any in the end
838         rules.append(self.create_rule(self.IPV4, self.PERMIT,
839                                       self.PORTS_ALL, 0))
840         rules.append(self.create_rule(self.IPV6, self.PERMIT,
841                                       self.PORTS_ALL, 0))
842
843         # Apply rules
844         self.apply_rules(rules, "deny ip4/ip6 tcp")
845
846         # Traffic should not pass
847         self.run_verify_negat_test(self.IP, self.IPRANDOM,
848                                    self.proto[self.IP][self.TCP])
849
850         self.logger.info("ACLP_TEST_FINISH_0012")
851
852     def test_0013_udp_deny(self):
853         """ deny UDPv4/v6
854         """
855         self.logger.info("ACLP_TEST_START_0013")
856
857         # Add an ACL
858         rules = []
859         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
860                                       self.proto[self.IP][self.UDP]))
861         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
862                                       self.proto[self.IP][self.UDP]))
863         # permit ip any any in the end
864         rules.append(self.create_rule(self.IPV4, self.PERMIT,
865                                       self.PORTS_ALL, 0))
866         rules.append(self.create_rule(self.IPV6, self.PERMIT,
867                                       self.PORTS_ALL, 0))
868
869         # Apply rules
870         self.apply_rules(rules, "deny ip4/ip6 udp")
871
872         # Traffic should not pass
873         self.run_verify_negat_test(self.IP, self.IPRANDOM,
874                                    self.proto[self.IP][self.UDP])
875
876         self.logger.info("ACLP_TEST_FINISH_0013")
877
878     def test_0014_acl_dump(self):
879         """ verify add/dump acls
880         """
881         self.logger.info("ACLP_TEST_START_0014")
882
883         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
884              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
885              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
886              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
887              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
888              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
889              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
890              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
893              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
894              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
895              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
896              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
897              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
898              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
899              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
900              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
901              ]
902
903         # Add and verify new ACLs
904         rules = []
905         for i in range(len(r)):
906             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
907
908         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
909         result = self.vapi.acl_dump(reply.acl_index)
910
911         i = 0
912         for drules in result:
913             for dr in drules.r:
914                 self.assertEqual(dr.is_ipv6, r[i][0])
915                 self.assertEqual(dr.is_permit, r[i][1])
916                 self.assertEqual(dr.proto, r[i][3])
917
918                 if r[i][2] > 0:
919                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
920                 else:
921                     if r[i][2] < 0:
922                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
923                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
924                     else:
925                         if dr.proto == self.proto[self.IP][self.TCP]:
926                             self.assertGreater(dr.srcport_or_icmptype_first,
927                                                self.tcp_sport_from-1)
928                             self.assertLess(dr.srcport_or_icmptype_first,
929                                             self.tcp_sport_to+1)
930                             self.assertGreater(dr.dstport_or_icmpcode_last,
931                                                self.tcp_dport_from-1)
932                             self.assertLess(dr.dstport_or_icmpcode_last,
933                                             self.tcp_dport_to+1)
934                         elif dr.proto == self.proto[self.IP][self.UDP]:
935                             self.assertGreater(dr.srcport_or_icmptype_first,
936                                                self.udp_sport_from-1)
937                             self.assertLess(dr.srcport_or_icmptype_first,
938                                             self.udp_sport_to+1)
939                             self.assertGreater(dr.dstport_or_icmpcode_last,
940                                                self.udp_dport_from-1)
941                             self.assertLess(dr.dstport_or_icmpcode_last,
942                                             self.udp_dport_to+1)
943                 i += 1
944
945         self.logger.info("ACLP_TEST_FINISH_0014")
946
947     def test_0015_tcp_permit_port_v4(self):
948         """ permit single TCPv4
949         """
950         self.logger.info("ACLP_TEST_START_0015")
951
952         port = random.randint(0, 65535)
953         # Add an ACL
954         rules = []
955         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
956                                       self.proto[self.IP][self.TCP]))
957         # deny ip any any in the end
958         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
959
960         # Apply rules
961         self.apply_rules(rules, "permit ip4 tcp "+str(port))
962
963         # Traffic should still pass
964         self.run_verify_test(self.IP, self.IPV4,
965                              self.proto[self.IP][self.TCP], port)
966
967         self.logger.info("ACLP_TEST_FINISH_0015")
968
969     def test_0016_udp_permit_port_v4(self):
970         """ permit single UDPv4
971         """
972         self.logger.info("ACLP_TEST_START_0016")
973
974         port = random.randint(0, 65535)
975         # Add an ACL
976         rules = []
977         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
978                                       self.proto[self.IP][self.UDP]))
979         # deny ip any any in the end
980         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
981
982         # Apply rules
983         self.apply_rules(rules, "permit ip4 tcp "+str(port))
984
985         # Traffic should still pass
986         self.run_verify_test(self.IP, self.IPV4,
987                              self.proto[self.IP][self.UDP], port)
988
989         self.logger.info("ACLP_TEST_FINISH_0016")
990
991     def test_0017_tcp_permit_port_v6(self):
992         """ permit single TCPv6
993         """
994         self.logger.info("ACLP_TEST_START_0017")
995
996         port = random.randint(0, 65535)
997         # Add an ACL
998         rules = []
999         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1000                                       self.proto[self.IP][self.TCP]))
1001         # deny ip any any in the end
1002         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1003
1004         # Apply rules
1005         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1006
1007         # Traffic should still pass
1008         self.run_verify_test(self.IP, self.IPV6,
1009                              self.proto[self.IP][self.TCP], port)
1010
1011         self.logger.info("ACLP_TEST_FINISH_0017")
1012
1013     def test_0018_udp_permit_port_v6(self):
1014         """ permit single UPPv6
1015         """
1016         self.logger.info("ACLP_TEST_START_0018")
1017
1018         port = random.randint(0, 65535)
1019         # Add an ACL
1020         rules = []
1021         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1022                                       self.proto[self.IP][self.UDP]))
1023         # deny ip any any in the end
1024         rules.append(self.create_rule(self.IPV6, self.DENY,
1025                                       self.PORTS_ALL, 0))
1026
1027         # Apply rules
1028         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1029
1030         # Traffic should still pass
1031         self.run_verify_test(self.IP, self.IPV6,
1032                              self.proto[self.IP][self.UDP], port)
1033
1034         self.logger.info("ACLP_TEST_FINISH_0018")
1035
1036     def test_0019_udp_deny_port(self):
1037         """ deny single TCPv4/v6
1038         """
1039         self.logger.info("ACLP_TEST_START_0019")
1040
1041         port = random.randint(0, 65535)
1042         # Add an ACL
1043         rules = []
1044         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1045                                       self.proto[self.IP][self.TCP]))
1046         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1047                                       self.proto[self.IP][self.TCP]))
1048         # Permit ip any any in the end
1049         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1050                                       self.PORTS_ALL, 0))
1051         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1052                                       self.PORTS_ALL, 0))
1053
1054         # Apply rules
1055         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1056
1057         # Traffic should not pass
1058         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1059                                    self.proto[self.IP][self.TCP], port)
1060
1061         self.logger.info("ACLP_TEST_FINISH_0019")
1062
1063     def test_0020_udp_deny_port(self):
1064         """ deny single UDPv4/v6
1065         """
1066         self.logger.info("ACLP_TEST_START_0020")
1067
1068         port = random.randint(0, 65535)
1069         # Add an ACL
1070         rules = []
1071         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1072                                       self.proto[self.IP][self.UDP]))
1073         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1074                                       self.proto[self.IP][self.UDP]))
1075         # Permit ip any any in the end
1076         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1077                                       self.PORTS_ALL, 0))
1078         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1079                                       self.PORTS_ALL, 0))
1080
1081         # Apply rules
1082         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1083
1084         # Traffic should not pass
1085         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1086                                    self.proto[self.IP][self.UDP], port)
1087
1088         self.logger.info("ACLP_TEST_FINISH_0020")
1089
1090     def test_0021_udp_deny_port_verify_fragment_deny(self):
1091         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1092         blocked
1093         """
1094         self.logger.info("ACLP_TEST_START_0021")
1095
1096         port = random.randint(0, 65535)
1097         # Add an ACL
1098         rules = []
1099         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1100                                       self.proto[self.IP][self.UDP]))
1101         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1102                                       self.proto[self.IP][self.UDP]))
1103         # deny ip any any in the end
1104         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1105                                       self.PORTS_ALL, 0))
1106         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1107                                       self.PORTS_ALL, 0))
1108
1109         # Apply rules
1110         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1111
1112         # Traffic should not pass
1113         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1114                                    self.proto[self.IP][self.UDP], port, True)
1115
1116         self.logger.info("ACLP_TEST_FINISH_0021")
1117
1118     def test_0022_zero_length_udp_ipv4(self):
1119         """ VPP-687 zero length udp ipv4 packet"""
1120         self.logger.info("ACLP_TEST_START_0022")
1121
1122         port = random.randint(0, 65535)
1123         # Add an ACL
1124         rules = []
1125         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1126                                       self.proto[self.IP][self.UDP]))
1127         # deny ip any any in the end
1128         rules.append(
1129             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1130
1131         # Apply rules
1132         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1133
1134         # Traffic should still pass
1135         # Create incoming packet streams for packet-generator interfaces
1136         pkts_cnt = 0
1137         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1138                                   self.IP, self.IPV4,
1139                                   self.proto[self.IP][self.UDP], port,
1140                                   False, False)
1141         if len(pkts) > 0:
1142             self.pg0.add_stream(pkts)
1143             pkts_cnt += len(pkts)
1144
1145         # Enable packet capture and start packet sendingself.IPV
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148
1149         self.pg1.get_capture(pkts_cnt)
1150
1151         self.logger.info("ACLP_TEST_FINISH_0022")
1152
1153     def test_0023_zero_length_udp_ipv6(self):
1154         """ VPP-687 zero length udp ipv6 packet"""
1155         self.logger.info("ACLP_TEST_START_0023")
1156
1157         port = random.randint(0, 65535)
1158         # Add an ACL
1159         rules = []
1160         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1161                                       self.proto[self.IP][self.UDP]))
1162         # deny ip any any in the end
1163         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1164
1165         # Apply rules
1166         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1167
1168         # Traffic should still pass
1169         # Create incoming packet streams for packet-generator interfaces
1170         pkts_cnt = 0
1171         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1172                                   self.IP, self.IPV6,
1173                                   self.proto[self.IP][self.UDP], port,
1174                                   False, False)
1175         if len(pkts) > 0:
1176             self.pg0.add_stream(pkts)
1177             pkts_cnt += len(pkts)
1178
1179         # Enable packet capture and start packet sendingself.IPV
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pg_start()
1182
1183         # Verify outgoing packet streams per packet-generator interface
1184         self.pg1.get_capture(pkts_cnt)
1185
1186         self.logger.info("ACLP_TEST_FINISH_0023")
1187
1188     def test_0108_tcp_permit_v4(self):
1189         """ permit TCPv4 + non-match range
1190         """
1191         self.logger.info("ACLP_TEST_START_0108")
1192
1193         # Add an ACL
1194         rules = []
1195         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1196                      self.proto[self.IP][self.TCP]))
1197         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1198                      self.proto[self.IP][self.TCP]))
1199         # deny ip any any in the end
1200         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1201
1202         # Apply rules
1203         self.apply_rules(rules, "permit ipv4 tcp")
1204
1205         # Traffic should still pass
1206         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1207
1208         self.logger.info("ACLP_TEST_FINISH_0108")
1209
1210     def test_0109_tcp_permit_v6(self):
1211         """ permit TCPv6 + non-match range
1212         """
1213         self.logger.info("ACLP_TEST_START_0109")
1214
1215         # Add an ACL
1216         rules = []
1217         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1218                                       self.proto[self.IP][self.TCP]))
1219         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1220                                       self.proto[self.IP][self.TCP]))
1221         # deny ip any any in the end
1222         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1223
1224         # Apply rules
1225         self.apply_rules(rules, "permit ip6 tcp")
1226
1227         # Traffic should still pass
1228         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1229
1230         self.logger.info("ACLP_TEST_FINISH_0109")
1231
1232     def test_0110_udp_permit_v4(self):
1233         """ permit UDPv4 + non-match range
1234         """
1235         self.logger.info("ACLP_TEST_START_0110")
1236
1237         # Add an ACL
1238         rules = []
1239         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1240                                       self.proto[self.IP][self.UDP]))
1241         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1242                                       self.proto[self.IP][self.UDP]))
1243         # deny ip any any in the end
1244         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1245
1246         # Apply rules
1247         self.apply_rules(rules, "permit ipv4 udp")
1248
1249         # Traffic should still pass
1250         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1251
1252         self.logger.info("ACLP_TEST_FINISH_0110")
1253
1254     def test_0111_udp_permit_v6(self):
1255         """ permit UDPv6 + non-match range
1256         """
1257         self.logger.info("ACLP_TEST_START_0111")
1258
1259         # Add an ACL
1260         rules = []
1261         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1262                                       self.proto[self.IP][self.UDP]))
1263         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1264                                       self.proto[self.IP][self.UDP]))
1265         # deny ip any any in the end
1266         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1267
1268         # Apply rules
1269         self.apply_rules(rules, "permit ip6 udp")
1270
1271         # Traffic should still pass
1272         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1273
1274         self.logger.info("ACLP_TEST_FINISH_0111")
1275
1276     def test_0112_tcp_deny(self):
1277         """ deny TCPv4/v6 + non-match range
1278         """
1279         self.logger.info("ACLP_TEST_START_0112")
1280
1281         # Add an ACL
1282         rules = []
1283         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1284                                       self.PORTS_RANGE_2,
1285                                       self.proto[self.IP][self.TCP]))
1286         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1287                                       self.PORTS_RANGE_2,
1288                                       self.proto[self.IP][self.TCP]))
1289         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1290                                       self.proto[self.IP][self.TCP]))
1291         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1292                                       self.proto[self.IP][self.TCP]))
1293         # permit ip any any in the end
1294         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1295                                       self.PORTS_ALL, 0))
1296         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1297                                       self.PORTS_ALL, 0))
1298
1299         # Apply rules
1300         self.apply_rules(rules, "deny ip4/ip6 tcp")
1301
1302         # Traffic should not pass
1303         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1304                                    self.proto[self.IP][self.TCP])
1305
1306         self.logger.info("ACLP_TEST_FINISH_0112")
1307
1308     def test_0113_udp_deny(self):
1309         """ deny UDPv4/v6 + non-match range
1310         """
1311         self.logger.info("ACLP_TEST_START_0113")
1312
1313         # Add an ACL
1314         rules = []
1315         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1316                                       self.PORTS_RANGE_2,
1317                                       self.proto[self.IP][self.UDP]))
1318         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1319                                       self.PORTS_RANGE_2,
1320                                       self.proto[self.IP][self.UDP]))
1321         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1322                                       self.proto[self.IP][self.UDP]))
1323         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1324                                       self.proto[self.IP][self.UDP]))
1325         # permit ip any any in the end
1326         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1327                                       self.PORTS_ALL, 0))
1328         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1329                                       self.PORTS_ALL, 0))
1330
1331         # Apply rules
1332         self.apply_rules(rules, "deny ip4/ip6 udp")
1333
1334         # Traffic should not pass
1335         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1336                                    self.proto[self.IP][self.UDP])
1337
1338         self.logger.info("ACLP_TEST_FINISH_0113")
1339
1340     def test_0300_tcp_permit_v4_etype_aaaa(self):
1341         """ permit TCPv4, send 0xAAAA etype
1342         """
1343         self.logger.info("ACLP_TEST_START_0300")
1344
1345         # Add an ACL
1346         rules = []
1347         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1348                      self.proto[self.IP][self.TCP]))
1349         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1350                      self.proto[self.IP][self.TCP]))
1351         # deny ip any any in the end
1352         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1353
1354         # Apply rules
1355         self.apply_rules(rules, "permit ipv4 tcp")
1356
1357         # Traffic should still pass also for an odd ethertype
1358         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1359                              0, False, True, 0xaaaa)
1360         self.logger.info("ACLP_TEST_FINISH_0300")
1361
1362     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1363         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1364         """
1365         self.logger.info("ACLP_TEST_START_0305")
1366
1367         # Add an ACL
1368         rules = []
1369         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1370                      self.proto[self.IP][self.TCP]))
1371         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1372                      self.proto[self.IP][self.TCP]))
1373         # deny ip any any in the end
1374         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1375
1376         # Apply rules
1377         self.apply_rules(rules, "permit ipv4 tcp")
1378
1379         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1380         self.etype_whitelist([0xbbb], 1)
1381
1382         # The oddball ethertype should be blocked
1383         self.run_verify_negat_test(self.IP, self.IPV4,
1384                                    self.proto[self.IP][self.TCP],
1385                                    0, False, 0xaaaa)
1386
1387         # remove the whitelist
1388         self.etype_whitelist([], 0)
1389
1390         self.logger.info("ACLP_TEST_FINISH_0305")
1391
1392     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1393         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1394         """
1395         self.logger.info("ACLP_TEST_START_0306")
1396
1397         # Add an ACL
1398         rules = []
1399         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1400                      self.proto[self.IP][self.TCP]))
1401         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1402                      self.proto[self.IP][self.TCP]))
1403         # deny ip any any in the end
1404         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1405
1406         # Apply rules
1407         self.apply_rules(rules, "permit ipv4 tcp")
1408
1409         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1410         self.etype_whitelist([0xbbb], 1)
1411
1412         # The whitelisted traffic, should pass
1413         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1414                              0, False, True, 0x0bbb)
1415
1416         # remove the whitelist, the previously blocked 0xAAAA should pass now
1417         self.etype_whitelist([], 0)
1418
1419         self.logger.info("ACLP_TEST_FINISH_0306")
1420
1421     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1422         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1423         """
1424         self.logger.info("ACLP_TEST_START_0307")
1425
1426         # Add an ACL
1427         rules = []
1428         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1429                      self.proto[self.IP][self.TCP]))
1430         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1431                      self.proto[self.IP][self.TCP]))
1432         # deny ip any any in the end
1433         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1434
1435         # Apply rules
1436         self.apply_rules(rules, "permit ipv4 tcp")
1437
1438         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1439         self.etype_whitelist([0xbbb], 1)
1440         # remove the whitelist, the previously blocked 0xAAAA should pass now
1441         self.etype_whitelist([], 0)
1442
1443         # The whitelisted traffic, should pass
1444         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1445                              0, False, True, 0xaaaa)
1446
1447         self.logger.info("ACLP_TEST_FINISH_0306")
1448
1449     def test_0315_del_intf(self):
1450         """ apply an acl and delete the interface
1451         """
1452         self.logger.info("ACLP_TEST_START_0315")
1453
1454         # Add an ACL
1455         rules = []
1456         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1457                      self.proto[self.IP][self.TCP]))
1458         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1459                      self.proto[self.IP][self.TCP]))
1460         # deny ip any any in the end
1461         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1462
1463         # create an interface
1464         intf = []
1465         intf.append(VppLoInterface(self))
1466
1467         # Apply rules
1468         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1469
1470         # Remove the interface
1471         intf[0].remove_vpp_config()
1472
1473         self.logger.info("ACLP_TEST_FINISH_0315")
1474
# When executed as a script, run the tests through VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)