MAP: Convert from DPO to input feature.
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
    # traffic types
    # Used both as the traffic_type argument of create_stream /
    # verify_capture and as the first index into `proto` below.
    IP = 0
    ICMP = 1

    # IP version
    # create_stream picks the version at random when its ipv6 argument is
    # neither 0 nor 1 (presumably what IPRANDOM is meant for; it is not
    # referenced in the part of the file shown here).
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (stored under 'is_permit' by create_rule)
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][index] holds IP protocol numbers:
    # proto[IP] = [TCP, UDP], proto[ICMP] = [ICMPv4, ICMPv6].
    proto = [[6, 17], [1, 58]]
    # Protocol number -> name; verify_capture uses the name to look up the
    # layer in a captured packet, create_upper_layer uses it to pick TCP/UDP.
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # Second-level indices into `proto`
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # Selectors understood by create_rule: PORTS_ALL matches every port,
    # PORTS_RANGE uses the first set of ranges below and PORTS_RANGE_2 the
    # "_2" set.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # First range set; create_upper_layer also draws random ports from it
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # Second range set (PORTS_RANGE_2)
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code used for generated ICMP packets and for PORTS_RANGE
    # rules (type goes into the "srcport" fields, code into "dstport")
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range used for PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # Bridge domain that setUpClass creates and puts the pg interfaces into
    bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125             count = 16
