5fcf09c5ee75b3b915d558196397f43c81f20c18
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
17 class TestACLplugin(VppTestCase):
18     """ ACL plugin Test Case """
19
20     # traffic types
21     IP = 0
22     ICMP = 1
23
24     # IP version
25     IPRANDOM = -1
26     IPV4 = 0
27     IPV6 = 1
28
29     # rule types
30     DENY = 0
31     PERMIT = 1
32
33     # supported protocols
34     proto = [[6, 17], [1, 58]]
35     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36     ICMPv4 = 0
37     ICMPv6 = 1
38     TCP = 0
39     UDP = 1
40     PROTO_ALL = 0
41
42     # port ranges
43     PORTS_ALL = -1
44     PORTS_RANGE = 0
45     PORTS_RANGE_2 = 1
46     udp_sport_from = 10
47     udp_sport_to = udp_sport_from + 5
48     udp_dport_from = 20000
49     udp_dport_to = udp_dport_from + 5000
50     tcp_sport_from = 30
51     tcp_sport_to = tcp_sport_from + 5
52     tcp_dport_from = 40000
53     tcp_dport_to = tcp_dport_from + 5000
54
55     udp_sport_from_2 = 90
56     udp_sport_to_2 = udp_sport_from_2 + 5
57     udp_dport_from_2 = 30000
58     udp_dport_to_2 = udp_dport_from_2 + 5000
59     tcp_sport_from_2 = 130
60     tcp_sport_to_2 = tcp_sport_from_2 + 5
61     tcp_dport_from_2 = 20000
62     tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
64     icmp4_type = 8  # echo request
65     icmp4_code = 3
66     icmp6_type = 128  # echo request
67     icmp6_code = 3
68
69     icmp4_type_2 = 8
70     icmp4_code_from_2 = 5
71     icmp4_code_to_2 = 20
72     icmp6_type_2 = 128
73     icmp6_code_from_2 = 8
74     icmp6_code_to_2 = 42
75
76     # Test variables
77     bd_id = 1
78
79     @classmethod
80     def setUpClass(cls):
81         """
82         Perform standard class setup (defined by class method setUpClass in
83         class VppTestCase) before running the test case, set test case related
84         variables and configure VPP.
85         """
86         super(TestACLplugin, cls).setUpClass()
87
88         try:
89             # Create 2 pg interfaces
90             cls.create_pg_interfaces(range(2))
91
92             # Packet flows mapping pg0 -> pg1, pg2 etc.
93             cls.flows = dict()
94             cls.flows[cls.pg0] = [cls.pg1]
95
96             # Packet sizes
97             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
98
99             # Create BD with MAC learning and unknown unicast flooding disabled
100             # and put interfaces to this BD
101             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
102                                            learn=1)
103             for pg_if in cls.pg_interfaces:
104                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
105                                                     bd_id=cls.bd_id)
106
107             # Set up all interfaces
108             for i in cls.pg_interfaces:
109                 i.admin_up()
110
111             # Mapping between packet-generator index and lists of test hosts
112             cls.hosts_by_pg_idx = dict()
113             for pg_if in cls.pg_interfaces:
114                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
115
116             # Create list of deleted hosts
117             cls.deleted_hosts_by_pg_idx = dict()
118             for pg_if in cls.pg_interfaces:
119                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121             # warm-up the mac address tables
122             # self.warmup_test()
123
124         except Exception:
125             super(TestACLplugin, cls).tearDownClass()
126             raise
127
128     def setUp(self):
129         super(TestACLplugin, self).setUp()
130         self.reset_packet_infos()
131
132     def tearDown(self):
133         """
134         Show various debug prints after each test.
135         """
136         super(TestACLplugin, self).tearDown()
137         if not self.vpp_dead:
138             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
139             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
142             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
143                                              % self.bd_id))
144
145     def create_hosts(self, count, start=0):
146         """
147         Create required number of host MAC addresses and distribute them among
148         interfaces. Create host IPv4 address for every host MAC address.
149
150         :param int count: Number of hosts to create MAC/IPv4 addresses for.
151         :param int start: Number to start numbering from.
152         """
153         n_int = len(self.pg_interfaces)
154         macs_per_if = count / n_int
155         i = -1
156         for pg_if in self.pg_interfaces:
157             i += 1
158             start_nr = macs_per_if * i + start
159             end_nr = count + start if i == (n_int - 1) \
160                 else macs_per_if * (i + 1) + start
161             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
162             for j in range(start_nr, end_nr):
163                 host = Host(
164                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
165                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
166                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
167                 hosts.append(host)
168
169     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170                     s_prefix=0, s_ip='\x00\x00\x00\x00',
171                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
172         if proto == -1:
173             return
174         if ports == self.PORTS_ALL:
175             sport_from = 0
176             dport_from = 0
177             sport_to = 65535 if proto != 1 and proto != 58 else 255
178             dport_to = sport_to
179         elif ports == self.PORTS_RANGE:
180             if proto == 1:
181                 sport_from = self.icmp4_type
182                 sport_to = self.icmp4_type
183                 dport_from = self.icmp4_code
184                 dport_to = self.icmp4_code
185             elif proto == 58:
186                 sport_from = self.icmp6_type
187                 sport_to = self.icmp6_type
188                 dport_from = self.icmp6_code
189                 dport_to = self.icmp6_code
190             elif proto == self.proto[self.IP][self.TCP]:
191                 sport_from = self.tcp_sport_from
192                 sport_to = self.tcp_sport_to
193                 dport_from = self.tcp_dport_from
194                 dport_to = self.tcp_dport_to
195             elif proto == self.proto[self.IP][self.UDP]:
196                 sport_from = self.udp_sport_from
197                 sport_to = self.udp_sport_to
198                 dport_from = self.udp_dport_from
199                 dport_to = self.udp_dport_to
200         elif ports == self.PORTS_RANGE_2:
201             if proto == 1:
202                 sport_from = self.icmp4_type_2
203                 sport_to = self.icmp4_type_2
204                 dport_from = self.icmp4_code_from_2
205                 dport_to = self.icmp4_code_to_2
206             elif proto == 58:
207                 sport_from = self.icmp6_type_2
208                 sport_to = self.icmp6_type_2
209                 dport_from = self.icmp6_code_from_2
210                 dport_to = self.icmp6_code_to_2
211             elif proto == self.proto[self.IP][self.TCP]:
212                 sport_from = self.tcp_sport_from_2
213                 sport_to = self.tcp_sport_to_2
214                 dport_from = self.tcp_dport_from_2
215                 dport_to = self.tcp_dport_to_2
216             elif proto == self.proto[self.IP][self.UDP]:
217                 sport_from = self.udp_sport_from_2
218                 sport_to = self.udp_sport_to_2
219                 dport_from = self.udp_dport_from_2
220                 dport_to = self.udp_dport_to_2
221         else:
222             sport_from = ports
223             sport_to = ports
224             dport_from = ports
225             dport_to = ports
226
227         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228                  'srcport_or_icmptype_first': sport_from,
229                  'srcport_or_icmptype_last': sport_to,
230                  'src_ip_prefix_len': s_prefix,
231                  'src_ip_addr': s_ip,
232                  'dstport_or_icmpcode_first': dport_from,
233                  'dstport_or_icmpcode_last': dport_to,
234                  'dst_ip_prefix_len': d_prefix,
235                  'dst_ip_addr': d_ip})
236         return rule
237
238     def apply_rules(self, rules, tag=''):
239         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
240                                           tag=tag)
241         self.logger.info("Dumped ACL: " + str(
242             self.vapi.acl_dump(reply.acl_index)))
243         # Apply a ACL on the interface as inbound
244         for i in self.pg_interfaces:
245             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
246                                                  n_input=1,
247                                                  acls=[reply.acl_index])
248         return
249
250     def etype_whitelist(self, whitelist, n_input):
251         # Apply whitelists on all the interfaces
252         for i in self.pg_interfaces:
253             # checkstyle can't read long names. Help them.
254             fun = self.vapi.acl_interface_set_etype_whitelist
255             fun(sw_if_index=i.sw_if_index, n_input=n_input,
256                 whitelist=whitelist)
257         return
258
259     def create_upper_layer(self, packet_index, proto, ports=0):
260         p = self.proto_map[proto]
261         if p == 'UDP':
262             if ports == 0:
263                 return UDP(sport=random.randint(self.udp_sport_from,
264                                                 self.udp_sport_to),
265                            dport=random.randint(self.udp_dport_from,
266                                                 self.udp_dport_to))
267             else:
268                 return UDP(sport=ports, dport=ports)
269         elif p == 'TCP':
270             if ports == 0:
271                 return TCP(sport=random.randint(self.tcp_sport_from,
272                                                 self.tcp_sport_to),
273                            dport=random.randint(self.tcp_dport_from,
274                                                 self.tcp_dport_to))
275             else:
276                 return TCP(sport=ports, dport=ports)
277         return ''
278
279     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
280                       proto=-1, ports=0, fragments=False,
281                       pkt_raw=True, etype=-1):
282         """
283         Create input packet stream for defined interface using hosts or
284         deleted_hosts list.
285
286         :param object src_if: Interface to create packet stream for.
287         :param list packet_sizes: List of required packet sizes.
288         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
289         :return: Stream of packets.
290         """
291         pkts = []
292         if self.flows.__contains__(src_if):
293             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
294             for dst_if in self.flows[src_if]:
295                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
296                 n_int = len(dst_hosts) * len(src_hosts)
297                 for i in range(0, n_int):
298                     dst_host = dst_hosts[i / len(src_hosts)]
299                     src_host = src_hosts[i % len(src_hosts)]
300                     pkt_info = self.create_packet_info(src_if, dst_if)
301                     if ipv6 == 1:
302                         pkt_info.ip = 1
303                     elif ipv6 == 0:
304                         pkt_info.ip = 0
305                     else:
306                         pkt_info.ip = random.choice([0, 1])
307                     if proto == -1:
308                         pkt_info.proto = random.choice(self.proto[self.IP])
309                     else:
310                         pkt_info.proto = proto
311                     payload = self.info_to_payload(pkt_info)
312                     p = Ether(dst=dst_host.mac, src=src_host.mac)
313                     if etype > 0:
314                         p = Ether(dst=dst_host.mac,
315                                   src=src_host.mac,
316                                   type=etype)
317                     if pkt_info.ip:
318                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
319                         if fragments:
320                             p /= IPv6ExtHdrFragment(offset=64, m=1)
321                     else:
322                         if fragments:
323                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
324                                     flags=1, frag=64)
325                         else:
326                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
327                     if traffic_type == self.ICMP:
328                         if pkt_info.ip:
329                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
330                                                    code=self.icmp6_code)
331                         else:
332                             p /= ICMP(type=self.icmp4_type,
333                                       code=self.icmp4_code)
334                     else:
335                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
336                     if pkt_raw:
337                         p /= Raw(payload)
338                         pkt_info.data = p.copy()
339                     if pkt_raw:
340                         size = random.choice(packet_sizes)
341                         self.extend_packet(p, size)
342                     pkts.append(p)
343         return pkts
344
345     def verify_capture(self, pg_if, capture,
346                        traffic_type=0, ip_type=0, etype=-1):
347         """
348         Verify captured input packet stream for defined interface.
349
350         :param object pg_if: Interface to verify captured packet stream for.
351         :param list capture: Captured packet stream.
352         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
353         """
354         last_info = dict()
355         for i in self.pg_interfaces:
356             last_info[i.sw_if_index] = None
357         dst_sw_if_index = pg_if.sw_if_index
358         for packet in capture:
359             if etype > 0:
360                 if packet[Ether].type != etype:
361                     self.logger.error(ppp("Unexpected ethertype in packet:",
362                                           packet))
363                 else:
364                     continue
365             try:
366                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
367                 if traffic_type == self.ICMP and ip_type == self.IPV6:
368                     payload_info = self.payload_to_info(
369                         packet[ICMPv6EchoRequest].data)
370                     payload = packet[ICMPv6EchoRequest]
371                 else:
372                     payload_info = self.payload_to_info(str(packet[Raw]))
373                     payload = packet[self.proto_map[payload_info.proto]]
374             except:
375                 self.logger.error(ppp("Unexpected or invalid packet "
376                                       "(outside network):", packet))
377                 raise
378
379             if ip_type != 0:
380                 self.assertEqual(payload_info.ip, ip_type)
381             if traffic_type == self.ICMP:
382                 try:
383                     if payload_info.ip == 0:
384                         self.assertEqual(payload.type, self.icmp4_type)
385                         self.assertEqual(payload.code, self.icmp4_code)
386                     else:
387                         self.assertEqual(payload.type, self.icmp6_type)
388                         self.assertEqual(payload.code, self.icmp6_code)
389                 except:
390                     self.logger.error(ppp("Unexpected or invalid packet "
391                                           "(outside network):", packet))
392                     raise
393             else:
394                 try:
395                     ip_version = IPv6 if payload_info.ip == 1 else IP
396
397                     ip = packet[ip_version]
398                     packet_index = payload_info.index
399
400                     self.assertEqual(payload_info.dst, dst_sw_if_index)
401                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
402                                       (pg_if.name, payload_info.src,
403                                        packet_index))
404                     next_info = self.get_next_packet_info_for_interface2(
405                         payload_info.src, dst_sw_if_index,
406                         last_info[payload_info.src])
407                     last_info[payload_info.src] = next_info
408                     self.assertTrue(next_info is not None)
409                     self.assertEqual(packet_index, next_info.index)
410                     saved_packet = next_info.data
411                     # Check standard fields
412                     self.assertEqual(ip.src, saved_packet[ip_version].src)
413                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
414                     p = self.proto_map[payload_info.proto]
415                     if p == 'TCP':
416                         tcp = packet[TCP]
417                         self.assertEqual(tcp.sport, saved_packet[
418                             TCP].sport)
419                         self.assertEqual(tcp.dport, saved_packet[
420                             TCP].dport)
421                     elif p == 'UDP':
422                         udp = packet[UDP]
423                         self.assertEqual(udp.sport, saved_packet[
424                             UDP].sport)
425                         self.assertEqual(udp.dport, saved_packet[
426                             UDP].dport)
427                 except:
428                     self.logger.error(ppp("Unexpected or invalid packet:",
429                                           packet))
430                     raise
431         for i in self.pg_interfaces:
432             remaining_packet = self.get_next_packet_info_for_interface2(
433                 i, dst_sw_if_index, last_info[i.sw_if_index])
434             self.assertTrue(
435                 remaining_packet is None,
436                 "Port %u: Packet expected from source %u didn't arrive" %
437                 (dst_sw_if_index, i.sw_if_index))
438
439     def run_traffic_no_check(self):
440         # Test
441         # Create incoming packet streams for packet-generator interfaces
442         for i in self.pg_interfaces:
443             if self.flows.__contains__(i):
444                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
445                 if len(pkts) > 0:
446                     i.add_stream(pkts)
447
448         # Enable packet capture and start packet sending
449         self.pg_enable_capture(self.pg_interfaces)
450         self.pg_start()
451
452     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
453                         frags=False, pkt_raw=True, etype=-1):
454         # Test
455         # Create incoming packet streams for packet-generator interfaces
456         pkts_cnt = 0
457         for i in self.pg_interfaces:
458             if self.flows.__contains__(i):
459                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
460                                           traffic_type, ip_type, proto, ports,
461                                           frags, pkt_raw, etype)
462                 if len(pkts) > 0:
463                     i.add_stream(pkts)
464                     pkts_cnt += len(pkts)
465
466         # Enable packet capture and start packet sendingself.IPV
467         self.pg_enable_capture(self.pg_interfaces)
468         self.pg_start()
469
470         # Verify
471         # Verify outgoing packet streams per packet-generator interface
472         for src_if in self.pg_interfaces:
473             if self.flows.__contains__(src_if):
474                 for dst_if in self.flows[src_if]:
475                     capture = dst_if.get_capture(pkts_cnt)
476                     self.logger.info("Verifying capture on interface %s" %
477                                      dst_if.name)
478                     self.verify_capture(dst_if, capture,
479                                         traffic_type, ip_type, etype)
480
481     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
482                               ports=0, frags=False, etype=-1):
483         # Test
484         self.reset_packet_infos()
485         for i in self.pg_interfaces:
486             if self.flows.__contains__(i):
487                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
488                                           traffic_type, ip_type, proto, ports,
489                                           frags, True, etype)
490                 if len(pkts) > 0:
491                     i.add_stream(pkts)
492
493         # Enable packet capture and start packet sending
494         self.pg_enable_capture(self.pg_interfaces)
495         self.pg_start()
496
497         # Verify
498         # Verify outgoing packet streams per packet-generator interface
499         for src_if in self.pg_interfaces:
500             if self.flows.__contains__(src_if):
501                 for dst_if in self.flows[src_if]:
502                     self.logger.info("Verifying capture on interface %s" %
503                                      dst_if.name)
504                     capture = dst_if.get_capture(0)
505                     self.assertEqual(len(capture), 0)
506
507     def test_0000_warmup_test(self):
508         """ ACL plugin version check; learn MACs
509         """
510         self.create_hosts(16)
511         self.run_traffic_no_check()
512         reply = self.vapi.papi.acl_plugin_get_version()
513         self.assertEqual(reply.major, 1)
514         self.logger.info("Working with ACL plugin version: %d.%d" % (
515             reply.major, reply.minor))
516         # minor version changes are non breaking
517         # self.assertEqual(reply.minor, 0)
518
519     def test_0001_acl_create(self):
520         """ ACL create/delete test
521         """
522
523         self.logger.info("ACLP_TEST_START_0001")
524         # Add an ACL
525         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
526               'srcport_or_icmptype_first': 1234,
527               'srcport_or_icmptype_last': 1235,
528               'src_ip_prefix_len': 0,
529               'src_ip_addr': '\x00\x00\x00\x00',
530               'dstport_or_icmpcode_first': 1234,
531               'dstport_or_icmpcode_last': 1234,
532               'dst_ip_addr': '\x00\x00\x00\x00',
533               'dst_ip_prefix_len': 0}]
534         # Test 1: add a new ACL
535         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
536                                           tag="permit 1234")
537         self.assertEqual(reply.retval, 0)
538         # The very first ACL gets #0
539         self.assertEqual(reply.acl_index, 0)
540         first_acl = reply.acl_index
541         rr = self.vapi.acl_dump(reply.acl_index)
542         self.logger.info("Dumped ACL: " + str(rr))
543         self.assertEqual(len(rr), 1)
544         # We should have the same number of ACL entries as we had asked
545         self.assertEqual(len(rr[0].r), len(r))
546         # The rules should be the same. But because the submitted and returned
547         # are different types, we need to iterate over rules and keys to get
548         # to basic values.
549         for i_rule in range(0, len(r) - 1):
550             for rule_key in r[i_rule]:
551                 self.assertEqual(rr[0].r[i_rule][rule_key],
552                                  r[i_rule][rule_key])
553
554         # Add a deny-1234 ACL
555         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
556                    'srcport_or_icmptype_first': 1234,
557                    'srcport_or_icmptype_last': 1235,
558                    'src_ip_prefix_len': 0,
559                    'src_ip_addr': '\x00\x00\x00\x00',
560                    'dstport_or_icmpcode_first': 1234,
561                    'dstport_or_icmpcode_last': 1234,
562                    'dst_ip_addr': '\x00\x00\x00\x00',
563                    'dst_ip_prefix_len': 0},
564                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
565                    'srcport_or_icmptype_first': 0,
566                    'srcport_or_icmptype_last': 0,
567                    'src_ip_prefix_len': 0,
568                    'src_ip_addr': '\x00\x00\x00\x00',
569                    'dstport_or_icmpcode_first': 0,
570                    'dstport_or_icmpcode_last': 0,
571                    'dst_ip_addr': '\x00\x00\x00\x00',
572                    'dst_ip_prefix_len': 0}]
573
574         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
575                                           tag="deny 1234;permit all")
576         self.assertEqual(reply.retval, 0)
577         # The second ACL gets #1
578         self.assertEqual(reply.acl_index, 1)
579         second_acl = reply.acl_index
580
581         # Test 2: try to modify a nonexistent ACL
582         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
583                                           tag="FFFF:FFFF", expected_retval=-6)
584         self.assertEqual(reply.retval, -6)
585         # The ACL number should pass through
586         self.assertEqual(reply.acl_index, 432)
587         # apply an ACL on an interface inbound, try to delete ACL, must fail
588         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
589                                              n_input=1,
590                                              acls=[first_acl])
591         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
592         # Unapply an ACL and then try to delete it - must be ok
593         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
594                                              n_input=0,
595                                              acls=[])
596         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
597
598         # apply an ACL on an interface outbound, try to delete ACL, must fail
599         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
600                                              n_input=0,
601                                              acls=[second_acl])
602         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
603         # Unapply the ACL and then try to delete it - must be ok
604         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
605                                              n_input=0,
606                                              acls=[])
607         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
608
609         # try to apply a nonexistent ACL - must fail
610         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
611                                              n_input=1,
612                                              acls=[first_acl],
613                                              expected_retval=-6)
614
615         self.logger.info("ACLP_TEST_FINISH_0001")
616
617     def test_0002_acl_permit_apply(self):
618         """ permit ACL apply test
619         """
620         self.logger.info("ACLP_TEST_START_0002")
621
622         rules = []
623         rules.append(self.create_rule(self.IPV4, self.PERMIT,
624                      0, self.proto[self.IP][self.UDP]))
625         rules.append(self.create_rule(self.IPV4, self.PERMIT,
626                      0, self.proto[self.IP][self.TCP]))
627
628         # Apply rules
629         self.apply_rules(rules, "permit per-flow")
630
631         # Traffic should still pass
632         self.run_verify_test(self.IP, self.IPV4, -1)
633         self.logger.info("ACLP_TEST_FINISH_0002")
634
635     def test_0003_acl_deny_apply(self):
636         """ deny ACL apply test
637         """
638         self.logger.info("ACLP_TEST_START_0003")
639         # Add a deny-flows ACL
640         rules = []
641         rules.append(self.create_rule(self.IPV4, self.DENY,
642                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
643         # Permit ip any any in the end
644         rules.append(self.create_rule(self.IPV4, self.PERMIT,
645                                       self.PORTS_ALL, 0))
646
647         # Apply rules
648         self.apply_rules(rules, "deny per-flow;permit all")
649
650         # Traffic should not pass
651         self.run_verify_negat_test(self.IP, self.IPV4,
652                                    self.proto[self.IP][self.UDP])
653         self.logger.info("ACLP_TEST_FINISH_0003")
654         # self.assertEqual(1, 0)
655
656     def test_0004_vpp624_permit_icmpv4(self):
657         """ VPP_624 permit ICMPv4
658         """
659         self.logger.info("ACLP_TEST_START_0004")
660
661         # Add an ACL
662         rules = []
663         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
664                                       self.proto[self.ICMP][self.ICMPv4]))
665         # deny ip any any in the end
666         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
667
668         # Apply rules
669         self.apply_rules(rules, "permit icmpv4")
670
671         # Traffic should still pass
672         self.run_verify_test(self.ICMP, self.IPV4,
673                              self.proto[self.ICMP][self.ICMPv4])
674
675         self.logger.info("ACLP_TEST_FINISH_0004")
676
677     def test_0005_vpp624_permit_icmpv6(self):
678         """ VPP_624 permit ICMPv6
679         """
680         self.logger.info("ACLP_TEST_START_0005")
681
682         # Add an ACL
683         rules = []
684         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
685                                       self.proto[self.ICMP][self.ICMPv6]))
686         # deny ip any any in the end
687         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
688
689         # Apply rules
690         self.apply_rules(rules, "permit icmpv6")
691
692         # Traffic should still pass
693         self.run_verify_test(self.ICMP, self.IPV6,
694                              self.proto[self.ICMP][self.ICMPv6])
695
696         self.logger.info("ACLP_TEST_FINISH_0005")
697
698     def test_0006_vpp624_deny_icmpv4(self):
699         """ VPP_624 deny ICMPv4
700         """
701         self.logger.info("ACLP_TEST_START_0006")
702         # Add an ACL
703         rules = []
704         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
705                                       self.proto[self.ICMP][self.ICMPv4]))
706         # permit ip any any in the end
707         rules.append(self.create_rule(self.IPV4, self.PERMIT,
708                                       self.PORTS_ALL, 0))
709
710         # Apply rules
711         self.apply_rules(rules, "deny icmpv4")
712
713         # Traffic should not pass
714         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
715
716         self.logger.info("ACLP_TEST_FINISH_0006")
717
718     def test_0007_vpp624_deny_icmpv6(self):
719         """ VPP_624 deny ICMPv6
720         """
721         self.logger.info("ACLP_TEST_START_0007")
722         # Add an ACL
723         rules = []
724         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
725                                       self.proto[self.ICMP][self.ICMPv6]))
726         # deny ip any any in the end
727         rules.append(self.create_rule(self.IPV6, self.PERMIT,
728                                       self.PORTS_ALL, 0))
729
730         # Apply rules
731         self.apply_rules(rules, "deny icmpv6")
732
733         # Traffic should not pass
734         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
735
736         self.logger.info("ACLP_TEST_FINISH_0007")
737
738     def test_0008_tcp_permit_v4(self):
739         """ permit TCPv4
740         """
741         self.logger.info("ACLP_TEST_START_0008")
742
743         # Add an ACL
744         rules = []
745         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
746                      self.proto[self.IP][self.TCP]))
747         # deny ip any any in the end
748         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
749
750         # Apply rules
751         self.apply_rules(rules, "permit ipv4 tcp")
752
753         # Traffic should still pass
754         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
755
756         self.logger.info("ACLP_TEST_FINISH_0008")
757
758     def test_0009_tcp_permit_v6(self):
759         """ permit TCPv6
760         """
761         self.logger.info("ACLP_TEST_START_0009")
762
763         # Add an ACL
764         rules = []
765         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
766                                       self.proto[self.IP][self.TCP]))
767         # deny ip any any in the end
768         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
769
770         # Apply rules
771         self.apply_rules(rules, "permit ip6 tcp")
772
773         # Traffic should still pass
774         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
775
776         self.logger.info("ACLP_TEST_FINISH_0008")
777
778     def test_0010_udp_permit_v4(self):
779         """ permit UDPv4
780         """
781         self.logger.info("ACLP_TEST_START_0010")
782
783         # Add an ACL
784         rules = []
785         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
786                                       self.proto[self.IP][self.UDP]))
787         # deny ip any any in the end
788         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
789
790         # Apply rules
791         self.apply_rules(rules, "permit ipv udp")
792
793         # Traffic should still pass
794         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
795
796         self.logger.info("ACLP_TEST_FINISH_0010")
797
798     def test_0011_udp_permit_v6(self):
799         """ permit UDPv6
800         """
801         self.logger.info("ACLP_TEST_START_0011")
802
803         # Add an ACL
804         rules = []
805         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
806                                       self.proto[self.IP][self.UDP]))
807         # deny ip any any in the end
808         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
809
810         # Apply rules
811         self.apply_rules(rules, "permit ip6 udp")
812
813         # Traffic should still pass
814         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
815
816         self.logger.info("ACLP_TEST_FINISH_0011")
817
818     def test_0012_tcp_deny(self):
819         """ deny TCPv4/v6
820         """
821         self.logger.info("ACLP_TEST_START_0012")
822
823         # Add an ACL
824         rules = []
825         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
826                                       self.proto[self.IP][self.TCP]))
827         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
828                                       self.proto[self.IP][self.TCP]))
829         # permit ip any any in the end
830         rules.append(self.create_rule(self.IPV4, self.PERMIT,
831                                       self.PORTS_ALL, 0))
832         rules.append(self.create_rule(self.IPV6, self.PERMIT,
833                                       self.PORTS_ALL, 0))
834
835         # Apply rules
836         self.apply_rules(rules, "deny ip4/ip6 tcp")
837
838         # Traffic should not pass
839         self.run_verify_negat_test(self.IP, self.IPRANDOM,
840                                    self.proto[self.IP][self.TCP])
841
842         self.logger.info("ACLP_TEST_FINISH_0012")
843
844     def test_0013_udp_deny(self):
845         """ deny UDPv4/v6
846         """
847         self.logger.info("ACLP_TEST_START_0013")
848
849         # Add an ACL
850         rules = []
851         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
852                                       self.proto[self.IP][self.UDP]))
853         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
854                                       self.proto[self.IP][self.UDP]))
855         # permit ip any any in the end
856         rules.append(self.create_rule(self.IPV4, self.PERMIT,
857                                       self.PORTS_ALL, 0))
858         rules.append(self.create_rule(self.IPV6, self.PERMIT,
859                                       self.PORTS_ALL, 0))
860
861         # Apply rules
862         self.apply_rules(rules, "deny ip4/ip6 udp")
863
864         # Traffic should not pass
865         self.run_verify_negat_test(self.IP, self.IPRANDOM,
866                                    self.proto[self.IP][self.UDP])
867
868         self.logger.info("ACLP_TEST_FINISH_0013")
869
870     def test_0014_acl_dump(self):
871         """ verify add/dump acls
872         """
873         self.logger.info("ACLP_TEST_START_0014")
874
875         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
876              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
877              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
878              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
879              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
880              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
881              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
882              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
883              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
884              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
885              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
886              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
887              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
888              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
889              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
890              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
891              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
892              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
893              ]
894
895         # Add and verify new ACLs
896         rules = []
897         for i in range(len(r)):
898             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
899
900         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
901         result = self.vapi.acl_dump(reply.acl_index)
902
903         i = 0
904         for drules in result:
905             for dr in drules.r:
906                 self.assertEqual(dr.is_ipv6, r[i][0])
907                 self.assertEqual(dr.is_permit, r[i][1])
908                 self.assertEqual(dr.proto, r[i][3])
909
910                 if r[i][2] > 0:
911                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
912                 else:
913                     if r[i][2] < 0:
914                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
915                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
916                     else:
917                         if dr.proto == self.proto[self.IP][self.TCP]:
918                             self.assertGreater(dr.srcport_or_icmptype_first,
919                                                self.tcp_sport_from-1)
920                             self.assertLess(dr.srcport_or_icmptype_first,
921                                             self.tcp_sport_to+1)
922                             self.assertGreater(dr.dstport_or_icmpcode_last,
923                                                self.tcp_dport_from-1)
924                             self.assertLess(dr.dstport_or_icmpcode_last,
925                                             self.tcp_dport_to+1)
926                         elif dr.proto == self.proto[self.IP][self.UDP]:
927                             self.assertGreater(dr.srcport_or_icmptype_first,
928                                                self.udp_sport_from-1)
929                             self.assertLess(dr.srcport_or_icmptype_first,
930                                             self.udp_sport_to+1)
931                             self.assertGreater(dr.dstport_or_icmpcode_last,
932                                                self.udp_dport_from-1)
933                             self.assertLess(dr.dstport_or_icmpcode_last,
934                                             self.udp_dport_to+1)
935                 i += 1
936
937         self.logger.info("ACLP_TEST_FINISH_0014")
938
939     def test_0015_tcp_permit_port_v4(self):
940         """ permit single TCPv4
941         """
942         self.logger.info("ACLP_TEST_START_0015")
943
944         port = random.randint(0, 65535)
945         # Add an ACL
946         rules = []
947         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
948                                       self.proto[self.IP][self.TCP]))
949         # deny ip any any in the end
950         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
951
952         # Apply rules
953         self.apply_rules(rules, "permit ip4 tcp "+str(port))
954
955         # Traffic should still pass
956         self.run_verify_test(self.IP, self.IPV4,
957                              self.proto[self.IP][self.TCP], port)
958
959         self.logger.info("ACLP_TEST_FINISH_0015")
960
961     def test_0016_udp_permit_port_v4(self):
962         """ permit single UDPv4
963         """
964         self.logger.info("ACLP_TEST_START_0016")
965
966         port = random.randint(0, 65535)
967         # Add an ACL
968         rules = []
969         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
970                                       self.proto[self.IP][self.UDP]))
971         # deny ip any any in the end
972         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
973
974         # Apply rules
975         self.apply_rules(rules, "permit ip4 tcp "+str(port))
976
977         # Traffic should still pass
978         self.run_verify_test(self.IP, self.IPV4,
979                              self.proto[self.IP][self.UDP], port)
980
981         self.logger.info("ACLP_TEST_FINISH_0016")
982
983     def test_0017_tcp_permit_port_v6(self):
984         """ permit single TCPv6
985         """
986         self.logger.info("ACLP_TEST_START_0017")
987
988         port = random.randint(0, 65535)
989         # Add an ACL
990         rules = []
991         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
992                                       self.proto[self.IP][self.TCP]))
993         # deny ip any any in the end
994         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
995
996         # Apply rules
997         self.apply_rules(rules, "permit ip4 tcp "+str(port))
998
999         # Traffic should still pass
1000         self.run_verify_test(self.IP, self.IPV6,
1001                              self.proto[self.IP][self.TCP], port)
1002
1003         self.logger.info("ACLP_TEST_FINISH_0017")
1004
1005     def test_0018_udp_permit_port_v6(self):
1006         """ permit single UPPv6
1007         """
1008         self.logger.info("ACLP_TEST_START_0018")
1009
1010         port = random.randint(0, 65535)
1011         # Add an ACL
1012         rules = []
1013         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1014                                       self.proto[self.IP][self.UDP]))
1015         # deny ip any any in the end
1016         rules.append(self.create_rule(self.IPV6, self.DENY,
1017                                       self.PORTS_ALL, 0))
1018
1019         # Apply rules
1020         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1021
1022         # Traffic should still pass
1023         self.run_verify_test(self.IP, self.IPV6,
1024                              self.proto[self.IP][self.UDP], port)
1025
1026         self.logger.info("ACLP_TEST_FINISH_0018")
1027
1028     def test_0019_udp_deny_port(self):
1029         """ deny single TCPv4/v6
1030         """
1031         self.logger.info("ACLP_TEST_START_0019")
1032
1033         port = random.randint(0, 65535)
1034         # Add an ACL
1035         rules = []
1036         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1037                                       self.proto[self.IP][self.TCP]))
1038         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1039                                       self.proto[self.IP][self.TCP]))
1040         # Permit ip any any in the end
1041         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1042                                       self.PORTS_ALL, 0))
1043         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1044                                       self.PORTS_ALL, 0))
1045
1046         # Apply rules
1047         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1048
1049         # Traffic should not pass
1050         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1051                                    self.proto[self.IP][self.TCP], port)
1052
1053         self.logger.info("ACLP_TEST_FINISH_0019")
1054
1055     def test_0020_udp_deny_port(self):
1056         """ deny single UDPv4/v6
1057         """
1058         self.logger.info("ACLP_TEST_START_0020")
1059
1060         port = random.randint(0, 65535)
1061         # Add an ACL
1062         rules = []
1063         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1064                                       self.proto[self.IP][self.UDP]))
1065         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1066                                       self.proto[self.IP][self.UDP]))
1067         # Permit ip any any in the end
1068         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1069                                       self.PORTS_ALL, 0))
1070         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1071                                       self.PORTS_ALL, 0))
1072
1073         # Apply rules
1074         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1075
1076         # Traffic should not pass
1077         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1078                                    self.proto[self.IP][self.UDP], port)
1079
1080         self.logger.info("ACLP_TEST_FINISH_0020")
1081
1082     def test_0021_udp_deny_port_verify_fragment_deny(self):
1083         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1084         """
1085         self.logger.info("ACLP_TEST_START_0021")
1086
1087         port = random.randint(0, 65535)
1088         # Add an ACL
1089         rules = []
1090         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1091                                       self.proto[self.IP][self.UDP]))
1092         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1093                                       self.proto[self.IP][self.UDP]))
1094         # deny ip any any in the end
1095         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1096                                       self.PORTS_ALL, 0))
1097         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1098                                       self.PORTS_ALL, 0))
1099
1100         # Apply rules
1101         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1102
1103         # Traffic should not pass
1104         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1105                                    self.proto[self.IP][self.UDP], port, True)
1106
1107         self.logger.info("ACLP_TEST_FINISH_0021")
1108
1109     def test_0022_zero_length_udp_ipv4(self):
1110         """ VPP-687 zero length udp ipv4 packet"""
1111         self.logger.info("ACLP_TEST_START_0022")
1112
1113         port = random.randint(0, 65535)
1114         # Add an ACL
1115         rules = []
1116         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1117                                       self.proto[self.IP][self.UDP]))
1118         # deny ip any any in the end
1119         rules.append(
1120             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1121
1122         # Apply rules
1123         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1124
1125         # Traffic should still pass
1126         # Create incoming packet streams for packet-generator interfaces
1127         pkts_cnt = 0
1128         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1129                                   self.IP, self.IPV4,
1130                                   self.proto[self.IP][self.UDP], port,
1131                                   False, False)
1132         if len(pkts) > 0:
1133             self.pg0.add_stream(pkts)
1134             pkts_cnt += len(pkts)
1135
1136         # Enable packet capture and start packet sendingself.IPV
1137         self.pg_enable_capture(self.pg_interfaces)
1138         self.pg_start()
1139
1140         self.pg1.get_capture(pkts_cnt)
1141
1142         self.logger.info("ACLP_TEST_FINISH_0022")
1143
1144     def test_0023_zero_length_udp_ipv6(self):
1145         """ VPP-687 zero length udp ipv6 packet"""
1146         self.logger.info("ACLP_TEST_START_0023")
1147
1148         port = random.randint(0, 65535)
1149         # Add an ACL
1150         rules = []
1151         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1152                                       self.proto[self.IP][self.UDP]))
1153         # deny ip any any in the end
1154         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1155
1156         # Apply rules
1157         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1158
1159         # Traffic should still pass
1160         # Create incoming packet streams for packet-generator interfaces
1161         pkts_cnt = 0
1162         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1163                                   self.IP, self.IPV6,
1164                                   self.proto[self.IP][self.UDP], port,
1165                                   False, False)
1166         if len(pkts) > 0:
1167             self.pg0.add_stream(pkts)
1168             pkts_cnt += len(pkts)
1169
1170         # Enable packet capture and start packet sendingself.IPV
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173
1174         # Verify outgoing packet streams per packet-generator interface
1175         self.pg1.get_capture(pkts_cnt)
1176
1177         self.logger.info("ACLP_TEST_FINISH_0023")
1178
1179     def test_0108_tcp_permit_v4(self):
1180         """ permit TCPv4 + non-match range
1181         """
1182         self.logger.info("ACLP_TEST_START_0108")
1183
1184         # Add an ACL
1185         rules = []
1186         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1187                      self.proto[self.IP][self.TCP]))
1188         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1189                      self.proto[self.IP][self.TCP]))
1190         # deny ip any any in the end
1191         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1192
1193         # Apply rules
1194         self.apply_rules(rules, "permit ipv4 tcp")
1195
1196         # Traffic should still pass
1197         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1198
1199         self.logger.info("ACLP_TEST_FINISH_0108")
1200
1201     def test_0109_tcp_permit_v6(self):
1202         """ permit TCPv6 + non-match range
1203         """
1204         self.logger.info("ACLP_TEST_START_0109")
1205
1206         # Add an ACL
1207         rules = []
1208         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1209                                       self.proto[self.IP][self.TCP]))
1210         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1211                                       self.proto[self.IP][self.TCP]))
1212         # deny ip any any in the end
1213         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1214
1215         # Apply rules
1216         self.apply_rules(rules, "permit ip6 tcp")
1217
1218         # Traffic should still pass
1219         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1220
1221         self.logger.info("ACLP_TEST_FINISH_0109")
1222
1223     def test_0110_udp_permit_v4(self):
1224         """ permit UDPv4 + non-match range
1225         """
1226         self.logger.info("ACLP_TEST_START_0110")
1227
1228         # Add an ACL
1229         rules = []
1230         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1231                                       self.proto[self.IP][self.UDP]))
1232         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1233                                       self.proto[self.IP][self.UDP]))
1234         # deny ip any any in the end
1235         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1236
1237         # Apply rules
1238         self.apply_rules(rules, "permit ipv4 udp")
1239
1240         # Traffic should still pass
1241         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1242
1243         self.logger.info("ACLP_TEST_FINISH_0110")
1244
1245     def test_0111_udp_permit_v6(self):
1246         """ permit UDPv6 + non-match range
1247         """
1248         self.logger.info("ACLP_TEST_START_0111")
1249
1250         # Add an ACL
1251         rules = []
1252         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1253                                       self.proto[self.IP][self.UDP]))
1254         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1255                                       self.proto[self.IP][self.UDP]))
1256         # deny ip any any in the end
1257         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1258
1259         # Apply rules
1260         self.apply_rules(rules, "permit ip6 udp")
1261
1262         # Traffic should still pass
1263         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1264
1265         self.logger.info("ACLP_TEST_FINISH_0111")
1266
1267     def test_0112_tcp_deny(self):
1268         """ deny TCPv4/v6 + non-match range
1269         """
1270         self.logger.info("ACLP_TEST_START_0112")
1271
1272         # Add an ACL
1273         rules = []
1274         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1275                                       self.PORTS_RANGE_2,
1276                                       self.proto[self.IP][self.TCP]))
1277         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1278                                       self.PORTS_RANGE_2,
1279                                       self.proto[self.IP][self.TCP]))
1280         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1281                                       self.proto[self.IP][self.TCP]))
1282         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1283                                       self.proto[self.IP][self.TCP]))
1284         # permit ip any any in the end
1285         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1286                                       self.PORTS_ALL, 0))
1287         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1288                                       self.PORTS_ALL, 0))
1289
1290         # Apply rules
1291         self.apply_rules(rules, "deny ip4/ip6 tcp")
1292
1293         # Traffic should not pass
1294         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1295                                    self.proto[self.IP][self.TCP])
1296
1297         self.logger.info("ACLP_TEST_FINISH_0112")
1298
1299     def test_0113_udp_deny(self):
1300         """ deny UDPv4/v6 + non-match range
1301         """
1302         self.logger.info("ACLP_TEST_START_0113")
1303
1304         # Add an ACL
1305         rules = []
1306         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1307                                       self.PORTS_RANGE_2,
1308                                       self.proto[self.IP][self.UDP]))
1309         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1310                                       self.PORTS_RANGE_2,
1311                                       self.proto[self.IP][self.UDP]))
1312         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1313                                       self.proto[self.IP][self.UDP]))
1314         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1315                                       self.proto[self.IP][self.UDP]))
1316         # permit ip any any in the end
1317         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1318                                       self.PORTS_ALL, 0))
1319         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1320                                       self.PORTS_ALL, 0))
1321
1322         # Apply rules
1323         self.apply_rules(rules, "deny ip4/ip6 udp")
1324
1325         # Traffic should not pass
1326         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1327                                    self.proto[self.IP][self.UDP])
1328
1329         self.logger.info("ACLP_TEST_FINISH_0113")
1330
1331     def test_0300_tcp_permit_v4_etype_aaaa(self):
1332         """ permit TCPv4, send 0xAAAA etype
1333         """
1334         self.logger.info("ACLP_TEST_START_0300")
1335
1336         # Add an ACL
1337         rules = []
1338         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1339                      self.proto[self.IP][self.TCP]))
1340         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1341                      self.proto[self.IP][self.TCP]))
1342         # deny ip any any in the end
1343         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1344
1345         # Apply rules
1346         self.apply_rules(rules, "permit ipv4 tcp")
1347
1348         # Traffic should still pass
1349         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1350
1351         # Traffic should still pass also for an odd ethertype
1352         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1353                              0, False, True, 0xaaaa)
1354
1355         self.logger.info("ACLP_TEST_FINISH_0300")
1356
1357     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1358         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1359         """
1360         self.logger.info("ACLP_TEST_START_0305")
1361
1362         # Add an ACL
1363         rules = []
1364         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1365                      self.proto[self.IP][self.TCP]))
1366         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1367                      self.proto[self.IP][self.TCP]))
1368         # deny ip any any in the end
1369         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1370
1371         # Apply rules
1372         self.apply_rules(rules, "permit ipv4 tcp")
1373
1374         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1375         self.etype_whitelist([0xbbb], 1)
1376
1377         # The IPv4 traffic should still pass
1378         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1379
1380         # The oddball ethertype should be blocked
1381         self.run_verify_negat_test(self.IP, self.IPV4,
1382                                    self.proto[self.IP][self.TCP],
1383                                    0, False, 0xaaaa)
1384
1385         # The whitelisted traffic, on the other hand, should pass
1386         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1387                              0, False, True, 0x0bbb)
1388
1389         # remove the whitelist, the previously blocked 0xAAAA should pass now
1390         self.etype_whitelist([], 0)
1391         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1392                              0, False, True, 0xaaaa)
1393
1394         self.logger.info("ACLP_TEST_FINISH_0305")
1395
1396 if __name__ == '__main__':
1397     unittest.main(testRunner=VppTestRunner)