ACL-plugin does not match UDP next-header, VPP-687
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
17 class TestACLplugin(VppTestCase):
18     """ ACL plugin Test Case """
19
20     # traffic types
21     IP = 0
22     ICMP = 1
23
24     # IP version
25     IPRANDOM = -1
26     IPV4 = 0
27     IPV6 = 1
28
29     # rule types
30     DENY = 0
31     PERMIT = 1
32
33     # supported protocols
34     proto = [[6, 17], [1, 58]]
35     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36     ICMPv4 = 0
37     ICMPv6 = 1
38     TCP = 0
39     UDP = 1
40     PROTO_ALL = 0
41
42     # port ranges
43     PORTS_ALL = -1
44     PORTS_RANGE = 0
45     udp_sport_from = 10
46     udp_sport_to = udp_sport_from + 5
47     udp_dport_from = 20000
48     udp_dport_to = udp_dport_from + 5000
49     tcp_sport_from = 30
50     tcp_sport_to = tcp_sport_from + 5
51     tcp_dport_from = 40000
52     tcp_dport_to = tcp_dport_from + 5000
53
54     icmp4_type = 8  # echo request
55     icmp4_code = 3
56     icmp6_type = 128  # echo request
57     icmp6_code = 3
58
59     # Test variables
60     bd_id = 1
61
62     @classmethod
63     def setUpClass(cls):
64         """
65         Perform standard class setup (defined by class method setUpClass in
66         class VppTestCase) before running the test case, set test case related
67         variables and configure VPP.
68         """
69         super(TestACLplugin, cls).setUpClass()
70
71         random.seed()
72
73         try:
74             # Create 2 pg interfaces
75             cls.create_pg_interfaces(range(2))
76
77             # Packet flows mapping pg0 -> pg1, pg2 etc.
78             cls.flows = dict()
79             cls.flows[cls.pg0] = [cls.pg1]
80
81             # Packet sizes
82             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
83
84             # Create BD with MAC learning and unknown unicast flooding disabled
85             # and put interfaces to this BD
86             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
87                                            learn=1)
88             for pg_if in cls.pg_interfaces:
89                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
90                                                     bd_id=cls.bd_id)
91
92             # Set up all interfaces
93             for i in cls.pg_interfaces:
94                 i.admin_up()
95
96             # Mapping between packet-generator index and lists of test hosts
97             cls.hosts_by_pg_idx = dict()
98             for pg_if in cls.pg_interfaces:
99                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100
101             # Create list of deleted hosts
102             cls.deleted_hosts_by_pg_idx = dict()
103             for pg_if in cls.pg_interfaces:
104                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
105
106             # warm-up the mac address tables
107             # self.warmup_test()
108
109         except Exception:
110             super(TestACLplugin, cls).tearDownClass()
111             raise
112
113     def setUp(self):
114         super(TestACLplugin, self).setUp()
115         self.reset_packet_infos()
116
117     def tearDown(self):
118         """
119         Show various debug prints after each test.
120         """
121         super(TestACLplugin, self).tearDown()
122         if not self.vpp_dead:
123             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
125                                              % self.bd_id))
126
127     def create_hosts(self, count, start=0):
128         """
129         Create required number of host MAC addresses and distribute them among
130         interfaces. Create host IPv4 address for every host MAC address.
131
132         :param int count: Number of hosts to create MAC/IPv4 addresses for.
133         :param int start: Number to start numbering from.
134         """
135         n_int = len(self.pg_interfaces)
136         macs_per_if = count / n_int
137         i = -1
138         for pg_if in self.pg_interfaces:
139             i += 1
140             start_nr = macs_per_if * i + start
141             end_nr = count + start if i == (n_int - 1) \
142                 else macs_per_if * (i + 1) + start
143             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
144             for j in range(start_nr, end_nr):
145                 host = Host(
146                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
149                 hosts.append(host)
150
151     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
152                     s_prefix=0, s_ip='\x00\x00\x00\x00',
153                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
154         if proto == -1:
155             return
156         if ports == self.PORTS_ALL:
157             sport_from = 0
158             dport_from = 0
159             sport_to = 65535 if proto != 1 and proto != 58 else 255
160             dport_to = sport_to
161         elif ports == self.PORTS_RANGE:
162             if proto == 1:
163                 sport_from = self.icmp4_type
164                 sport_to = self.icmp4_type
165                 dport_from = self.icmp4_code
166                 dport_to = self.icmp4_code
167             elif proto == 58:
168                 sport_from = self.icmp6_type
169                 sport_to = self.icmp6_type
170                 dport_from = self.icmp6_code
171                 dport_to = self.icmp6_code
172             elif proto == self.proto[self.IP][self.TCP]:
173                 sport_from = self.tcp_sport_from
174                 sport_to = self.tcp_sport_to
175                 dport_from = self.tcp_dport_from
176                 dport_to = self.tcp_dport_to
177             elif proto == self.proto[self.IP][self.UDP]:
178                 sport_from = self.udp_sport_from
179                 sport_to = self.udp_sport_to
180                 dport_from = self.udp_dport_from
181                 dport_to = self.udp_dport_to
182         else:
183             sport_from = ports
184             sport_to = ports
185             dport_from = ports
186             dport_to = ports
187
188         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
189                  'srcport_or_icmptype_first': sport_from,
190                  'srcport_or_icmptype_last': sport_to,
191                  'src_ip_prefix_len': s_prefix,
192                  'src_ip_addr': s_ip,
193                  'dstport_or_icmpcode_first': dport_from,
194                  'dstport_or_icmpcode_last': dport_to,
195                  'dst_ip_prefix_len': d_prefix,
196                  'dst_ip_addr': d_ip})
197         return rule
198
199     def apply_rules(self, rules, tag=''):
200         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
201                                          count=len(rules),
202                                          tag=tag)
203         self.logger.info("Dumped ACL: " + str(
204             self.api_acl_dump(reply.acl_index)))
205         # Apply a ACL on the interface as inbound
206         for i in self.pg_interfaces:
207             self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
208                                                 count=1, n_input=1,
209                                                 acls=[reply.acl_index])
210         return
211
212     def create_upper_layer(self, packet_index, proto, ports=0):
213         p = self.proto_map[proto]
214         if p == 'UDP':
215             if ports == 0:
216                 return UDP(sport=random.randint(self.udp_sport_from,
217                                                 self.udp_sport_to),
218                            dport=random.randint(self.udp_dport_from,
219                                                 self.udp_dport_to))
220             else:
221                 return UDP(sport=ports, dport=ports)
222         elif p == 'TCP':
223             if ports == 0:
224                 return TCP(sport=random.randint(self.tcp_sport_from,
225                                                 self.tcp_sport_to),
226                            dport=random.randint(self.tcp_dport_from,
227                                                 self.tcp_dport_to))
228             else:
229                 return TCP(sport=ports, dport=ports)
230         return ''
231
232     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
233                       proto=-1, ports=0, fragments=False, pkt_raw=True):
234         """
235         Create input packet stream for defined interface using hosts or
236         deleted_hosts list.
237
238         :param object src_if: Interface to create packet stream for.
239         :param list packet_sizes: List of required packet sizes.
240         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
241         :return: Stream of packets.
242         """
243         pkts = []
244         if self.flows.__contains__(src_if):
245             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
246             for dst_if in self.flows[src_if]:
247                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
248                 n_int = len(dst_hosts) * len(src_hosts)
249                 for i in range(0, n_int):
250                     dst_host = dst_hosts[i / len(src_hosts)]
251                     src_host = src_hosts[i % len(src_hosts)]
252                     pkt_info = self.create_packet_info(src_if, dst_if)
253                     if ipv6 == 1:
254                         pkt_info.ip = 1
255                     elif ipv6 == 0:
256                         pkt_info.ip = 0
257                     else:
258                         pkt_info.ip = random.choice([0, 1])
259                     if proto == -1:
260                         pkt_info.proto = random.choice(self.proto[self.IP])
261                     else:
262                         pkt_info.proto = proto
263                     payload = self.info_to_payload(pkt_info)
264                     p = Ether(dst=dst_host.mac, src=src_host.mac)
265                     if pkt_info.ip:
266                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
267                         if fragments:
268                             p /= IPv6ExtHdrFragment(offset=64, m=1)
269                     else:
270                         if fragments:
271                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
272                                     flags=1, frag=64)
273                         else:
274                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
275                     if traffic_type == self.ICMP:
276                         if pkt_info.ip:
277                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
278                                                    code=self.icmp6_code)
279                         else:
280                             p /= ICMP(type=self.icmp4_type,
281                                       code=self.icmp4_code)
282                     else:
283                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
284                     if pkt_raw:
285                         p /= Raw(payload)
286                         pkt_info.data = p.copy()
287                     if pkt_raw:
288                         size = random.choice(packet_sizes)
289                         self.extend_packet(p, size)
290                     pkts.append(p)
291         return pkts
292
293     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
294         """
295         Verify captured input packet stream for defined interface.
296
297         :param object pg_if: Interface to verify captured packet stream for.
298         :param list capture: Captured packet stream.
299         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
300         """
301         last_info = dict()
302         for i in self.pg_interfaces:
303             last_info[i.sw_if_index] = None
304         dst_sw_if_index = pg_if.sw_if_index
305         for packet in capture:
306             try:
307                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
308                 if traffic_type == self.ICMP and ip_type == self.IPV6:
309                     payload_info = self.payload_to_info(
310                         packet[ICMPv6EchoRequest].data)
311                     payload = packet[ICMPv6EchoRequest]
312                 else:
313                     payload_info = self.payload_to_info(str(packet[Raw]))
314                     payload = packet[self.proto_map[payload_info.proto]]
315             except:
316                 self.logger.error(ppp("Unexpected or invalid packet "
317                                       "(outside network):", packet))
318                 raise
319
320             if ip_type != 0:
321                 self.assertEqual(payload_info.ip, ip_type)
322             if traffic_type == self.ICMP:
323                 try:
324                     if payload_info.ip == 0:
325                         self.assertEqual(payload.type, self.icmp4_type)
326                         self.assertEqual(payload.code, self.icmp4_code)
327                     else:
328                         self.assertEqual(payload.type, self.icmp6_type)
329                         self.assertEqual(payload.code, self.icmp6_code)
330                 except:
331                     self.logger.error(ppp("Unexpected or invalid packet "
332                                           "(outside network):", packet))
333                     raise
334             else:
335                 try:
336                     ip_version = IPv6 if payload_info.ip == 1 else IP
337
338                     ip = packet[ip_version]
339                     packet_index = payload_info.index
340
341                     self.assertEqual(payload_info.dst, dst_sw_if_index)
342                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
343                                       (pg_if.name, payload_info.src,
344                                        packet_index))
345                     next_info = self.get_next_packet_info_for_interface2(
346                         payload_info.src, dst_sw_if_index,
347                         last_info[payload_info.src])
348                     last_info[payload_info.src] = next_info
349                     self.assertTrue(next_info is not None)
350                     self.assertEqual(packet_index, next_info.index)
351                     saved_packet = next_info.data
352                     # Check standard fields
353                     self.assertEqual(ip.src, saved_packet[ip_version].src)
354                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
355                     p = self.proto_map[payload_info.proto]
356                     if p == 'TCP':
357                         tcp = packet[TCP]
358                         self.assertEqual(tcp.sport, saved_packet[
359                             TCP].sport)
360                         self.assertEqual(tcp.dport, saved_packet[
361                             TCP].dport)
362                     elif p == 'UDP':
363                         udp = packet[UDP]
364                         self.assertEqual(udp.sport, saved_packet[
365                             UDP].sport)
366                         self.assertEqual(udp.dport, saved_packet[
367                             UDP].dport)
368                 except:
369                     self.logger.error(ppp("Unexpected or invalid packet:",
370                                           packet))
371                     raise
372         for i in self.pg_interfaces:
373             remaining_packet = self.get_next_packet_info_for_interface2(
374                 i, dst_sw_if_index, last_info[i.sw_if_index])
375             self.assertTrue(
376                 remaining_packet is None,
377                 "Port %u: Packet expected from source %u didn't arrive" %
378                 (dst_sw_if_index, i.sw_if_index))
379
380     def run_traffic_no_check(self):
381         # Test
382         # Create incoming packet streams for packet-generator interfaces
383         for i in self.pg_interfaces:
384             if self.flows.__contains__(i):
385                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
386                 if len(pkts) > 0:
387                     i.add_stream(pkts)
388
389         # Enable packet capture and start packet sending
390         self.pg_enable_capture(self.pg_interfaces)
391         self.pg_start()
392
393     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
394                         frags=False, pkt_raw=True):
395         # Test
396         # Create incoming packet streams for packet-generator interfaces
397         pkts_cnt = 0
398         for i in self.pg_interfaces:
399             if self.flows.__contains__(i):
400                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
401                                           traffic_type, ip_type, proto, ports,
402                                           frags, pkt_raw)
403                 if len(pkts) > 0:
404                     i.add_stream(pkts)
405                     pkts_cnt += len(pkts)
406
407         # Enable packet capture and start packet sendingself.IPV
408         self.pg_enable_capture(self.pg_interfaces)
409         self.pg_start()
410
411         # Verify
412         # Verify outgoing packet streams per packet-generator interface
413         for src_if in self.pg_interfaces:
414             if self.flows.__contains__(src_if):
415                 for dst_if in self.flows[src_if]:
416                     capture = dst_if.get_capture(pkts_cnt)
417                     self.logger.info("Verifying capture on interface %s" %
418                                      dst_if.name)
419                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
420
421     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
422                               ports=0, frags=False):
423         # Test
424         self.reset_packet_infos()
425         for i in self.pg_interfaces:
426             if self.flows.__contains__(i):
427                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
428                                           traffic_type, ip_type, proto, ports,
429                                           frags)
430                 if len(pkts) > 0:
431                     i.add_stream(pkts)
432
433         # Enable packet capture and start packet sending
434         self.pg_enable_capture(self.pg_interfaces)
435         self.pg_start()
436
437         # Verify
438         # Verify outgoing packet streams per packet-generator interface
439         for src_if in self.pg_interfaces:
440             if self.flows.__contains__(src_if):
441                 for dst_if in self.flows[src_if]:
442                     self.logger.info("Verifying capture on interface %s" %
443                                      dst_if.name)
444                     capture = dst_if.get_capture(0)
445                     self.assertEqual(len(capture), 0)
446
447     def api_acl_add_replace(self, acl_index, r, count, tag='',
448                             expected_retval=0):
449         """Add/replace an ACL
450
451         :param int acl_index: ACL index to replace,
452         4294967295 to create new ACL.
453         :param acl_rule r: ACL rules array.
454         :param str tag: symbolic tag (description) for this ACL.
455         :param int count: number of rules.
456         """
457         return self.vapi.api(self.vapi.papi.acl_add_replace,
458                              {'acl_index': acl_index,
459                               'r': r,
460                               'count': count,
461                               'tag': tag},
462                              expected_retval=expected_retval)
463
464     def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
465                                        expected_retval=0):
466         return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
467                              {'sw_if_index': sw_if_index,
468                               'count': count,
469                               'n_input': n_input,
470                               'acls': acls},
471                              expected_retval=expected_retval)
472
473     def api_acl_dump(self, acl_index, expected_retval=0):
474         return self.vapi.api(self.vapi.papi.acl_dump,
475                              {'acl_index': acl_index},
476                              expected_retval=expected_retval)
477
478     def test_0000_warmup_test(self):
479         """ ACL plugin version check; learn MACs
480         """
481         self.create_hosts(16)
482         self.run_traffic_no_check()
483         reply = self.vapi.papi.acl_plugin_get_version()
484         self.assertEqual(reply.major, 1)
485         self.logger.info("Working with ACL plugin version: %d.%d" % (
486             reply.major, reply.minor))
487         # minor version changes are non breaking
488         # self.assertEqual(reply.minor, 0)
489
490     def test_0001_acl_create(self):
491         """ ACL create test
492         """
493
494         self.logger.info("ACLP_TEST_START_0001")
495         # Add an ACL
496         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
497               'srcport_or_icmptype_first': 1234,
498               'srcport_or_icmptype_last': 1235,
499               'src_ip_prefix_len': 0,
500               'src_ip_addr': '\x00\x00\x00\x00',
501               'dstport_or_icmpcode_first': 1234,
502               'dstport_or_icmpcode_last': 1234,
503               'dst_ip_addr': '\x00\x00\x00\x00',
504               'dst_ip_prefix_len': 0}]
505         # Test 1: add a new ACL
506         reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
507                                          count=len(r), tag="permit 1234")
508         self.assertEqual(reply.retval, 0)
509         # The very first ACL gets #0
510         self.assertEqual(reply.acl_index, 0)
511         rr = self.api_acl_dump(reply.acl_index)
512         self.logger.info("Dumped ACL: " + str(rr))
513         self.assertEqual(len(rr), 1)
514         # We should have the same number of ACL entries as we had asked
515         self.assertEqual(len(rr[0].r), len(r))
516         # The rules should be the same. But because the submitted and returned
517         # are different types, we need to iterate over rules and keys to get
518         # to basic values.
519         for i_rule in range(0, len(r) - 1):
520             for rule_key in r[i_rule]:
521                 self.assertEqual(rr[0].r[i_rule][rule_key],
522                                  r[i_rule][rule_key])
523
524         # Add a deny-1234 ACL
525         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
526                    'srcport_or_icmptype_first': 1234,
527                    'srcport_or_icmptype_last': 1235,
528                    'src_ip_prefix_len': 0,
529                    'src_ip_addr': '\x00\x00\x00\x00',
530                    'dstport_or_icmpcode_first': 1234,
531                    'dstport_or_icmpcode_last': 1234,
532                    'dst_ip_addr': '\x00\x00\x00\x00',
533                    'dst_ip_prefix_len': 0},
534                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
535                    'srcport_or_icmptype_first': 0,
536                    'srcport_or_icmptype_last': 0,
537                    'src_ip_prefix_len': 0,
538                    'src_ip_addr': '\x00\x00\x00\x00',
539                    'dstport_or_icmpcode_first': 0,
540                    'dstport_or_icmpcode_last': 0,
541                    'dst_ip_addr': '\x00\x00\x00\x00',
542                    'dst_ip_prefix_len': 0})
543
544         reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
545                                          count=len(r_deny),
546                                          tag="deny 1234;permit all")
547         self.assertEqual(reply.retval, 0)
548         # The second ACL gets #1
549         self.assertEqual(reply.acl_index, 1)
550
551         # Test 2: try to modify a nonexistent ACL
552         reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
553                                          tag="FFFF:FFFF", expected_retval=-1)
554         self.assertEqual(reply.retval, -1)
555         # The ACL number should pass through
556         self.assertEqual(reply.acl_index, 432)
557
558         self.logger.info("ACLP_TEST_FINISH_0001")
559
560     def test_0002_acl_permit_apply(self):
561         """ permit ACL apply test
562         """
563         self.logger.info("ACLP_TEST_START_0002")
564
565         rules = []
566         rules.append(self.create_rule(self.IPV4, self.PERMIT,
567                      0, self.proto[self.IP][self.UDP]))
568         rules.append(self.create_rule(self.IPV4, self.PERMIT,
569                      0, self.proto[self.IP][self.TCP]))
570
571         # Apply rules
572         self.apply_rules(rules, "permit per-flow")
573
574         # Traffic should still pass
575         self.run_verify_test(self.IP, self.IPV4, -1)
576         self.logger.info("ACLP_TEST_FINISH_0002")
577
578     def test_0003_acl_deny_apply(self):
579         """ deny ACL apply test
580         """
581         self.logger.info("ACLP_TEST_START_0003")
582         # Add a deny-flows ACL
583         rules = []
584         rules.append(self.create_rule(self.IPV4, self.DENY,
585                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
586         # Permit ip any any in the end
587         rules.append(self.create_rule(self.IPV4, self.PERMIT,
588                                       self.PORTS_ALL, 0))
589
590         # Apply rules
591         self.apply_rules(rules, "deny per-flow;permit all")
592
593         # Traffic should not pass
594         self.run_verify_negat_test(self.IP, self.IPV4,
595                                    self.proto[self.IP][self.UDP])
596         self.logger.info("ACLP_TEST_FINISH_0003")
597         # self.assertEqual(1, 0)
598
599     def test_0004_vpp624_permit_icmpv4(self):
600         """ VPP_624 permit ICMPv4
601         """
602         self.logger.info("ACLP_TEST_START_0004")
603
604         # Add an ACL
605         rules = []
606         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
607                                       self.proto[self.ICMP][self.ICMPv4]))
608         # deny ip any any in the end
609         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
610
611         # Apply rules
612         self.apply_rules(rules, "permit icmpv4")
613
614         # Traffic should still pass
615         self.run_verify_test(self.ICMP, self.IPV4,
616                              self.proto[self.ICMP][self.ICMPv4])
617
618         self.logger.info("ACLP_TEST_FINISH_0004")
619
620     def test_0005_vpp624_permit_icmpv6(self):
621         """ VPP_624 permit ICMPv6
622         """
623         self.logger.info("ACLP_TEST_START_0005")
624
625         # Add an ACL
626         rules = []
627         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
628                                       self.proto[self.ICMP][self.ICMPv6]))
629         # deny ip any any in the end
630         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
631
632         # Apply rules
633         self.apply_rules(rules, "permit icmpv6")
634
635         # Traffic should still pass
636         self.run_verify_test(self.ICMP, self.IPV6,
637                              self.proto[self.ICMP][self.ICMPv6])
638
639         self.logger.info("ACLP_TEST_FINISH_0005")
640
641     def test_0006_vpp624_deny_icmpv4(self):
642         """ VPP_624 deny ICMPv4
643         """
644         self.logger.info("ACLP_TEST_START_0006")
645         # Add an ACL
646         rules = []
647         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
648                                       self.proto[self.ICMP][self.ICMPv4]))
649         # permit ip any any in the end
650         rules.append(self.create_rule(self.IPV4, self.PERMIT,
651                                       self.PORTS_ALL, 0))
652
653         # Apply rules
654         self.apply_rules(rules, "deny icmpv4")
655
656         # Traffic should not pass
657         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
658
659         self.logger.info("ACLP_TEST_FINISH_0006")
660
661     def test_0007_vpp624_deny_icmpv6(self):
662         """ VPP_624 deny ICMPv6
663         """
664         self.logger.info("ACLP_TEST_START_0007")
665         # Add an ACL
666         rules = []
667         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
668                                       self.proto[self.ICMP][self.ICMPv6]))
669         # deny ip any any in the end
670         rules.append(self.create_rule(self.IPV6, self.PERMIT,
671                                       self.PORTS_ALL, 0))
672
673         # Apply rules
674         self.apply_rules(rules, "deny icmpv6")
675
676         # Traffic should not pass
677         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
678
679         self.logger.info("ACLP_TEST_FINISH_0007")
680
681     def test_0008_tcp_permit_v4(self):
682         """ permit TCPv4
683         """
684         self.logger.info("ACLP_TEST_START_0008")
685
686         # Add an ACL
687         rules = []
688         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
689                      self.proto[self.IP][self.TCP]))
690         # deny ip any any in the end
691         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
692
693         # Apply rules
694         self.apply_rules(rules, "permit ipv4 tcp")
695
696         # Traffic should still pass
697         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
698
699         self.logger.info("ACLP_TEST_FINISH_0008")
700
701     def test_0009_tcp_permit_v6(self):
702         """ permit TCPv6
703         """
704         self.logger.info("ACLP_TEST_START_0009")
705
706         # Add an ACL
707         rules = []
708         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
709                                       self.proto[self.IP][self.TCP]))
710         # deny ip any any in the end
711         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
712
713         # Apply rules
714         self.apply_rules(rules, "permit ip6 tcp")
715
716         # Traffic should still pass
717         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
718
719         self.logger.info("ACLP_TEST_FINISH_0008")
720
721     def test_0010_udp_permit_v4(self):
722         """ permit UDPv4
723         """
724         self.logger.info("ACLP_TEST_START_0010")
725
726         # Add an ACL
727         rules = []
728         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
729                                       self.proto[self.IP][self.UDP]))
730         # deny ip any any in the end
731         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
732
733         # Apply rules
734         self.apply_rules(rules, "permit ipv udp")
735
736         # Traffic should still pass
737         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
738
739         self.logger.info("ACLP_TEST_FINISH_0010")
740
741     def test_0011_udp_permit_v6(self):
742         """ permit UDPv6
743         """
744         self.logger.info("ACLP_TEST_START_0011")
745
746         # Add an ACL
747         rules = []
748         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
749                                       self.proto[self.IP][self.UDP]))
750         # deny ip any any in the end
751         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
752
753         # Apply rules
754         self.apply_rules(rules, "permit ip6 udp")
755
756         # Traffic should still pass
757         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
758
759         self.logger.info("ACLP_TEST_FINISH_0011")
760
761     def test_0012_tcp_deny(self):
762         """ deny TCPv4/v6
763         """
764         self.logger.info("ACLP_TEST_START_0012")
765
766         # Add an ACL
767         rules = []
768         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
769                                       self.proto[self.IP][self.TCP]))
770         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
771                                       self.proto[self.IP][self.TCP]))
772         # permit ip any any in the end
773         rules.append(self.create_rule(self.IPV4, self.PERMIT,
774                                       self.PORTS_ALL, 0))
775         rules.append(self.create_rule(self.IPV6, self.PERMIT,
776                                       self.PORTS_ALL, 0))
777
778         # Apply rules
779         self.apply_rules(rules, "deny ip4/ip6 tcp")
780
781         # Traffic should not pass
782         self.run_verify_negat_test(self.IP, self.IPRANDOM,
783                                    self.proto[self.IP][self.TCP])
784
785         self.logger.info("ACLP_TEST_FINISH_0012")
786
787     def test_0013_udp_deny(self):
788         """ deny UDPv4/v6
789         """
790         self.logger.info("ACLP_TEST_START_0013")
791
792         # Add an ACL
793         rules = []
794         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
795                                       self.proto[self.IP][self.UDP]))
796         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
797                                       self.proto[self.IP][self.UDP]))
798         # permit ip any any in the end
799         rules.append(self.create_rule(self.IPV4, self.PERMIT,
800                                       self.PORTS_ALL, 0))
801         rules.append(self.create_rule(self.IPV6, self.PERMIT,
802                                       self.PORTS_ALL, 0))
803
804         # Apply rules
805         self.apply_rules(rules, "deny ip4/ip6 udp")
806
807         # Traffic should not pass
808         self.run_verify_negat_test(self.IP, self.IPRANDOM,
809                                    self.proto[self.IP][self.UDP])
810
811         self.logger.info("ACLP_TEST_FINISH_0013")
812
813     def test_0014_acl_dump(self):
814         """ verify add/dump acls
815         """
816         self.logger.info("ACLP_TEST_START_0014")
817
818         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
819              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
820              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
821              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
822              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
823              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
824              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
825              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
826              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
827              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
828              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
829              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
830              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
831              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
832              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
833              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
834              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
835              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
836              ]
837
838         # Add and verify new ACLs
839         rules = []
840         for i in range(len(r)):
841             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
842
843         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
844                                          count=len(rules))
845         result = self.api_acl_dump(reply.acl_index)
846
847         i = 0
848         for drules in result:
849             for dr in drules.r:
850                 self.assertEqual(dr.is_ipv6, r[i][0])
851                 self.assertEqual(dr.is_permit, r[i][1])
852                 self.assertEqual(dr.proto, r[i][3])
853
854                 if r[i][2] > 0:
855                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
856                 else:
857                     if r[i][2] < 0:
858                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
859                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
860                     else:
861                         if dr.proto == self.proto[self.IP][self.TCP]:
862                             self.assertGreater(dr.srcport_or_icmptype_first,
863                                                self.tcp_sport_from-1)
864                             self.assertLess(dr.srcport_or_icmptype_first,
865                                             self.tcp_sport_to+1)
866                             self.assertGreater(dr.dstport_or_icmpcode_last,
867                                                self.tcp_dport_from-1)
868                             self.assertLess(dr.dstport_or_icmpcode_last,
869                                             self.tcp_dport_to+1)
870                         elif dr.proto == self.proto[self.IP][self.UDP]:
871                             self.assertGreater(dr.srcport_or_icmptype_first,
872                                                self.udp_sport_from-1)
873                             self.assertLess(dr.srcport_or_icmptype_first,
874                                             self.udp_sport_to+1)
875                             self.assertGreater(dr.dstport_or_icmpcode_last,
876                                                self.udp_dport_from-1)
877                             self.assertLess(dr.dstport_or_icmpcode_last,
878                                             self.udp_dport_to+1)
879                 i += 1
880
881         self.logger.info("ACLP_TEST_FINISH_0014")
882
883     def test_0015_tcp_permit_port_v4(self):
884         """ permit single TCPv4
885         """
886         self.logger.info("ACLP_TEST_START_0015")
887
888         port = random.randint(0, 65535)
889         # Add an ACL
890         rules = []
891         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
892                                       self.proto[self.IP][self.TCP]))
893         # deny ip any any in the end
894         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
895
896         # Apply rules
897         self.apply_rules(rules, "permit ip4 tcp "+str(port))
898
899         # Traffic should still pass
900         self.run_verify_test(self.IP, self.IPV4,
901                              self.proto[self.IP][self.TCP], port)
902
903         self.logger.info("ACLP_TEST_FINISH_0015")
904
905     def test_0016_udp_permit_port_v4(self):
906         """ permit single UDPv4
907         """
908         self.logger.info("ACLP_TEST_START_0016")
909
910         port = random.randint(0, 65535)
911         # Add an ACL
912         rules = []
913         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
914                                       self.proto[self.IP][self.UDP]))
915         # deny ip any any in the end
916         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
917
918         # Apply rules
919         self.apply_rules(rules, "permit ip4 tcp "+str(port))
920
921         # Traffic should still pass
922         self.run_verify_test(self.IP, self.IPV4,
923                              self.proto[self.IP][self.UDP], port)
924
925         self.logger.info("ACLP_TEST_FINISH_0016")
926
927     def test_0017_tcp_permit_port_v6(self):
928         """ permit single TCPv6
929         """
930         self.logger.info("ACLP_TEST_START_0017")
931
932         port = random.randint(0, 65535)
933         # Add an ACL
934         rules = []
935         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
936                                       self.proto[self.IP][self.TCP]))
937         # deny ip any any in the end
938         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
939
940         # Apply rules
941         self.apply_rules(rules, "permit ip4 tcp "+str(port))
942
943         # Traffic should still pass
944         self.run_verify_test(self.IP, self.IPV6,
945                              self.proto[self.IP][self.TCP], port)
946
947         self.logger.info("ACLP_TEST_FINISH_0017")
948
949     def test_0018_udp_permit_port_v6(self):
950         """ permit single UPPv6
951         """
952         self.logger.info("ACLP_TEST_START_0018")
953
954         port = random.randint(0, 65535)
955         # Add an ACL
956         rules = []
957         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
958                                       self.proto[self.IP][self.UDP]))
959         # deny ip any any in the end
960         rules.append(self.create_rule(self.IPV6, self.DENY,
961                                       self.PORTS_ALL, 0))
962
963         # Apply rules
964         self.apply_rules(rules, "permit ip4 tcp "+str(port))
965
966         # Traffic should still pass
967         self.run_verify_test(self.IP, self.IPV6,
968                              self.proto[self.IP][self.UDP], port)
969
970         self.logger.info("ACLP_TEST_FINISH_0018")
971
972     def test_0019_udp_deny_port(self):
973         """ deny single TCPv4/v6
974         """
975         self.logger.info("ACLP_TEST_START_0019")
976
977         port = random.randint(0, 65535)
978         # Add an ACL
979         rules = []
980         rules.append(self.create_rule(self.IPV4, self.DENY, port,
981                                       self.proto[self.IP][self.TCP]))
982         rules.append(self.create_rule(self.IPV6, self.DENY, port,
983                                       self.proto[self.IP][self.TCP]))
984         # Permit ip any any in the end
985         rules.append(self.create_rule(self.IPV4, self.PERMIT,
986                                       self.PORTS_ALL, 0))
987         rules.append(self.create_rule(self.IPV6, self.PERMIT,
988                                       self.PORTS_ALL, 0))
989
990         # Apply rules
991         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
992
993         # Traffic should not pass
994         self.run_verify_negat_test(self.IP, self.IPRANDOM,
995                                    self.proto[self.IP][self.TCP], port)
996
997         self.logger.info("ACLP_TEST_FINISH_0019")
998
999     def test_0020_udp_deny_port(self):
1000         """ deny single UDPv4/v6
1001         """
1002         self.logger.info("ACLP_TEST_START_0020")
1003
1004         port = random.randint(0, 65535)
1005         # Add an ACL
1006         rules = []
1007         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1008                                       self.proto[self.IP][self.UDP]))
1009         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1010                                       self.proto[self.IP][self.UDP]))
1011         # Permit ip any any in the end
1012         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1013                                       self.PORTS_ALL, 0))
1014         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1015                                       self.PORTS_ALL, 0))
1016
1017         # Apply rules
1018         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1019
1020         # Traffic should not pass
1021         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1022                                    self.proto[self.IP][self.UDP], port)
1023
1024         self.logger.info("ACLP_TEST_FINISH_0020")
1025
1026     def test_0021_udp_deny_port_verify_fragment_deny(self):
1027         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1028         """
1029         self.logger.info("ACLP_TEST_START_0021")
1030
1031         port = random.randint(0, 65535)
1032         # Add an ACL
1033         rules = []
1034         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1035                                       self.proto[self.IP][self.UDP]))
1036         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1037                                       self.proto[self.IP][self.UDP]))
1038         # deny ip any any in the end
1039         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1040                                       self.PORTS_ALL, 0))
1041         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1042                                       self.PORTS_ALL, 0))
1043
1044         # Apply rules
1045         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1046
1047         # Traffic should not pass
1048         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1049                                    self.proto[self.IP][self.UDP], port, True)
1050
1051         self.logger.info("ACLP_TEST_FINISH_0021")
1052
1053     def test_0022_zero_length_udp_ipv4(self):
1054         """ VPP-687 zero length udp ipv4 packet"""
1055         self.logger.info("ACLP_TEST_START_0022")
1056
1057         port = random.randint(0, 65535)
1058         # Add an ACL
1059         rules = []
1060         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1061                                       self.proto[self.IP][self.UDP]))
1062         # deny ip any any in the end
1063         rules.append(
1064             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1065
1066         # Apply rules
1067         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1068
1069         # Traffic should still pass
1070         # Create incoming packet streams for packet-generator interfaces
1071         pkts_cnt = 0
1072         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1073                                   self.IP, self.IPV4,
1074                                   self.proto[self.IP][self.UDP], port,
1075                                   False, False)
1076         if len(pkts) > 0:
1077             self.pg0.add_stream(pkts)
1078             pkts_cnt += len(pkts)
1079
1080         # Enable packet capture and start packet sendingself.IPV
1081         self.pg_enable_capture(self.pg_interfaces)
1082         self.pg_start()
1083
1084         self.pg1.get_capture(pkts_cnt)
1085
1086         self.logger.info("ACLP_TEST_FINISH_0022")
1087
1088     def test_0023_zero_length_udp_ipv6(self):
1089         """ VPP-687 zero length udp ipv6 packet"""
1090         self.logger.info("ACLP_TEST_START_0023")
1091
1092         port = random.randint(0, 65535)
1093         # Add an ACL
1094         rules = []
1095         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1096                                       self.proto[self.IP][self.UDP]))
1097         # deny ip any any in the end
1098         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1099
1100         # Apply rules
1101         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1102
1103         # Traffic should still pass
1104         # Create incoming packet streams for packet-generator interfaces
1105         pkts_cnt = 0
1106         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1107                                   self.IP, self.IPV6,
1108                                   self.proto[self.IP][self.UDP], port,
1109                                   False, False)
1110         if len(pkts) > 0:
1111             self.pg0.add_stream(pkts)
1112             pkts_cnt += len(pkts)
1113
1114         # Enable packet capture and start packet sendingself.IPV
1115         self.pg_enable_capture(self.pg_interfaces)
1116         self.pg_start()
1117
1118         # Verify outgoing packet streams per packet-generator interface
1119         self.pg1.get_capture(pkts_cnt)
1120
1121         self.logger.info("ACLP_TEST_FINISH_0023")
1122
1123 if __name__ == '__main__':
1124     unittest.main(testRunner=VppTestRunner)