acl-plugin: make test: add a test which deletes an interface with applied ACL
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
22     # traffic types
23     IP = 0
24     ICMP = 1
25
26     # IP version
27     IPRANDOM = -1
28     IPV4 = 0
29     IPV6 = 1
30
31     # rule types
32     DENY = 0
33     PERMIT = 1
34
35     # supported protocols
36     proto = [[6, 17], [1, 58]]
37     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38     ICMPv4 = 0
39     ICMPv6 = 1
40     TCP = 0
41     UDP = 1
42     PROTO_ALL = 0
43
44     # port ranges
45     PORTS_ALL = -1
46     PORTS_RANGE = 0
47     PORTS_RANGE_2 = 1
48     udp_sport_from = 10
49     udp_sport_to = udp_sport_from + 5
50     udp_dport_from = 20000
51     udp_dport_to = udp_dport_from + 5000
52     tcp_sport_from = 30
53     tcp_sport_to = tcp_sport_from + 5
54     tcp_dport_from = 40000
55     tcp_dport_to = tcp_dport_from + 5000
56
57     udp_sport_from_2 = 90
58     udp_sport_to_2 = udp_sport_from_2 + 5
59     udp_dport_from_2 = 30000
60     udp_dport_to_2 = udp_dport_from_2 + 5000
61     tcp_sport_from_2 = 130
62     tcp_sport_to_2 = tcp_sport_from_2 + 5
63     tcp_dport_from_2 = 20000
64     tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
66     icmp4_type = 8  # echo request
67     icmp4_code = 3
68     icmp6_type = 128  # echo request
69     icmp6_code = 3
70
71     icmp4_type_2 = 8
72     icmp4_code_from_2 = 5
73     icmp4_code_to_2 = 20
74     icmp6_type_2 = 128
75     icmp6_code_from_2 = 8
76     icmp6_code_to_2 = 42
77
78     # Test variables
79     bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125
126         except Exception:
127             super(TestACLplugin, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(TestACLplugin, self).setUp()
132         self.reset_packet_infos()
133
134     def tearDown(self):
135         """
136         Show various debug prints after each test.
137         """
138         super(TestACLplugin, self).tearDown()
139         if not self.vpp_dead:
140             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145                                              % self.bd_id))
146
147     def create_hosts(self, count, start=0):
148         """
149         Create required number of host MAC addresses and distribute them among
150         interfaces. Create host IPv4 address for every host MAC address.
151
152         :param int count: Number of hosts to create MAC/IPv4 addresses for.
153         :param int start: Number to start numbering from.
154         """
155         n_int = len(self.pg_interfaces)
156         macs_per_if = count / n_int
157         i = -1
158         for pg_if in self.pg_interfaces:
159             i += 1
160             start_nr = macs_per_if * i + start
161             end_nr = count + start if i == (n_int - 1) \
162                 else macs_per_if * (i + 1) + start
163             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164             for j in range(start_nr, end_nr):
165                 host = Host(
166                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169                 hosts.append(host)
170
171     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172                     s_prefix=0, s_ip='\x00\x00\x00\x00',
173                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
174         if proto == -1:
175             return
176         if ports == self.PORTS_ALL:
177             sport_from = 0
178             dport_from = 0
179             sport_to = 65535 if proto != 1 and proto != 58 else 255
180             dport_to = sport_to
181         elif ports == self.PORTS_RANGE:
182             if proto == 1:
183                 sport_from = self.icmp4_type
184                 sport_to = self.icmp4_type
185                 dport_from = self.icmp4_code
186                 dport_to = self.icmp4_code
187             elif proto == 58:
188                 sport_from = self.icmp6_type
189                 sport_to = self.icmp6_type
190                 dport_from = self.icmp6_code
191                 dport_to = self.icmp6_code
192             elif proto == self.proto[self.IP][self.TCP]:
193                 sport_from = self.tcp_sport_from
194                 sport_to = self.tcp_sport_to
195                 dport_from = self.tcp_dport_from
196                 dport_to = self.tcp_dport_to
197             elif proto == self.proto[self.IP][self.UDP]:
198                 sport_from = self.udp_sport_from
199                 sport_to = self.udp_sport_to
200                 dport_from = self.udp_dport_from
201                 dport_to = self.udp_dport_to
202         elif ports == self.PORTS_RANGE_2:
203             if proto == 1:
204                 sport_from = self.icmp4_type_2
205                 sport_to = self.icmp4_type_2
206                 dport_from = self.icmp4_code_from_2
207                 dport_to = self.icmp4_code_to_2
208             elif proto == 58:
209                 sport_from = self.icmp6_type_2
210                 sport_to = self.icmp6_type_2
211                 dport_from = self.icmp6_code_from_2
212                 dport_to = self.icmp6_code_to_2
213             elif proto == self.proto[self.IP][self.TCP]:
214                 sport_from = self.tcp_sport_from_2
215                 sport_to = self.tcp_sport_to_2
216                 dport_from = self.tcp_dport_from_2
217                 dport_to = self.tcp_dport_to_2
218             elif proto == self.proto[self.IP][self.UDP]:
219                 sport_from = self.udp_sport_from_2
220                 sport_to = self.udp_sport_to_2
221                 dport_from = self.udp_dport_from_2
222                 dport_to = self.udp_dport_to_2
223         else:
224             sport_from = ports
225             sport_to = ports
226             dport_from = ports
227             dport_to = ports
228
229         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230                  'srcport_or_icmptype_first': sport_from,
231                  'srcport_or_icmptype_last': sport_to,
232                  'src_ip_prefix_len': s_prefix,
233                  'src_ip_addr': s_ip,
234                  'dstport_or_icmpcode_first': dport_from,
235                  'dstport_or_icmpcode_last': dport_to,
236                  'dst_ip_prefix_len': d_prefix,
237                  'dst_ip_addr': d_ip})
238         return rule
239
240     def apply_rules(self, rules, tag=''):
241         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
242                                           tag=tag)
243         self.logger.info("Dumped ACL: " + str(
244             self.vapi.acl_dump(reply.acl_index)))
245         # Apply a ACL on the interface as inbound
246         for i in self.pg_interfaces:
247             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
248                                                  n_input=1,
249                                                  acls=[reply.acl_index])
250         return
251
252     def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
253         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
254                                           tag=tag)
255         self.logger.info("Dumped ACL: " + str(
256             self.vapi.acl_dump(reply.acl_index)))
257         # Apply a ACL on the interface as inbound
258         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
259                                              n_input=1,
260                                              acls=[reply.acl_index])
261         return
262
263     def etype_whitelist(self, whitelist, n_input):
264         # Apply whitelists on all the interfaces
265         for i in self.pg_interfaces:
266             # checkstyle can't read long names. Help them.
267             fun = self.vapi.acl_interface_set_etype_whitelist
268             fun(sw_if_index=i.sw_if_index, n_input=n_input,
269                 whitelist=whitelist)
270         return
271
272     def create_upper_layer(self, packet_index, proto, ports=0):
273         p = self.proto_map[proto]
274         if p == 'UDP':
275             if ports == 0:
276                 return UDP(sport=random.randint(self.udp_sport_from,
277                                                 self.udp_sport_to),
278                            dport=random.randint(self.udp_dport_from,
279                                                 self.udp_dport_to))
280             else:
281                 return UDP(sport=ports, dport=ports)
282         elif p == 'TCP':
283             if ports == 0:
284                 return TCP(sport=random.randint(self.tcp_sport_from,
285                                                 self.tcp_sport_to),
286                            dport=random.randint(self.tcp_dport_from,
287                                                 self.tcp_dport_to))
288             else:
289                 return TCP(sport=ports, dport=ports)
290         return ''
291
292     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
293                       proto=-1, ports=0, fragments=False,
294                       pkt_raw=True, etype=-1):
295         """
296         Create input packet stream for defined interface using hosts or
297         deleted_hosts list.
298
299         :param object src_if: Interface to create packet stream for.
300         :param list packet_sizes: List of required packet sizes.
301         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
302         :return: Stream of packets.
303         """
304         pkts = []
305         if self.flows.__contains__(src_if):
306             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
307             for dst_if in self.flows[src_if]:
308                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
309                 n_int = len(dst_hosts) * len(src_hosts)
310                 for i in range(0, n_int):
311                     dst_host = dst_hosts[i / len(src_hosts)]
312                     src_host = src_hosts[i % len(src_hosts)]
313                     pkt_info = self.create_packet_info(src_if, dst_if)
314                     if ipv6 == 1:
315                         pkt_info.ip = 1
316                     elif ipv6 == 0:
317                         pkt_info.ip = 0
318                     else:
319                         pkt_info.ip = random.choice([0, 1])
320                     if proto == -1:
321                         pkt_info.proto = random.choice(self.proto[self.IP])
322                     else:
323                         pkt_info.proto = proto
324                     payload = self.info_to_payload(pkt_info)
325                     p = Ether(dst=dst_host.mac, src=src_host.mac)
326                     if etype > 0:
327                         p = Ether(dst=dst_host.mac,
328                                   src=src_host.mac,
329                                   type=etype)
330                     if pkt_info.ip:
331                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
332                         if fragments:
333                             p /= IPv6ExtHdrFragment(offset=64, m=1)
334                     else:
335                         if fragments:
336                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
337                                     flags=1, frag=64)
338                         else:
339                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
340                     if traffic_type == self.ICMP:
341                         if pkt_info.ip:
342                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
343                                                    code=self.icmp6_code)
344                         else:
345                             p /= ICMP(type=self.icmp4_type,
346                                       code=self.icmp4_code)
347                     else:
348                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
349                     if pkt_raw:
350                         p /= Raw(payload)
351                         pkt_info.data = p.copy()
352                     if pkt_raw:
353                         size = random.choice(packet_sizes)
354                         self.extend_packet(p, size)
355                     pkts.append(p)
356         return pkts
357
358     def verify_capture(self, pg_if, capture,
359                        traffic_type=0, ip_type=0, etype=-1):
360         """
361         Verify captured input packet stream for defined interface.
362
363         :param object pg_if: Interface to verify captured packet stream for.
364         :param list capture: Captured packet stream.
365         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
366         """
367         last_info = dict()
368         for i in self.pg_interfaces:
369             last_info[i.sw_if_index] = None
370         dst_sw_if_index = pg_if.sw_if_index
371         for packet in capture:
372             if etype > 0:
373                 if packet[Ether].type != etype:
374                     self.logger.error(ppp("Unexpected ethertype in packet:",
375                                           packet))
376                 else:
377                     continue
378             try:
379                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
380                 if traffic_type == self.ICMP and ip_type == self.IPV6:
381                     payload_info = self.payload_to_info(
382                         packet[ICMPv6EchoRequest].data)
383                     payload = packet[ICMPv6EchoRequest]
384                 else:
385                     payload_info = self.payload_to_info(str(packet[Raw]))
386                     payload = packet[self.proto_map[payload_info.proto]]
387             except:
388                 self.logger.error(ppp("Unexpected or invalid packet "
389                                       "(outside network):", packet))
390                 raise
391
392             if ip_type != 0:
393                 self.assertEqual(payload_info.ip, ip_type)
394             if traffic_type == self.ICMP:
395                 try:
396                     if payload_info.ip == 0:
397                         self.assertEqual(payload.type, self.icmp4_type)
398                         self.assertEqual(payload.code, self.icmp4_code)
399                     else:
400                         self.assertEqual(payload.type, self.icmp6_type)
401                         self.assertEqual(payload.code, self.icmp6_code)
402                 except:
403                     self.logger.error(ppp("Unexpected or invalid packet "
404                                           "(outside network):", packet))
405                     raise
406             else:
407                 try:
408                     ip_version = IPv6 if payload_info.ip == 1 else IP
409
410                     ip = packet[ip_version]
411                     packet_index = payload_info.index
412
413                     self.assertEqual(payload_info.dst, dst_sw_if_index)
414                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
415                                       (pg_if.name, payload_info.src,
416                                        packet_index))
417                     next_info = self.get_next_packet_info_for_interface2(
418                         payload_info.src, dst_sw_if_index,
419                         last_info[payload_info.src])
420                     last_info[payload_info.src] = next_info
421                     self.assertTrue(next_info is not None)
422                     self.assertEqual(packet_index, next_info.index)
423                     saved_packet = next_info.data
424                     # Check standard fields
425                     self.assertEqual(ip.src, saved_packet[ip_version].src)
426                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
427                     p = self.proto_map[payload_info.proto]
428                     if p == 'TCP':
429                         tcp = packet[TCP]
430                         self.assertEqual(tcp.sport, saved_packet[
431                             TCP].sport)
432                         self.assertEqual(tcp.dport, saved_packet[
433                             TCP].dport)
434                     elif p == 'UDP':
435                         udp = packet[UDP]
436                         self.assertEqual(udp.sport, saved_packet[
437                             UDP].sport)
438                         self.assertEqual(udp.dport, saved_packet[
439                             UDP].dport)
440                 except:
441                     self.logger.error(ppp("Unexpected or invalid packet:",
442                                           packet))
443                     raise
444         for i in self.pg_interfaces:
445             remaining_packet = self.get_next_packet_info_for_interface2(
446                 i, dst_sw_if_index, last_info[i.sw_if_index])
447             self.assertTrue(
448                 remaining_packet is None,
449                 "Port %u: Packet expected from source %u didn't arrive" %
450                 (dst_sw_if_index, i.sw_if_index))
451
452     def run_traffic_no_check(self):
453         # Test
454         # Create incoming packet streams for packet-generator interfaces
455         for i in self.pg_interfaces:
456             if self.flows.__contains__(i):
457                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
458                 if len(pkts) > 0:
459                     i.add_stream(pkts)
460
461         # Enable packet capture and start packet sending
462         self.pg_enable_capture(self.pg_interfaces)
463         self.pg_start()
464
465     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
466                         frags=False, pkt_raw=True, etype=-1):
467         # Test
468         # Create incoming packet streams for packet-generator interfaces
469         pkts_cnt = 0
470         for i in self.pg_interfaces:
471             if self.flows.__contains__(i):
472                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
473                                           traffic_type, ip_type, proto, ports,
474                                           frags, pkt_raw, etype)
475                 if len(pkts) > 0:
476                     i.add_stream(pkts)
477                     pkts_cnt += len(pkts)
478
479         # Enable packet capture and start packet sendingself.IPV
480         self.pg_enable_capture(self.pg_interfaces)
481         self.pg_start()
482
483         # Verify
484         # Verify outgoing packet streams per packet-generator interface
485         for src_if in self.pg_interfaces:
486             if self.flows.__contains__(src_if):
487                 for dst_if in self.flows[src_if]:
488                     capture = dst_if.get_capture(pkts_cnt)
489                     self.logger.info("Verifying capture on interface %s" %
490                                      dst_if.name)
491                     self.verify_capture(dst_if, capture,
492                                         traffic_type, ip_type, etype)
493
494     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
495                               ports=0, frags=False, etype=-1):
496         # Test
497         self.reset_packet_infos()
498         for i in self.pg_interfaces:
499             if self.flows.__contains__(i):
500                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
501                                           traffic_type, ip_type, proto, ports,
502                                           frags, True, etype)
503                 if len(pkts) > 0:
504                     i.add_stream(pkts)
505
506         # Enable packet capture and start packet sending
507         self.pg_enable_capture(self.pg_interfaces)
508         self.pg_start()
509
510         # Verify
511         # Verify outgoing packet streams per packet-generator interface
512         for src_if in self.pg_interfaces:
513             if self.flows.__contains__(src_if):
514                 for dst_if in self.flows[src_if]:
515                     self.logger.info("Verifying capture on interface %s" %
516                                      dst_if.name)
517                     capture = dst_if.get_capture(0)
518                     self.assertEqual(len(capture), 0)
519
520     def test_0000_warmup_test(self):
521         """ ACL plugin version check; learn MACs
522         """
523         self.create_hosts(16)
524         self.run_traffic_no_check()
525         reply = self.vapi.papi.acl_plugin_get_version()
526         self.assertEqual(reply.major, 1)
527         self.logger.info("Working with ACL plugin version: %d.%d" % (
528             reply.major, reply.minor))
529         # minor version changes are non breaking
530         # self.assertEqual(reply.minor, 0)
531
532     def test_0001_acl_create(self):
533         """ ACL create/delete test
534         """
535
536         self.logger.info("ACLP_TEST_START_0001")
537         # Add an ACL
538         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
539               'srcport_or_icmptype_first': 1234,
540               'srcport_or_icmptype_last': 1235,
541               'src_ip_prefix_len': 0,
542               'src_ip_addr': '\x00\x00\x00\x00',
543               'dstport_or_icmpcode_first': 1234,
544               'dstport_or_icmpcode_last': 1234,
545               'dst_ip_addr': '\x00\x00\x00\x00',
546               'dst_ip_prefix_len': 0}]
547         # Test 1: add a new ACL
548         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
549                                           tag="permit 1234")
550         self.assertEqual(reply.retval, 0)
551         # The very first ACL gets #0
552         self.assertEqual(reply.acl_index, 0)
553         first_acl = reply.acl_index
554         rr = self.vapi.acl_dump(reply.acl_index)
555         self.logger.info("Dumped ACL: " + str(rr))
556         self.assertEqual(len(rr), 1)
557         # We should have the same number of ACL entries as we had asked
558         self.assertEqual(len(rr[0].r), len(r))
559         # The rules should be the same. But because the submitted and returned
560         # are different types, we need to iterate over rules and keys to get
561         # to basic values.
562         for i_rule in range(0, len(r) - 1):
563             for rule_key in r[i_rule]:
564                 self.assertEqual(rr[0].r[i_rule][rule_key],
565                                  r[i_rule][rule_key])
566
567         # Add a deny-1234 ACL
568         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
569                    'srcport_or_icmptype_first': 1234,
570                    'srcport_or_icmptype_last': 1235,
571                    'src_ip_prefix_len': 0,
572                    'src_ip_addr': '\x00\x00\x00\x00',
573                    'dstport_or_icmpcode_first': 1234,
574                    'dstport_or_icmpcode_last': 1234,
575                    'dst_ip_addr': '\x00\x00\x00\x00',
576                    'dst_ip_prefix_len': 0},
577                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
578                    'srcport_or_icmptype_first': 0,
579                    'srcport_or_icmptype_last': 0,
580                    'src_ip_prefix_len': 0,
581                    'src_ip_addr': '\x00\x00\x00\x00',
582                    'dstport_or_icmpcode_first': 0,
583                    'dstport_or_icmpcode_last': 0,
584                    'dst_ip_addr': '\x00\x00\x00\x00',
585                    'dst_ip_prefix_len': 0}]
586
587         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
588                                           tag="deny 1234;permit all")
589         self.assertEqual(reply.retval, 0)
590         # The second ACL gets #1
591         self.assertEqual(reply.acl_index, 1)
592         second_acl = reply.acl_index
593
594         # Test 2: try to modify a nonexistent ACL
595         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
596                                           tag="FFFF:FFFF", expected_retval=-6)
597         self.assertEqual(reply.retval, -6)
598         # The ACL number should pass through
599         self.assertEqual(reply.acl_index, 432)
600         # apply an ACL on an interface inbound, try to delete ACL, must fail
601         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
602                                              n_input=1,
603                                              acls=[first_acl])
604         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
605         # Unapply an ACL and then try to delete it - must be ok
606         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
607                                              n_input=0,
608                                              acls=[])
609         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
610
611         # apply an ACL on an interface outbound, try to delete ACL, must fail
612         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
613                                              n_input=0,
614                                              acls=[second_acl])
615         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
616         # Unapply the ACL and then try to delete it - must be ok
617         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
618                                              n_input=0,
619                                              acls=[])
620         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
621
622         # try to apply a nonexistent ACL - must fail
623         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
624                                              n_input=1,
625                                              acls=[first_acl],
626                                              expected_retval=-6)
627
628         self.logger.info("ACLP_TEST_FINISH_0001")
629
630     def test_0002_acl_permit_apply(self):
631         """ permit ACL apply test
632         """
633         self.logger.info("ACLP_TEST_START_0002")
634
635         rules = []
636         rules.append(self.create_rule(self.IPV4, self.PERMIT,
637                      0, self.proto[self.IP][self.UDP]))
638         rules.append(self.create_rule(self.IPV4, self.PERMIT,
639                      0, self.proto[self.IP][self.TCP]))
640
641         # Apply rules
642         self.apply_rules(rules, "permit per-flow")
643
644         # Traffic should still pass
645         self.run_verify_test(self.IP, self.IPV4, -1)
646         self.logger.info("ACLP_TEST_FINISH_0002")
647
648     def test_0003_acl_deny_apply(self):
649         """ deny ACL apply test
650         """
651         self.logger.info("ACLP_TEST_START_0003")
652         # Add a deny-flows ACL
653         rules = []
654         rules.append(self.create_rule(self.IPV4, self.DENY,
655                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
656         # Permit ip any any in the end
657         rules.append(self.create_rule(self.IPV4, self.PERMIT,
658                                       self.PORTS_ALL, 0))
659
660         # Apply rules
661         self.apply_rules(rules, "deny per-flow;permit all")
662
663         # Traffic should not pass
664         self.run_verify_negat_test(self.IP, self.IPV4,
665                                    self.proto[self.IP][self.UDP])
666         self.logger.info("ACLP_TEST_FINISH_0003")
667         # self.assertEqual(1, 0)
668
669     def test_0004_vpp624_permit_icmpv4(self):
670         """ VPP_624 permit ICMPv4
671         """
672         self.logger.info("ACLP_TEST_START_0004")
673
674         # Add an ACL
675         rules = []
676         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677                                       self.proto[self.ICMP][self.ICMPv4]))
678         # deny ip any any in the end
679         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
680
681         # Apply rules
682         self.apply_rules(rules, "permit icmpv4")
683
684         # Traffic should still pass
685         self.run_verify_test(self.ICMP, self.IPV4,
686                              self.proto[self.ICMP][self.ICMPv4])
687
688         self.logger.info("ACLP_TEST_FINISH_0004")
689
690     def test_0005_vpp624_permit_icmpv6(self):
691         """ VPP_624 permit ICMPv6
692         """
693         self.logger.info("ACLP_TEST_START_0005")
694
695         # Add an ACL
696         rules = []
697         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
698                                       self.proto[self.ICMP][self.ICMPv6]))
699         # deny ip any any in the end
700         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
701
702         # Apply rules
703         self.apply_rules(rules, "permit icmpv6")
704
705         # Traffic should still pass
706         self.run_verify_test(self.ICMP, self.IPV6,
707                              self.proto[self.ICMP][self.ICMPv6])
708
709         self.logger.info("ACLP_TEST_FINISH_0005")
710
711     def test_0006_vpp624_deny_icmpv4(self):
712         """ VPP_624 deny ICMPv4
713         """
714         self.logger.info("ACLP_TEST_START_0006")
715         # Add an ACL
716         rules = []
717         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
718                                       self.proto[self.ICMP][self.ICMPv4]))
719         # permit ip any any in the end
720         rules.append(self.create_rule(self.IPV4, self.PERMIT,
721                                       self.PORTS_ALL, 0))
722
723         # Apply rules
724         self.apply_rules(rules, "deny icmpv4")
725
726         # Traffic should not pass
727         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
728
729         self.logger.info("ACLP_TEST_FINISH_0006")
730
731     def test_0007_vpp624_deny_icmpv6(self):
732         """ VPP_624 deny ICMPv6
733         """
734         self.logger.info("ACLP_TEST_START_0007")
735         # Add an ACL
736         rules = []
737         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
738                                       self.proto[self.ICMP][self.ICMPv6]))
739         # deny ip any any in the end
740         rules.append(self.create_rule(self.IPV6, self.PERMIT,
741                                       self.PORTS_ALL, 0))
742
743         # Apply rules
744         self.apply_rules(rules, "deny icmpv6")
745
746         # Traffic should not pass
747         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
748
749         self.logger.info("ACLP_TEST_FINISH_0007")
750
751     def test_0008_tcp_permit_v4(self):
752         """ permit TCPv4
753         """
754         self.logger.info("ACLP_TEST_START_0008")
755
756         # Add an ACL
757         rules = []
758         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
759                      self.proto[self.IP][self.TCP]))
760         # deny ip any any in the end
761         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
762
763         # Apply rules
764         self.apply_rules(rules, "permit ipv4 tcp")
765
766         # Traffic should still pass
767         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
768
769         self.logger.info("ACLP_TEST_FINISH_0008")
770
771     def test_0009_tcp_permit_v6(self):
772         """ permit TCPv6
773         """
774         self.logger.info("ACLP_TEST_START_0009")
775
776         # Add an ACL
777         rules = []
778         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
779                                       self.proto[self.IP][self.TCP]))
780         # deny ip any any in the end
781         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
782
783         # Apply rules
784         self.apply_rules(rules, "permit ip6 tcp")
785
786         # Traffic should still pass
787         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
788
789         self.logger.info("ACLP_TEST_FINISH_0008")
790
791     def test_0010_udp_permit_v4(self):
792         """ permit UDPv4
793         """
794         self.logger.info("ACLP_TEST_START_0010")
795
796         # Add an ACL
797         rules = []
798         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
799                                       self.proto[self.IP][self.UDP]))
800         # deny ip any any in the end
801         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
802
803         # Apply rules
804         self.apply_rules(rules, "permit ipv udp")
805
806         # Traffic should still pass
807         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
808
809         self.logger.info("ACLP_TEST_FINISH_0010")
810
811     def test_0011_udp_permit_v6(self):
812         """ permit UDPv6
813         """
814         self.logger.info("ACLP_TEST_START_0011")
815
816         # Add an ACL
817         rules = []
818         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
819                                       self.proto[self.IP][self.UDP]))
820         # deny ip any any in the end
821         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
822
823         # Apply rules
824         self.apply_rules(rules, "permit ip6 udp")
825
826         # Traffic should still pass
827         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
828
829         self.logger.info("ACLP_TEST_FINISH_0011")
830
831     def test_0012_tcp_deny(self):
832         """ deny TCPv4/v6
833         """
834         self.logger.info("ACLP_TEST_START_0012")
835
836         # Add an ACL
837         rules = []
838         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
839                                       self.proto[self.IP][self.TCP]))
840         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
841                                       self.proto[self.IP][self.TCP]))
842         # permit ip any any in the end
843         rules.append(self.create_rule(self.IPV4, self.PERMIT,
844                                       self.PORTS_ALL, 0))
845         rules.append(self.create_rule(self.IPV6, self.PERMIT,
846                                       self.PORTS_ALL, 0))
847
848         # Apply rules
849         self.apply_rules(rules, "deny ip4/ip6 tcp")
850
851         # Traffic should not pass
852         self.run_verify_negat_test(self.IP, self.IPRANDOM,
853                                    self.proto[self.IP][self.TCP])
854
855         self.logger.info("ACLP_TEST_FINISH_0012")
856
857     def test_0013_udp_deny(self):
858         """ deny UDPv4/v6
859         """
860         self.logger.info("ACLP_TEST_START_0013")
861
862         # Add an ACL
863         rules = []
864         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
865                                       self.proto[self.IP][self.UDP]))
866         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
867                                       self.proto[self.IP][self.UDP]))
868         # permit ip any any in the end
869         rules.append(self.create_rule(self.IPV4, self.PERMIT,
870                                       self.PORTS_ALL, 0))
871         rules.append(self.create_rule(self.IPV6, self.PERMIT,
872                                       self.PORTS_ALL, 0))
873
874         # Apply rules
875         self.apply_rules(rules, "deny ip4/ip6 udp")
876
877         # Traffic should not pass
878         self.run_verify_negat_test(self.IP, self.IPRANDOM,
879                                    self.proto[self.IP][self.UDP])
880
881         self.logger.info("ACLP_TEST_FINISH_0013")
882
883     def test_0014_acl_dump(self):
884         """ verify add/dump acls
885         """
886         self.logger.info("ACLP_TEST_START_0014")
887
888         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
889              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
890              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
893              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
894              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
895              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
896              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
897              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
898              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
899              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
900              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
901              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
902              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
903              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
904              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
905              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
906              ]
907
908         # Add and verify new ACLs
909         rules = []
910         for i in range(len(r)):
911             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
912
913         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
914         result = self.vapi.acl_dump(reply.acl_index)
915
916         i = 0
917         for drules in result:
918             for dr in drules.r:
919                 self.assertEqual(dr.is_ipv6, r[i][0])
920                 self.assertEqual(dr.is_permit, r[i][1])
921                 self.assertEqual(dr.proto, r[i][3])
922
923                 if r[i][2] > 0:
924                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
925                 else:
926                     if r[i][2] < 0:
927                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
928                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
929                     else:
930                         if dr.proto == self.proto[self.IP][self.TCP]:
931                             self.assertGreater(dr.srcport_or_icmptype_first,
932                                                self.tcp_sport_from-1)
933                             self.assertLess(dr.srcport_or_icmptype_first,
934                                             self.tcp_sport_to+1)
935                             self.assertGreater(dr.dstport_or_icmpcode_last,
936                                                self.tcp_dport_from-1)
937                             self.assertLess(dr.dstport_or_icmpcode_last,
938                                             self.tcp_dport_to+1)
939                         elif dr.proto == self.proto[self.IP][self.UDP]:
940                             self.assertGreater(dr.srcport_or_icmptype_first,
941                                                self.udp_sport_from-1)
942                             self.assertLess(dr.srcport_or_icmptype_first,
943                                             self.udp_sport_to+1)
944                             self.assertGreater(dr.dstport_or_icmpcode_last,
945                                                self.udp_dport_from-1)
946                             self.assertLess(dr.dstport_or_icmpcode_last,
947                                             self.udp_dport_to+1)
948                 i += 1
949
950         self.logger.info("ACLP_TEST_FINISH_0014")
951
952     def test_0015_tcp_permit_port_v4(self):
953         """ permit single TCPv4
954         """
955         self.logger.info("ACLP_TEST_START_0015")
956
957         port = random.randint(0, 65535)
958         # Add an ACL
959         rules = []
960         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
961                                       self.proto[self.IP][self.TCP]))
962         # deny ip any any in the end
963         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
964
965         # Apply rules
966         self.apply_rules(rules, "permit ip4 tcp "+str(port))
967
968         # Traffic should still pass
969         self.run_verify_test(self.IP, self.IPV4,
970                              self.proto[self.IP][self.TCP], port)
971
972         self.logger.info("ACLP_TEST_FINISH_0015")
973
974     def test_0016_udp_permit_port_v4(self):
975         """ permit single UDPv4
976         """
977         self.logger.info("ACLP_TEST_START_0016")
978
979         port = random.randint(0, 65535)
980         # Add an ACL
981         rules = []
982         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
983                                       self.proto[self.IP][self.UDP]))
984         # deny ip any any in the end
985         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
986
987         # Apply rules
988         self.apply_rules(rules, "permit ip4 tcp "+str(port))
989
990         # Traffic should still pass
991         self.run_verify_test(self.IP, self.IPV4,
992                              self.proto[self.IP][self.UDP], port)
993
994         self.logger.info("ACLP_TEST_FINISH_0016")
995
996     def test_0017_tcp_permit_port_v6(self):
997         """ permit single TCPv6
998         """
999         self.logger.info("ACLP_TEST_START_0017")
1000
1001         port = random.randint(0, 65535)
1002         # Add an ACL
1003         rules = []
1004         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1005                                       self.proto[self.IP][self.TCP]))
1006         # deny ip any any in the end
1007         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1008
1009         # Apply rules
1010         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1011
1012         # Traffic should still pass
1013         self.run_verify_test(self.IP, self.IPV6,
1014                              self.proto[self.IP][self.TCP], port)
1015
1016         self.logger.info("ACLP_TEST_FINISH_0017")
1017
1018     def test_0018_udp_permit_port_v6(self):
1019         """ permit single UPPv6
1020         """
1021         self.logger.info("ACLP_TEST_START_0018")
1022
1023         port = random.randint(0, 65535)
1024         # Add an ACL
1025         rules = []
1026         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1027                                       self.proto[self.IP][self.UDP]))
1028         # deny ip any any in the end
1029         rules.append(self.create_rule(self.IPV6, self.DENY,
1030                                       self.PORTS_ALL, 0))
1031
1032         # Apply rules
1033         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1034
1035         # Traffic should still pass
1036         self.run_verify_test(self.IP, self.IPV6,
1037                              self.proto[self.IP][self.UDP], port)
1038
1039         self.logger.info("ACLP_TEST_FINISH_0018")
1040
1041     def test_0019_udp_deny_port(self):
1042         """ deny single TCPv4/v6
1043         """
1044         self.logger.info("ACLP_TEST_START_0019")
1045
1046         port = random.randint(0, 65535)
1047         # Add an ACL
1048         rules = []
1049         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1050                                       self.proto[self.IP][self.TCP]))
1051         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1052                                       self.proto[self.IP][self.TCP]))
1053         # Permit ip any any in the end
1054         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1055                                       self.PORTS_ALL, 0))
1056         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1057                                       self.PORTS_ALL, 0))
1058
1059         # Apply rules
1060         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1061
1062         # Traffic should not pass
1063         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1064                                    self.proto[self.IP][self.TCP], port)
1065
1066         self.logger.info("ACLP_TEST_FINISH_0019")
1067
1068     def test_0020_udp_deny_port(self):
1069         """ deny single UDPv4/v6
1070         """
1071         self.logger.info("ACLP_TEST_START_0020")
1072
1073         port = random.randint(0, 65535)
1074         # Add an ACL
1075         rules = []
1076         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1077                                       self.proto[self.IP][self.UDP]))
1078         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1079                                       self.proto[self.IP][self.UDP]))
1080         # Permit ip any any in the end
1081         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1082                                       self.PORTS_ALL, 0))
1083         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1084                                       self.PORTS_ALL, 0))
1085
1086         # Apply rules
1087         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1088
1089         # Traffic should not pass
1090         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1091                                    self.proto[self.IP][self.UDP], port)
1092
1093         self.logger.info("ACLP_TEST_FINISH_0020")
1094
1095     def test_0021_udp_deny_port_verify_fragment_deny(self):
1096         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1097         """
1098         self.logger.info("ACLP_TEST_START_0021")
1099
1100         port = random.randint(0, 65535)
1101         # Add an ACL
1102         rules = []
1103         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1104                                       self.proto[self.IP][self.UDP]))
1105         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1106                                       self.proto[self.IP][self.UDP]))
1107         # deny ip any any in the end
1108         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1109                                       self.PORTS_ALL, 0))
1110         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1111                                       self.PORTS_ALL, 0))
1112
1113         # Apply rules
1114         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1115
1116         # Traffic should not pass
1117         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1118                                    self.proto[self.IP][self.UDP], port, True)
1119
1120         self.logger.info("ACLP_TEST_FINISH_0021")
1121
1122     def test_0022_zero_length_udp_ipv4(self):
1123         """ VPP-687 zero length udp ipv4 packet"""
1124         self.logger.info("ACLP_TEST_START_0022")
1125
1126         port = random.randint(0, 65535)
1127         # Add an ACL
1128         rules = []
1129         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1130                                       self.proto[self.IP][self.UDP]))
1131         # deny ip any any in the end
1132         rules.append(
1133             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1134
1135         # Apply rules
1136         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1137
1138         # Traffic should still pass
1139         # Create incoming packet streams for packet-generator interfaces
1140         pkts_cnt = 0
1141         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1142                                   self.IP, self.IPV4,
1143                                   self.proto[self.IP][self.UDP], port,
1144                                   False, False)
1145         if len(pkts) > 0:
1146             self.pg0.add_stream(pkts)
1147             pkts_cnt += len(pkts)
1148
1149         # Enable packet capture and start packet sendingself.IPV
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152
1153         self.pg1.get_capture(pkts_cnt)
1154
1155         self.logger.info("ACLP_TEST_FINISH_0022")
1156
1157     def test_0023_zero_length_udp_ipv6(self):
1158         """ VPP-687 zero length udp ipv6 packet"""
1159         self.logger.info("ACLP_TEST_START_0023")
1160
1161         port = random.randint(0, 65535)
1162         # Add an ACL
1163         rules = []
1164         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1165                                       self.proto[self.IP][self.UDP]))
1166         # deny ip any any in the end
1167         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1168
1169         # Apply rules
1170         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1171
1172         # Traffic should still pass
1173         # Create incoming packet streams for packet-generator interfaces
1174         pkts_cnt = 0
1175         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1176                                   self.IP, self.IPV6,
1177                                   self.proto[self.IP][self.UDP], port,
1178                                   False, False)
1179         if len(pkts) > 0:
1180             self.pg0.add_stream(pkts)
1181             pkts_cnt += len(pkts)
1182
1183         # Enable packet capture and start packet sendingself.IPV
1184         self.pg_enable_capture(self.pg_interfaces)
1185         self.pg_start()
1186
1187         # Verify outgoing packet streams per packet-generator interface
1188         self.pg1.get_capture(pkts_cnt)
1189
1190         self.logger.info("ACLP_TEST_FINISH_0023")
1191
1192     def test_0108_tcp_permit_v4(self):
1193         """ permit TCPv4 + non-match range
1194         """
1195         self.logger.info("ACLP_TEST_START_0108")
1196
1197         # Add an ACL
1198         rules = []
1199         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1200                      self.proto[self.IP][self.TCP]))
1201         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1202                      self.proto[self.IP][self.TCP]))
1203         # deny ip any any in the end
1204         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1205
1206         # Apply rules
1207         self.apply_rules(rules, "permit ipv4 tcp")
1208
1209         # Traffic should still pass
1210         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1211
1212         self.logger.info("ACLP_TEST_FINISH_0108")
1213
1214     def test_0109_tcp_permit_v6(self):
1215         """ permit TCPv6 + non-match range
1216         """
1217         self.logger.info("ACLP_TEST_START_0109")
1218
1219         # Add an ACL
1220         rules = []
1221         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1222                                       self.proto[self.IP][self.TCP]))
1223         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1224                                       self.proto[self.IP][self.TCP]))
1225         # deny ip any any in the end
1226         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1227
1228         # Apply rules
1229         self.apply_rules(rules, "permit ip6 tcp")
1230
1231         # Traffic should still pass
1232         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1233
1234         self.logger.info("ACLP_TEST_FINISH_0109")
1235
1236     def test_0110_udp_permit_v4(self):
1237         """ permit UDPv4 + non-match range
1238         """
1239         self.logger.info("ACLP_TEST_START_0110")
1240
1241         # Add an ACL
1242         rules = []
1243         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1244                                       self.proto[self.IP][self.UDP]))
1245         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1246                                       self.proto[self.IP][self.UDP]))
1247         # deny ip any any in the end
1248         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1249
1250         # Apply rules
1251         self.apply_rules(rules, "permit ipv4 udp")
1252
1253         # Traffic should still pass
1254         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1255
1256         self.logger.info("ACLP_TEST_FINISH_0110")
1257
1258     def test_0111_udp_permit_v6(self):
1259         """ permit UDPv6 + non-match range
1260         """
1261         self.logger.info("ACLP_TEST_START_0111")
1262
1263         # Add an ACL
1264         rules = []
1265         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1266                                       self.proto[self.IP][self.UDP]))
1267         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1268                                       self.proto[self.IP][self.UDP]))
1269         # deny ip any any in the end
1270         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1271
1272         # Apply rules
1273         self.apply_rules(rules, "permit ip6 udp")
1274
1275         # Traffic should still pass
1276         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1277
1278         self.logger.info("ACLP_TEST_FINISH_0111")
1279
1280     def test_0112_tcp_deny(self):
1281         """ deny TCPv4/v6 + non-match range
1282         """
1283         self.logger.info("ACLP_TEST_START_0112")
1284
1285         # Add an ACL
1286         rules = []
1287         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1288                                       self.PORTS_RANGE_2,
1289                                       self.proto[self.IP][self.TCP]))
1290         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1291                                       self.PORTS_RANGE_2,
1292                                       self.proto[self.IP][self.TCP]))
1293         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1294                                       self.proto[self.IP][self.TCP]))
1295         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1296                                       self.proto[self.IP][self.TCP]))
1297         # permit ip any any in the end
1298         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1299                                       self.PORTS_ALL, 0))
1300         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1301                                       self.PORTS_ALL, 0))
1302
1303         # Apply rules
1304         self.apply_rules(rules, "deny ip4/ip6 tcp")
1305
1306         # Traffic should not pass
1307         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1308                                    self.proto[self.IP][self.TCP])
1309
1310         self.logger.info("ACLP_TEST_FINISH_0112")
1311
1312     def test_0113_udp_deny(self):
1313         """ deny UDPv4/v6 + non-match range
1314         """
1315         self.logger.info("ACLP_TEST_START_0113")
1316
1317         # Add an ACL
1318         rules = []
1319         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1320                                       self.PORTS_RANGE_2,
1321                                       self.proto[self.IP][self.UDP]))
1322         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1323                                       self.PORTS_RANGE_2,
1324                                       self.proto[self.IP][self.UDP]))
1325         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1326                                       self.proto[self.IP][self.UDP]))
1327         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1328                                       self.proto[self.IP][self.UDP]))
1329         # permit ip any any in the end
1330         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1331                                       self.PORTS_ALL, 0))
1332         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1333                                       self.PORTS_ALL, 0))
1334
1335         # Apply rules
1336         self.apply_rules(rules, "deny ip4/ip6 udp")
1337
1338         # Traffic should not pass
1339         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1340                                    self.proto[self.IP][self.UDP])
1341
1342         self.logger.info("ACLP_TEST_FINISH_0113")
1343
1344     def test_0300_tcp_permit_v4_etype_aaaa(self):
1345         """ permit TCPv4, send 0xAAAA etype
1346         """
1347         self.logger.info("ACLP_TEST_START_0300")
1348
1349         # Add an ACL
1350         rules = []
1351         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1352                      self.proto[self.IP][self.TCP]))
1353         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1354                      self.proto[self.IP][self.TCP]))
1355         # deny ip any any in the end
1356         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1357
1358         # Apply rules
1359         self.apply_rules(rules, "permit ipv4 tcp")
1360
1361         # Traffic should still pass
1362         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1363
1364         # Traffic should still pass also for an odd ethertype
1365         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1366                              0, False, True, 0xaaaa)
1367
1368         self.logger.info("ACLP_TEST_FINISH_0300")
1369
1370     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1371         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1372         """
1373         self.logger.info("ACLP_TEST_START_0305")
1374
1375         # Add an ACL
1376         rules = []
1377         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1378                      self.proto[self.IP][self.TCP]))
1379         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1380                      self.proto[self.IP][self.TCP]))
1381         # deny ip any any in the end
1382         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1383
1384         # Apply rules
1385         self.apply_rules(rules, "permit ipv4 tcp")
1386
1387         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1388         self.etype_whitelist([0xbbb], 1)
1389
1390         # The IPv4 traffic should still pass
1391         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1392
1393         # The oddball ethertype should be blocked
1394         self.run_verify_negat_test(self.IP, self.IPV4,
1395                                    self.proto[self.IP][self.TCP],
1396                                    0, False, 0xaaaa)
1397
1398         # The whitelisted traffic, on the other hand, should pass
1399         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1400                              0, False, True, 0x0bbb)
1401
1402         # remove the whitelist, the previously blocked 0xAAAA should pass now
1403         self.etype_whitelist([], 0)
1404         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1405                              0, False, True, 0xaaaa)
1406
1407         self.logger.info("ACLP_TEST_FINISH_0305")
1408
1409     def test_0315_del_intf(self):
1410         """ apply an acl and delete the interface
1411         """
1412         self.logger.info("ACLP_TEST_START_0315")
1413
1414         # Add an ACL
1415         rules = []
1416         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1417                      self.proto[self.IP][self.TCP]))
1418         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1419                      self.proto[self.IP][self.TCP]))
1420         # deny ip any any in the end
1421         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1422
1423         # create an interface
1424         intf = []
1425         intf.append(VppLoInterface(self, 0))
1426
1427         # Apply rules
1428         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1429
1430         # Remove the interface
1431         intf[0].remove_vpp_config()
1432
1433         self.logger.info("ACLP_TEST_FINISH_0315")
1434
1435 if __name__ == '__main__':
1436     unittest.main(testRunner=VppTestRunner)