126             start = 0
127             n_int = len(cls.pg_interfaces)
128             macs_per_if = count / n_int
129             i = -1
130             for pg_if in cls.pg_interfaces:
131                 i += 1
132                 start_nr = macs_per_if * i + start
133                 end_nr = count + start if i == (n_int - 1) \
134                     else macs_per_if * (i + 1) + start
135                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136                 for j in range(start_nr, end_nr):
137                     host = Host(
138                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141                     hosts.append(host)
142
143         except Exception:
144             super(TestACLplugin, cls).tearDownClass()
145             raise
146
147     def setUp(self):
148         super(TestACLplugin, self).setUp()
149         self.reset_packet_infos()
150
151     def tearDown(self):
152         """
153         Show various debug prints after each test.
154         """
155         super(TestACLplugin, self).tearDown()
156         if not self.vpp_dead:
157             cli = "show vlib graph l2-input-feat-arc"
158             self.logger.info(self.vapi.ppcli(cli))
159             cli = "show vlib graph l2-input-feat-arc-end"
160             self.logger.info(self.vapi.ppcli(cli))
161             cli = "show vlib graph l2-output-feat-arc"
162             self.logger.info(self.vapi.ppcli(cli))
163             cli = "show vlib graph l2-output-feat-arc-end"
164             self.logger.info(self.vapi.ppcli(cli))
165             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
166             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
167             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
168             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
169             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
170                                              % self.bd_id))
171
172     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
173                     s_prefix=0, s_ip='\x00\x00\x00\x00',
174                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
175         if proto == -1:
176             return
177         if ports == self.PORTS_ALL:
178             sport_from = 0
179             dport_from = 0
180             sport_to = 65535 if proto != 1 and proto != 58 else 255
181             dport_to = sport_to
182         elif ports == self.PORTS_RANGE:
183             if proto == 1:
184                 sport_from = self.icmp4_type
185                 sport_to = self.icmp4_type
186                 dport_from = self.icmp4_code
187                 dport_to = self.icmp4_code
188             elif proto == 58:
189                 sport_from = self.icmp6_type
190                 sport_to = self.icmp6_type
191                 dport_from = self.icmp6_code
192                 dport_to = self.icmp6_code
193             elif proto == self.proto[self.IP][self.TCP]:
194                 sport_from = self.tcp_sport_from
195                 sport_to = self.tcp_sport_to
196                 dport_from = self.tcp_dport_from
197                 dport_to = self.tcp_dport_to
198             elif proto == self.proto[self.IP][self.UDP]:
199                 sport_from = self.udp_sport_from
200                 sport_to = self.udp_sport_to
201                 dport_from = self.udp_dport_from
202                 dport_to = self.udp_dport_to
203         elif ports == self.PORTS_RANGE_2:
204             if proto == 1:
205                 sport_from = self.icmp4_type_2
206                 sport_to = self.icmp4_type_2
207                 dport_from = self.icmp4_code_from_2
208                 dport_to = self.icmp4_code_to_2
209             elif proto == 58:
210                 sport_from = self.icmp6_type_2
211                 sport_to = self.icmp6_type_2
212                 dport_from = self.icmp6_code_from_2
213                 dport_to = self.icmp6_code_to_2
214             elif proto == self.proto[self.IP][self.TCP]:
215                 sport_from = self.tcp_sport_from_2
216                 sport_to = self.tcp_sport_to_2
217                 dport_from = self.tcp_dport_from_2
218                 dport_to = self.tcp_dport_to_2
219             elif proto == self.proto[self.IP][self.UDP]:
220                 sport_from = self.udp_sport_from_2
221                 sport_to = self.udp_sport_to_2
222                 dport_from = self.udp_dport_from_2
223                 dport_to = self.udp_dport_to_2
224         else:
225             sport_from = ports
226             sport_to = ports
227             dport_from = ports
228             dport_to = ports
229
230         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
231                  'srcport_or_icmptype_first': sport_from,
232                  'srcport_or_icmptype_last': sport_to,
233                  'src_ip_prefix_len': s_prefix,
234                  'src_ip_addr': s_ip,
235                  'dstport_or_icmpcode_first': dport_from,
236                  'dstport_or_icmpcode_last': dport_to,
237                  'dst_ip_prefix_len': d_prefix,
238                  'dst_ip_addr': d_ip})
239         return rule
240
241     def apply_rules(self, rules, tag=''):
242         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
243                                           tag=tag)
244         self.logger.info("Dumped ACL: " + str(
245             self.vapi.acl_dump(reply.acl_index)))
246         # Apply a ACL on the interface as inbound
247         for i in self.pg_interfaces:
248             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249                                                  n_input=1,
250                                                  acls=[reply.acl_index])
251         return
252
253     def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
254         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
255                                           tag=tag)
256         self.logger.info("Dumped ACL: " + str(
257             self.vapi.acl_dump(reply.acl_index)))
258         # Apply a ACL on the interface as inbound
259         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
260                                              n_input=1,
261                                              acls=[reply.acl_index])
262         return
263
264     def etype_whitelist(self, whitelist, n_input):
265         # Apply whitelists on all the interfaces
266         for i in self.pg_interfaces:
267             # checkstyle can't read long names. Help them.
268             fun = self.vapi.acl_interface_set_etype_whitelist
269             fun(sw_if_index=i.sw_if_index, n_input=n_input,
270                 whitelist=whitelist)
271         return
272
273     def create_upper_layer(self, packet_index, proto, ports=0):
274         p = self.proto_map[proto]
275         if p == 'UDP':
276             if ports == 0:
277                 return UDP(sport=random.randint(self.udp_sport_from,
278                                                 self.udp_sport_to),
279                            dport=random.randint(self.udp_dport_from,
280                                                 self.udp_dport_to))
281             else:
282                 return UDP(sport=ports, dport=ports)
283         elif p == 'TCP':
284             if ports == 0:
285                 return TCP(sport=random.randint(self.tcp_sport_from,
286                                                 self.tcp_sport_to),
287                            dport=random.randint(self.tcp_dport_from,
288                                                 self.tcp_dport_to))
289             else:
290                 return TCP(sport=ports, dport=ports)
291         return ''
292
293     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
294                       proto=-1, ports=0, fragments=False,
295                       pkt_raw=True, etype=-1):
296         """
297         Create input packet stream for defined interface using hosts or
298         deleted_hosts list.
299
300         :param object src_if: Interface to create packet stream for.
301         :param list packet_sizes: List of required packet sizes.
302         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
303         :return: Stream of packets.
304         """
305         pkts = []
306         if self.flows.__contains__(src_if):
307             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
308             for dst_if in self.flows[src_if]:
309                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
310                 n_int = len(dst_hosts) * len(src_hosts)
311                 for i in range(0, n_int):
312                     dst_host = dst_hosts[i / len(src_hosts)]
313                     src_host = src_hosts[i % len(src_hosts)]
314                     pkt_info = self.create_packet_info(src_if, dst_if)
315                     if ipv6 == 1:
316                         pkt_info.ip = 1
317                     elif ipv6 == 0:
318                         pkt_info.ip = 0
319                     else:
320                         pkt_info.ip = random.choice([0, 1])
321                     if proto == -1:
322                         pkt_info.proto = random.choice(self.proto[self.IP])
323                     else:
324                         pkt_info.proto = proto
325                     payload = self.info_to_payload(pkt_info)
326                     p = Ether(dst=dst_host.mac, src=src_host.mac)
327                     if etype > 0:
328                         p = Ether(dst=dst_host.mac,
329                                   src=src_host.mac,
330                                   type=etype)
331                     if pkt_info.ip:
332                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
333                         if fragments:
334                             p /= IPv6ExtHdrFragment(offset=64, m=1)
335                     else:
336                         if fragments:
337                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
338                                     flags=1, frag=64)
339                         else:
340                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
341                     if traffic_type == self.ICMP:
342                         if pkt_info.ip:
343                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
344                                                    code=self.icmp6_code)
345                         else:
346                             p /= ICMP(type=self.icmp4_type,
347                                       code=self.icmp4_code)
348                     else:
349                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
350                     if pkt_raw:
351                         p /= Raw(payload)
352                         pkt_info.data = p.copy()
353                     if pkt_raw:
354                         size = random.choice(packet_sizes)
355                         self.extend_packet(p, size)
356                     pkts.append(p)
357         return pkts
358
    def verify_capture(self, pg_if, capture,
                       traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        Every captured packet is decoded back into its packet info and
        checked: ICMP packets against the configured ICMP type/code, all
        other packets against the saved copy of the packet that was sent
        (addresses and TCP/UDP ports), in sending order per source.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP for ICMP traffic; any other value is
            verified as TCP/UDP traffic.
        :param ip_type: Expected IP version (self.IPV4 or self.IPV6); the
            version recorded in the payload is only asserted when this is
            non-zero.
        :param etype: When > 0, packets carrying exactly this ethertype are
            skipped without further checks.
        """
        # Last verified packet info per source interface index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            if etype > 0:
                if packet[Ether].type != etype:
                    # Only logged here; the packet still goes through the
                    # checks below.
                    self.logger.error(ppp("Unexpected ethertype in packet:",
                                          packet))
                else:
                    continue
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Packets from one source must arrive in the order they
                    # were created: fetch the next expected info and
                    # compare indexes.
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every expected packet must have been consumed by the loop above.
        # NOTE(review): the first argument here is the interface object `i`,
        # while the call inside the loop passes an interface index
        # (payload_info.src). If the helper compares against indexes this
        # check can never find a leftover packet - confirm against the
        # helper before changing it, since the ICMP path never advances
        # last_info.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
452
453     def run_traffic_no_check(self):
454         # Test
455         # Create incoming packet streams for packet-generator interfaces
456         for i in self.pg_interfaces:
457             if self.flows.__contains__(i):
458                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
459                 if len(pkts) > 0:
460                     i.add_stream(pkts)
461
462         # Enable packet capture and start packet sending
463         self.pg_enable_capture(self.pg_interfaces)
464         self.pg_start()
465
466     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
467                         frags=False, pkt_raw=True, etype=-1):
468         # Test
469         # Create incoming packet streams for packet-generator interfaces
470         pkts_cnt = 0
471         for i in self.pg_interfaces:
472             if self.flows.__contains__(i):
473                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
474                                           traffic_type, ip_type, proto, ports,
475                                           frags, pkt_raw, etype)
476                 if len(pkts) > 0:
477                     i.add_stream(pkts)
478                     pkts_cnt += len(pkts)
479
480         # Enable packet capture and start packet sendingself.IPV
481         self.pg_enable_capture(self.pg_interfaces)
482         self.pg_start()
483         self.logger.info("sent packets count: %d" % pkts_cnt)
484
485         # Verify
486         # Verify outgoing packet streams per packet-generator interface
487         for src_if in self.pg_interfaces:
488             if self.flows.__contains__(src_if):
489                 for dst_if in self.flows[src_if]:
490                     capture = dst_if.get_capture(pkts_cnt)
491                     self.logger.info("Verifying capture on interface %s" %
492                                      dst_if.name)
493                     self.verify_capture(dst_if, capture,
494                                         traffic_type, ip_type, etype)
495
496     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
497                               ports=0, frags=False, etype=-1):
498         # Test
499         pkts_cnt = 0
500         self.reset_packet_infos()
501         for i in self.pg_interfaces:
502             if self.flows.__contains__(i):
503                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
504                                           traffic_type, ip_type, proto, ports,
505                                           frags, True, etype)
506                 if len(pkts) > 0:
507                     i.add_stream(pkts)
508                     pkts_cnt += len(pkts)
509
510         # Enable packet capture and start packet sending
511         self.pg_enable_capture(self.pg_interfaces)
512         self.pg_start()
513         self.logger.info("sent packets count: %d" % pkts_cnt)
514
515         # Verify
516         # Verify outgoing packet streams per packet-generator interface
517         for src_if in self.pg_interfaces:
518             if self.flows.__contains__(src_if):
519                 for dst_if in self.flows[src_if]:
520                     self.logger.info("Verifying capture on interface %s" %
521                                      dst_if.name)
522                     capture = dst_if.get_capture(0)
523                     self.assertEqual(len(capture), 0)
524
525     def test_0000_warmup_test(self):
526         """ ACL plugin version check; learn MACs
527         """
528         reply = self.vapi.papi.acl_plugin_get_version()
529         self.assertEqual(reply.major, 1)
530         self.logger.info("Working with ACL plugin version: %d.%d" % (
531             reply.major, reply.minor))
532         # minor version changes are non breaking
533         # self.assertEqual(reply.minor, 0)
534
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """
        # Exercises the ACL life cycle through the API: add two ACLs, try to
        # replace a nonexistent one, and check that an ACL cannot be deleted
        # while applied to an interface. The expected ACL indexes (0, 1)
        # depend on this being the first test that creates ACLs.

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                          tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        first_acl = reply.acl_index
        rr = self.vapi.acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): range(0, len(r) - 1) is empty for this one-rule
        # list, so the comparison below never executes. Looks like an
        # off-by-one; confirm the returned rule type supports this lookup
        # before widening the range.
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}]

        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                          tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)
        second_acl = reply.acl_index

        # Test 2: try to modify a nonexistent ACL
        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                          tag="FFFF:FFFF", expected_retval=-6)
        self.assertEqual(reply.retval, -6)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)
        # apply an ACL on an interface inbound, try to delete ACL, must fail
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
        # Unapply an ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

        # apply an ACL on an interface outbound, try to delete ACL, must fail
        # (n_input=0 with a non-empty list: none of the ACLs is an input ACL)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[second_acl])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
        # Unapply the ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

        # try to apply a nonexistent ACL - must fail
        # (first_acl was deleted above)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl],
                                             expected_retval=-6)

        self.logger.info("ACLP_TEST_FINISH_0001")
632
633     def test_0002_acl_permit_apply(self):
634         """ permit ACL apply test
635         """
636         self.logger.info("ACLP_TEST_START_0002")
637
638         rules = []
639         rules.append(self.create_rule(self.IPV4, self.PERMIT,
640                      0, self.proto[self.IP][self.UDP]))
641         rules.append(self.create_rule(self.IPV4, self.PERMIT,
642                      0, self.proto[self.IP][self.TCP]))
643
644         # Apply rules
645         self.apply_rules(rules, "permit per-flow")
646
647         # Traffic should still pass
648         self.run_verify_test(self.IP, self.IPV4, -1)
649         self.logger.info("ACLP_TEST_FINISH_0002")
650
651     def test_0003_acl_deny_apply(self):
652         """ deny ACL apply test
653         """
654         self.logger.info("ACLP_TEST_START_0003")
655         # Add a deny-flows ACL
656         rules = []
657         rules.append(self.create_rule(self.IPV4, self.DENY,
658                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
659         # Permit ip any any in the end
660         rules.append(self.create_rule(self.IPV4, self.PERMIT,
661                                       self.PORTS_ALL, 0))
662
663         # Apply rules
664         self.apply_rules(rules, "deny per-flow;permit all")
665
666         # Traffic should not pass
667         self.run_verify_negat_test(self.IP, self.IPV4,
668                                    self.proto[self.IP][self.UDP])
669         self.logger.info("ACLP_TEST_FINISH_0003")
670         # self.assertEqual(1, 0)
671
672     def test_0004_vpp624_permit_icmpv4(self):
673         """ VPP_624 permit ICMPv4
674         """
675         self.logger.info("ACLP_TEST_START_0004")
676
677         # Add an ACL
678         rules = []
679         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
680                                       self.proto[self.ICMP][self.ICMPv4]))
681         # deny ip any any in the end
682         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
683
684         # Apply rules
685         self.apply_rules(rules, "permit icmpv4")
686
687         # Traffic should still pass
688         self.run_verify_test(self.ICMP, self.IPV4,
689                              self.proto[self.ICMP][self.ICMPv4])
690
691         self.logger.info("ACLP_TEST_FINISH_0004")
692
693     def test_0005_vpp624_permit_icmpv6(self):
694         """ VPP_624 permit ICMPv6
695         """
696         self.logger.info("ACLP_TEST_START_0005")
697
698         # Add an ACL
699         rules = []
700         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
701                                       self.proto[self.ICMP][self.ICMPv6]))
702         # deny ip any any in the end
703         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
704
705         # Apply rules
706         self.apply_rules(rules, "permit icmpv6")
707
708         # Traffic should still pass
709         self.run_verify_test(self.ICMP, self.IPV6,
710                              self.proto[self.ICMP][self.ICMPv6])
711
712         self.logger.info("ACLP_TEST_FINISH_0005")
713
714     def test_0006_vpp624_deny_icmpv4(self):
715         """ VPP_624 deny ICMPv4
716         """
717         self.logger.info("ACLP_TEST_START_0006")
718         # Add an ACL
719         rules = []
720         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
721                                       self.proto[self.ICMP][self.ICMPv4]))
722         # permit ip any any in the end
723         rules.append(self.create_rule(self.IPV4, self.PERMIT,
724                                       self.PORTS_ALL, 0))
725
726         # Apply rules
727         self.apply_rules(rules, "deny icmpv4")
728
729         # Traffic should not pass
730         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
731
732         self.logger.info("ACLP_TEST_FINISH_0006")
733
734     def test_0007_vpp624_deny_icmpv6(self):
735         """ VPP_624 deny ICMPv6
736         """
737         self.logger.info("ACLP_TEST_START_0007")
738         # Add an ACL
739         rules = []
740         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
741                                       self.proto[self.ICMP][self.ICMPv6]))
742         # deny ip any any in the end
743         rules.append(self.create_rule(self.IPV6, self.PERMIT,
744                                       self.PORTS_ALL, 0))
745
746         # Apply rules
747         self.apply_rules(rules, "deny icmpv6")
748
749         # Traffic should not pass
750         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
751
752         self.logger.info("ACLP_TEST_FINISH_0007")
753
754     def test_0008_tcp_permit_v4(self):
755         """ permit TCPv4
756         """
757         self.logger.info("ACLP_TEST_START_0008")
758
759         # Add an ACL
760         rules = []
761         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
762                      self.proto[self.IP][self.TCP]))
763         # deny ip any any in the end
764         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
765
766         # Apply rules
767         self.apply_rules(rules, "permit ipv4 tcp")
768
769         # Traffic should still pass
770         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
771
772         self.logger.info("ACLP_TEST_FINISH_0008")
773
774     def test_0009_tcp_permit_v6(self):
775         """ permit TCPv6
776         """
777         self.logger.info("ACLP_TEST_START_0009")
778
779         # Add an ACL
780         rules = []
781         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
782                                       self.proto[self.IP][self.TCP]))
783         # deny ip any any in the end
784         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
785
786         # Apply rules
787         self.apply_rules(rules, "permit ip6 tcp")
788
789         # Traffic should still pass
790         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
791
792         self.logger.info("ACLP_TEST_FINISH_0008")
793
794     def test_0010_udp_permit_v4(self):
795         """ permit UDPv4
796         """
797         self.logger.info("ACLP_TEST_START_0010")
798
799         # Add an ACL
800         rules = []
801         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
802                                       self.proto[self.IP][self.UDP]))
803         # deny ip any any in the end
804         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
805
806         # Apply rules
807         self.apply_rules(rules, "permit ipv udp")
808
809         # Traffic should still pass
810         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
811
812         self.logger.info("ACLP_TEST_FINISH_0010")
813
814     def test_0011_udp_permit_v6(self):
815         """ permit UDPv6
816         """
817         self.logger.info("ACLP_TEST_START_0011")
818
819         # Add an ACL
820         rules = []
821         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
822                                       self.proto[self.IP][self.UDP]))
823         # deny ip any any in the end
824         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
825
826         # Apply rules
827         self.apply_rules(rules, "permit ip6 udp")
828
829         # Traffic should still pass
830         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
831
832         self.logger.info("ACLP_TEST_FINISH_0011")
833
834     def test_0012_tcp_deny(self):
835         """ deny TCPv4/v6
836         """
837         self.logger.info("ACLP_TEST_START_0012")
838
839         # Add an ACL
840         rules = []
841         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
842                                       self.proto[self.IP][self.TCP]))
843         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
844                                       self.proto[self.IP][self.TCP]))
845         # permit ip any any in the end
846         rules.append(self.create_rule(self.IPV4, self.PERMIT,
847                                       self.PORTS_ALL, 0))
848         rules.append(self.create_rule(self.IPV6, self.PERMIT,
849                                       self.PORTS_ALL, 0))
850
851         # Apply rules
852         self.apply_rules(rules, "deny ip4/ip6 tcp")
853
854         # Traffic should not pass
855         self.run_verify_negat_test(self.IP, self.IPRANDOM,
856                                    self.proto[self.IP][self.TCP])
857
858         self.logger.info("ACLP_TEST_FINISH_0012")
859
860     def test_0013_udp_deny(self):
861         """ deny UDPv4/v6
862         """
863         self.logger.info("ACLP_TEST_START_0013")
864
865         # Add an ACL
866         rules = []
867         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
868                                       self.proto[self.IP][self.UDP]))
869         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
870                                       self.proto[self.IP][self.UDP]))
871         # permit ip any any in the end
872         rules.append(self.create_rule(self.IPV4, self.PERMIT,
873                                       self.PORTS_ALL, 0))
874         rules.append(self.create_rule(self.IPV6, self.PERMIT,
875                                       self.PORTS_ALL, 0))
876
877         # Apply rules
878         self.apply_rules(rules, "deny ip4/ip6 udp")
879
880         # Traffic should not pass
881         self.run_verify_negat_test(self.IP, self.IPRANDOM,
882                                    self.proto[self.IP][self.UDP])
883
884         self.logger.info("ACLP_TEST_FINISH_0013")
885
886     def test_0014_acl_dump(self):
887         """ verify add/dump acls
888         """
889         self.logger.info("ACLP_TEST_START_0014")
890
891         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
892              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
893              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
894              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
895              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
896              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
897              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
898              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
899              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
900              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
901              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
902              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
903              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
904              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
905              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
906              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
907              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
908              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
909              ]
910
911         # Add and verify new ACLs
912         rules = []
913         for i in range(len(r)):
914             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
915
916         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
917         result = self.vapi.acl_dump(reply.acl_index)
918
919         i = 0
920         for drules in result:
921             for dr in drules.r:
922                 self.assertEqual(dr.is_ipv6, r[i][0])
923                 self.assertEqual(dr.is_permit, r[i][1])
924                 self.assertEqual(dr.proto, r[i][3])
925
926                 if r[i][2] > 0:
927                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
928                 else:
929                     if r[i][2] < 0:
930                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
931                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
932                     else:
933                         if dr.proto == self.proto[self.IP][self.TCP]:
934                             self.assertGreater(dr.srcport_or_icmptype_first,
935                                                self.tcp_sport_from-1)
936                             self.assertLess(dr.srcport_or_icmptype_first,
937                                             self.tcp_sport_to+1)
938                             self.assertGreater(dr.dstport_or_icmpcode_last,
939                                                self.tcp_dport_from-1)
940                             self.assertLess(dr.dstport_or_icmpcode_last,
941                                             self.tcp_dport_to+1)
942                         elif dr.proto == self.proto[self.IP][self.UDP]:
943                             self.assertGreater(dr.srcport_or_icmptype_first,
944                                                self.udp_sport_from-1)
945                             self.assertLess(dr.srcport_or_icmptype_first,
946                                             self.udp_sport_to+1)
947                             self.assertGreater(dr.dstport_or_icmpcode_last,
948                                                self.udp_dport_from-1)
949                             self.assertLess(dr.dstport_or_icmpcode_last,
950                                             self.udp_dport_to+1)
951                 i += 1
952
953         self.logger.info("ACLP_TEST_FINISH_0014")
954
955     def test_0015_tcp_permit_port_v4(self):
956         """ permit single TCPv4
957         """
958         self.logger.info("ACLP_TEST_START_0015")
959
960         port = random.randint(0, 65535)
961         # Add an ACL
962         rules = []
963         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
964                                       self.proto[self.IP][self.TCP]))
965         # deny ip any any in the end
966         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
967
968         # Apply rules
969         self.apply_rules(rules, "permit ip4 tcp "+str(port))
970
971         # Traffic should still pass
972         self.run_verify_test(self.IP, self.IPV4,
973                              self.proto[self.IP][self.TCP], port)
974
975         self.logger.info("ACLP_TEST_FINISH_0015")
976
977     def test_0016_udp_permit_port_v4(self):
978         """ permit single UDPv4
979         """
980         self.logger.info("ACLP_TEST_START_0016")
981
982         port = random.randint(0, 65535)
983         # Add an ACL
984         rules = []
985         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
986                                       self.proto[self.IP][self.UDP]))
987         # deny ip any any in the end
988         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
989
990         # Apply rules
991         self.apply_rules(rules, "permit ip4 tcp "+str(port))
992
993         # Traffic should still pass
994         self.run_verify_test(self.IP, self.IPV4,
995                              self.proto[self.IP][self.UDP], port)
996
997         self.logger.info("ACLP_TEST_FINISH_0016")
998
999     def test_0017_tcp_permit_port_v6(self):
1000         """ permit single TCPv6
1001         """
1002         self.logger.info("ACLP_TEST_START_0017")
1003
1004         port = random.randint(0, 65535)
1005         # Add an ACL
1006         rules = []
1007         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1008                                       self.proto[self.IP][self.TCP]))
1009         # deny ip any any in the end
1010         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1011
1012         # Apply rules
1013         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1014
1015         # Traffic should still pass
1016         self.run_verify_test(self.IP, self.IPV6,
1017                              self.proto[self.IP][self.TCP], port)
1018
1019         self.logger.info("ACLP_TEST_FINISH_0017")
1020
1021     def test_0018_udp_permit_port_v6(self):
1022         """ permit single UPPv6
1023         """
1024         self.logger.info("ACLP_TEST_START_0018")
1025
1026         port = random.randint(0, 65535)
1027         # Add an ACL
1028         rules = []
1029         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1030                                       self.proto[self.IP][self.UDP]))
1031         # deny ip any any in the end
1032         rules.append(self.create_rule(self.IPV6, self.DENY,
1033                                       self.PORTS_ALL, 0))
1034
1035         # Apply rules
1036         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1037
1038         # Traffic should still pass
1039         self.run_verify_test(self.IP, self.IPV6,
1040                              self.proto[self.IP][self.UDP], port)
1041
1042         self.logger.info("ACLP_TEST_FINISH_0018")
1043
1044     def test_0019_udp_deny_port(self):
1045         """ deny single TCPv4/v6
1046         """
1047         self.logger.info("ACLP_TEST_START_0019")
1048
1049         port = random.randint(0, 65535)
1050         # Add an ACL
1051         rules = []
1052         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1053                                       self.proto[self.IP][self.TCP]))
1054         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1055                                       self.proto[self.IP][self.TCP]))
1056         # Permit ip any any in the end
1057         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1058                                       self.PORTS_ALL, 0))
1059         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1060                                       self.PORTS_ALL, 0))
1061
1062         # Apply rules
1063         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1064
1065         # Traffic should not pass
1066         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1067                                    self.proto[self.IP][self.TCP], port)
1068
1069         self.logger.info("ACLP_TEST_FINISH_0019")
1070
1071     def test_0020_udp_deny_port(self):
1072         """ deny single UDPv4/v6
1073         """
1074         self.logger.info("ACLP_TEST_START_0020")
1075
1076         port = random.randint(0, 65535)
1077         # Add an ACL
1078         rules = []
1079         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1080                                       self.proto[self.IP][self.UDP]))
1081         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1082                                       self.proto[self.IP][self.UDP]))
1083         # Permit ip any any in the end
1084         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1085                                       self.PORTS_ALL, 0))
1086         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1087                                       self.PORTS_ALL, 0))
1088
1089         # Apply rules
1090         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1091
1092         # Traffic should not pass
1093         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1094                                    self.proto[self.IP][self.UDP], port)
1095
1096         self.logger.info("ACLP_TEST_FINISH_0020")
1097
1098     def test_0021_udp_deny_port_verify_fragment_deny(self):
1099         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1100         blocked
1101         """
1102         self.logger.info("ACLP_TEST_START_0021")
1103
1104         port = random.randint(0, 65535)
1105         # Add an ACL
1106         rules = []
1107         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1108                                       self.proto[self.IP][self.UDP]))
1109         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1110                                       self.proto[self.IP][self.UDP]))
1111         # deny ip any any in the end
1112         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1113                                       self.PORTS_ALL, 0))
1114         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1115                                       self.PORTS_ALL, 0))
1116
1117         # Apply rules
1118         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1119
1120         # Traffic should not pass
1121         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1122                                    self.proto[self.IP][self.UDP], port, True)
1123
1124         self.logger.info("ACLP_TEST_FINISH_0021")
1125
1126     def test_0022_zero_length_udp_ipv4(self):
1127         """ VPP-687 zero length udp ipv4 packet"""
1128         self.logger.info("ACLP_TEST_START_0022")
1129
1130         port = random.randint(0, 65535)
1131         # Add an ACL
1132         rules = []
1133         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1134                                       self.proto[self.IP][self.UDP]))
1135         # deny ip any any in the end
1136         rules.append(
1137             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1138
1139         # Apply rules
1140         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1141
1142         # Traffic should still pass
1143         # Create incoming packet streams for packet-generator interfaces
1144         pkts_cnt = 0
1145         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1146                                   self.IP, self.IPV4,
1147                                   self.proto[self.IP][self.UDP], port,
1148                                   False, False)
1149         if len(pkts) > 0:
1150             self.pg0.add_stream(pkts)
1151             pkts_cnt += len(pkts)
1152
1153         # Enable packet capture and start packet sendingself.IPV
1154         self.pg_enable_capture(self.pg_interfaces)
1155         self.pg_start()
1156
1157         self.pg1.get_capture(pkts_cnt)
1158
1159         self.logger.info("ACLP_TEST_FINISH_0022")
1160
1161     def test_0023_zero_length_udp_ipv6(self):
1162         """ VPP-687 zero length udp ipv6 packet"""
1163         self.logger.info("ACLP_TEST_START_0023")
1164
1165         port = random.randint(0, 65535)
1166         # Add an ACL
1167         rules = []
1168         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1169                                       self.proto[self.IP][self.UDP]))
1170         # deny ip any any in the end
1171         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1172
1173         # Apply rules
1174         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1175
1176         # Traffic should still pass
1177         # Create incoming packet streams for packet-generator interfaces
1178         pkts_cnt = 0
1179         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1180                                   self.IP, self.IPV6,
1181                                   self.proto[self.IP][self.UDP], port,
1182                                   False, False)
1183         if len(pkts) > 0:
1184             self.pg0.add_stream(pkts)
1185             pkts_cnt += len(pkts)
1186
1187         # Enable packet capture and start packet sendingself.IPV
1188         self.pg_enable_capture(self.pg_interfaces)
1189         self.pg_start()
1190
1191         # Verify outgoing packet streams per packet-generator interface
1192         self.pg1.get_capture(pkts_cnt)
1193
1194         self.logger.info("ACLP_TEST_FINISH_0023")
1195
1196     def test_0108_tcp_permit_v4(self):
1197         """ permit TCPv4 + non-match range
1198         """
1199         self.logger.info("ACLP_TEST_START_0108")
1200
1201         # Add an ACL
1202         rules = []
1203         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1204                      self.proto[self.IP][self.TCP]))
1205         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1206                      self.proto[self.IP][self.TCP]))
1207         # deny ip any any in the end
1208         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1209
1210         # Apply rules
1211         self.apply_rules(rules, "permit ipv4 tcp")
1212
1213         # Traffic should still pass
1214         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1215
1216         self.logger.info("ACLP_TEST_FINISH_0108")
1217
1218     def test_0109_tcp_permit_v6(self):
1219         """ permit TCPv6 + non-match range
1220         """
1221         self.logger.info("ACLP_TEST_START_0109")
1222
1223         # Add an ACL
1224         rules = []
1225         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1226                                       self.proto[self.IP][self.TCP]))
1227         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1228                                       self.proto[self.IP][self.TCP]))
1229         # deny ip any any in the end
1230         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1231
1232         # Apply rules
1233         self.apply_rules(rules, "permit ip6 tcp")
1234
1235         # Traffic should still pass
1236         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1237
1238         self.logger.info("ACLP_TEST_FINISH_0109")
1239
1240     def test_0110_udp_permit_v4(self):
1241         """ permit UDPv4 + non-match range
1242         """
1243         self.logger.info("ACLP_TEST_START_0110")
1244
1245         # Add an ACL
1246         rules = []
1247         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1248                                       self.proto[self.IP][self.UDP]))
1249         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1250                                       self.proto[self.IP][self.UDP]))
1251         # deny ip any any in the end
1252         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1253
1254         # Apply rules
1255         self.apply_rules(rules, "permit ipv4 udp")
1256
1257         # Traffic should still pass
1258         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1259
1260         self.logger.info("ACLP_TEST_FINISH_0110")
1261
1262     def test_0111_udp_permit_v6(self):
1263         """ permit UDPv6 + non-match range
1264         """
1265         self.logger.info("ACLP_TEST_START_0111")
1266
1267         # Add an ACL
1268         rules = []
1269         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1270                                       self.proto[self.IP][self.UDP]))
1271         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1272                                       self.proto[self.IP][self.UDP]))
1273         # deny ip any any in the end
1274         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1275
1276         # Apply rules
1277         self.apply_rules(rules, "permit ip6 udp")
1278
1279         # Traffic should still pass
1280         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1281
1282         self.logger.info("ACLP_TEST_FINISH_0111")
1283
1284     def test_0112_tcp_deny(self):
1285         """ deny TCPv4/v6 + non-match range
1286         """
1287         self.logger.info("ACLP_TEST_START_0112")
1288
1289         # Add an ACL
1290         rules = []
1291         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1292                                       self.PORTS_RANGE_2,
1293                                       self.proto[self.IP][self.TCP]))
1294         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1295                                       self.PORTS_RANGE_2,
1296                                       self.proto[self.IP][self.TCP]))
1297         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1298                                       self.proto[self.IP][self.TCP]))
1299         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1300                                       self.proto[self.IP][self.TCP]))
1301         # permit ip any any in the end
1302         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1303                                       self.PORTS_ALL, 0))
1304         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1305                                       self.PORTS_ALL, 0))
1306
1307         # Apply rules
1308         self.apply_rules(rules, "deny ip4/ip6 tcp")
1309
1310         # Traffic should not pass
1311         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1312                                    self.proto[self.IP][self.TCP])
1313
1314         self.logger.info("ACLP_TEST_FINISH_0112")
1315
1316     def test_0113_udp_deny(self):
1317         """ deny UDPv4/v6 + non-match range
1318         """
1319         self.logger.info("ACLP_TEST_START_0113")
1320
1321         # Add an ACL
1322         rules = []
1323         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1324                                       self.PORTS_RANGE_2,
1325                                       self.proto[self.IP][self.UDP]))
1326         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1327                                       self.PORTS_RANGE_2,
1328                                       self.proto[self.IP][self.UDP]))
1329         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1330                                       self.proto[self.IP][self.UDP]))
1331         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1332                                       self.proto[self.IP][self.UDP]))
1333         # permit ip any any in the end
1334         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1335                                       self.PORTS_ALL, 0))
1336         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1337                                       self.PORTS_ALL, 0))
1338
1339         # Apply rules
1340         self.apply_rules(rules, "deny ip4/ip6 udp")
1341
1342         # Traffic should not pass
1343         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1344                                    self.proto[self.IP][self.UDP])
1345
1346         self.logger.info("ACLP_TEST_FINISH_0113")
1347
1348     def test_0300_tcp_permit_v4_etype_aaaa(self):
1349         """ permit TCPv4, send 0xAAAA etype
1350         """
1351         self.logger.info("ACLP_TEST_START_0300")
1352
1353         # Add an ACL
1354         rules = []
1355         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1356                      self.proto[self.IP][self.TCP]))
1357         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1358                      self.proto[self.IP][self.TCP]))
1359         # deny ip any any in the end
1360         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1361
1362         # Apply rules
1363         self.apply_rules(rules, "permit ipv4 tcp")
1364
1365         # Traffic should still pass also for an odd ethertype
1366         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367                              0, False, True, 0xaaaa)
1368         self.logger.info("ACLP_TEST_FINISH_0300")
1369
1370     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1371         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1372         """
1373         self.logger.info("ACLP_TEST_START_0305")
1374
1375         # Add an ACL
1376         rules = []
1377         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1378                      self.proto[self.IP][self.TCP]))
1379         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1380                      self.proto[self.IP][self.TCP]))
1381         # deny ip any any in the end
1382         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1383
1384         # Apply rules
1385         self.apply_rules(rules, "permit ipv4 tcp")
1386
1387         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1388         self.etype_whitelist([0xbbb], 1)
1389
1390         # The oddball ethertype should be blocked
1391         self.run_verify_negat_test(self.IP, self.IPV4,
1392                                    self.proto[self.IP][self.TCP],
1393                                    0, False, 0xaaaa)
1394
1395         # remove the whitelist
1396         self.etype_whitelist([], 0)
1397
1398         self.logger.info("ACLP_TEST_FINISH_0305")
1399
1400     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1401         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1402         """
1403         self.logger.info("ACLP_TEST_START_0306")
1404
1405         # Add an ACL
1406         rules = []
1407         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1408                      self.proto[self.IP][self.TCP]))
1409         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1410                      self.proto[self.IP][self.TCP]))
1411         # deny ip any any in the end
1412         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1413
1414         # Apply rules
1415         self.apply_rules(rules, "permit ipv4 tcp")
1416
1417         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1418         self.etype_whitelist([0xbbb], 1)
1419
1420         # The whitelisted traffic, should pass
1421         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1422                              0, False, True, 0x0bbb)
1423
1424         # remove the whitelist, the previously blocked 0xAAAA should pass now
1425         self.etype_whitelist([], 0)
1426
1427         self.logger.info("ACLP_TEST_FINISH_0306")
1428
1429     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1430         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1431         """
1432         self.logger.info("ACLP_TEST_START_0307")
1433
1434         # Add an ACL
1435         rules = []
1436         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1437                      self.proto[self.IP][self.TCP]))
1438         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1439                      self.proto[self.IP][self.TCP]))
1440         # deny ip any any in the end
1441         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1442
1443         # Apply rules
1444         self.apply_rules(rules, "permit ipv4 tcp")
1445
1446         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1447         self.etype_whitelist([0xbbb], 1)
1448         # remove the whitelist, the previously blocked 0xAAAA should pass now
1449         self.etype_whitelist([], 0)
1450
1451         # The whitelisted traffic, should pass
1452         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1453                              0, False, True, 0xaaaa)
1454
1455         self.logger.info("ACLP_TEST_FINISH_0306")
1456
1457     def test_0315_del_intf(self):
1458         """ apply an acl and delete the interface
1459         """
1460         self.logger.info("ACLP_TEST_START_0315")
1461
1462         # Add an ACL
1463         rules = []
1464         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1465                      self.proto[self.IP][self.TCP]))
1466         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1467                      self.proto[self.IP][self.TCP]))
1468         # deny ip any any in the end
1469         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1470
1471         # create an interface
1472         intf = []
1473         intf.append(VppLoInterface(self))
1474
1475         # Apply rules
1476         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1477
1478         # Remove the interface
1479         intf[0].remove_vpp_config()
1480
1481         self.logger.info("ACLP_TEST_FINISH_0315")
1482
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)