tests: make tests less make dependent
[vpp.git] / test / test_acl_plugin_conns.py
index 06f3cf7..cbf0ab3 100644 (file)
@@ -1,8 +1,9 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """ ACL plugin extended stateful tests """
 
 import unittest
 """ ACL plugin extended stateful tests """
 
 import unittest
-from framework import VppTestCase, VppTestRunner, running_extended_tests
+from config import config
+from framework import VppTestCase, VppTestRunner
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP, TCP
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP, TCP
@@ -13,6 +14,10 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
 from scapy.layers.inet6 import IPv6ExtHdrFragment
 from pprint import pprint
 from random import randint
 from scapy.layers.inet6 import IPv6ExtHdrFragment
 from pprint import pprint
 from random import randint
+from util import L4_Conn
+from ipaddress import ip_network
+
+from vpp_acl import AclRule, VppAcl, VppAclInterface
 
 
 def to_acl_rule(self, is_permit, wildcard_sport=False):
 
 
 def to_acl_rule(self, is_permit, wildcard_sport=False):
@@ -34,23 +39,18 @@ def to_acl_rule(self, is_permit, wildcard_sport=False):
         rule_l4_sport_first = rule_l4_sport
         rule_l4_sport_last = rule_l4_sport
 
         rule_l4_sport_first = rule_l4_sport
         rule_l4_sport_last = rule_l4_sport
 
-    new_rule = {
-          'is_permit': is_permit,
-          'is_ipv6': p.haslayer(IPv6),
-          'src_ip_addr': inet_pton(rule_family,
-                                   p[rule_l3_layer].src),
-          'src_ip_prefix_len': rule_prefix_len,
-          'dst_ip_addr': inet_pton(rule_family,
-                                   p[rule_l3_layer].dst),
-          'dst_ip_prefix_len': rule_prefix_len,
-          'srcport_or_icmptype_first': rule_l4_sport_first,
-          'srcport_or_icmptype_last': rule_l4_sport_last,
-          'dstport_or_icmpcode_first': rule_l4_dport,
-          'dstport_or_icmpcode_last': rule_l4_dport,
-          'proto': rule_l4_proto,
-         }
+    new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+                       src_prefix=ip_network(
+                           (p[rule_l3_layer].src, rule_prefix_len)),
+                       dst_prefix=ip_network(
+                           (p[rule_l3_layer].dst, rule_prefix_len)),
+                       sport_from=rule_l4_sport_first,
+                       sport_to=rule_l4_sport_last,
+                       dport_from=rule_l4_dport, dport_to=rule_l4_dport)
+
     return new_rule
 
     return new_rule
 
+
 Packet.to_acl_rule = to_acl_rule
 
 
 Packet.to_acl_rule = to_acl_rule
 
 
@@ -68,37 +68,7 @@ class IterateWithSleep():
             self.testcase.sleep(self.sleep_sec)
 
 
             self.testcase.sleep(self.sleep_sec)
 
 
-class Conn():
-    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
-        self.testcase = testcase
-        self.ifs = [None, None]
-        self.ifs[0] = if1
-        self.ifs[1] = if2
-        self.address_family = af
-        self.l4proto = l4proto
-        self.ports = [None, None]
-        self.ports[0] = port1
-        self.ports[1] = port2
-        self
-
-    def pkt(self, side, flags=None):
-        is_ip6 = 1 if self.address_family == AF_INET6 else 0
-        s0 = side
-        s1 = 1-side
-        src_if = self.ifs[s0]
-        dst_if = self.ifs[s1]
-        layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
-                   IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
-        payload = "x"
-        l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
-        if flags is not None:
-            l4args['flags'] = flags
-        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-             layer_3[is_ip6] /
-             self.l4proto(**l4args) /
-             Raw(payload))
-        return p
-
+class Conn(L4_Conn):
     def apply_acls(self, reflect_side, acl_side):
         pkts = []
         pkts.append(self.pkt(0))
     def apply_acls(self, reflect_side, acl_side):
         pkts = []
         pkts.append(self.pkt(0))
@@ -108,129 +78,82 @@ class Conn():
         r = []
         r.append(pkt.to_acl_rule(2, wildcard_sport=True))
         r.append(self.wildcard_rule(0))
         r = []
         r.append(pkt.to_acl_rule(2, wildcard_sport=True))
         r.append(self.wildcard_rule(0))
-        res = self.testcase.api_acl_add_replace(0xffffffff, r)
-        self.testcase.assert_equal(res.retval, 0, "error adding ACL")
-        reflect_acl_index = res.acl_index
+        reflect_acl = VppAcl(self.testcase, r)
+        reflect_acl.add_vpp_config()
 
         r = []
         r.append(self.wildcard_rule(0))
 
         r = []
         r.append(self.wildcard_rule(0))
-        res = self.testcase.api_acl_add_replace(0xffffffff, r)
-        self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
-        deny_acl_index = res.acl_index
+        deny_acl = VppAcl(self.testcase, r)
+        deny_acl.add_vpp_config()
 
         if reflect_side == acl_side:
 
         if reflect_side == acl_side:
-            self.testcase.api_acl_interface_set_acl_list(
-                   self.ifs[acl_side].sw_if_index, 2, 1,
-                   [reflect_acl_index,
-                    deny_acl_index])
-            self.testcase.api_acl_interface_set_acl_list(
-                   self.ifs[1-acl_side].sw_if_index, 0, 0, [])
+            acl_if0 = VppAclInterface(self.testcase,
+                                      self.ifs[acl_side].sw_if_index,
+                                      [reflect_acl, deny_acl], n_input=1)
+            acl_if1 = VppAclInterface(self.testcase,
+                                      self.ifs[1-acl_side].sw_if_index, [],
+                                      n_input=0)
+            acl_if0.add_vpp_config()
+            acl_if1.add_vpp_config()
         else:
         else:
-            self.testcase.api_acl_interface_set_acl_list(
-                   self.ifs[acl_side].sw_if_index, 2, 1,
-                   [deny_acl_index,
-                    reflect_acl_index])
-            self.testcase.api_acl_interface_set_acl_list(
-                   self.ifs[1-acl_side].sw_if_index, 0, 0, [])
+            acl_if0 = VppAclInterface(self.testcase,
+                                      self.ifs[acl_side].sw_if_index,
+                                      [deny_acl, reflect_acl], n_input=1)
+            acl_if1 = VppAclInterface(self.testcase,
+                                      self.ifs[1-acl_side].sw_if_index, [],
+                                      n_input=0)
+            acl_if0.add_vpp_config()
+            acl_if1.add_vpp_config()
 
     def wildcard_rule(self, is_permit):
         any_addr = ["0.0.0.0", "::"]
         rule_family = self.address_family
         is_ip6 = 1 if rule_family == AF_INET6 else 0
 
     def wildcard_rule(self, is_permit):
         any_addr = ["0.0.0.0", "::"]
         rule_family = self.address_family
         is_ip6 = 1 if rule_family == AF_INET6 else 0
-        new_rule = {
-              'is_permit': is_permit,
-              'is_ipv6': is_ip6,
-              'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
-              'src_ip_prefix_len': 0,
-              'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
-              'dst_ip_prefix_len': 0,
-              'srcport_or_icmptype_first': 0,
-              'srcport_or_icmptype_last': 65535,
-              'dstport_or_icmpcode_first': 0,
-              'dstport_or_icmpcode_last': 65535,
-              'proto': 0,
-             }
+        new_rule = AclRule(is_permit=is_permit, proto=0,
+                           src_prefix=ip_network(
+                               (any_addr[is_ip6], 0)),
+                           dst_prefix=ip_network(
+                               (any_addr[is_ip6], 0)),
+                           sport_from=0, sport_to=65535, dport_from=0,
+                           dport_to=65535)
         return new_rule
 
         return new_rule
 
-    def send(self, side, flags=None):
-        self.ifs[side].add_stream(self.pkt(side, flags))
-        self.ifs[1-side].enable_capture()
-        self.testcase.pg_start()
-
-    def recv(self, side):
-        p = self.ifs[side].wait_for_packet(1)
-        return p
-
-    def send_through(self, side, flags=None):
-        self.send(side, flags)
-        p = self.recv(1-side)
-        return p
 
 
-    def send_pingpong(self, side, flags1=None, flags2=None):
-        p1 = self.send_through(side, flags1)
-        p2 = self.send_through(1-side, flags2)
-        return [p1, p2]
-
-
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
+@unittest.skipUnless(config.extended, "part of extended tests")
 class ACLPluginConnTestCase(VppTestCase):
     """ ACL plugin connection-oriented extended testcases """
 
     @classmethod
 class ACLPluginConnTestCase(VppTestCase):
     """ ACL plugin connection-oriented extended testcases """
 
     @classmethod
-    def setUpClass(self):
-        super(ACLPluginConnTestCase, self).setUpClass()
+    def setUpClass(cls):
+        super(ACLPluginConnTestCase, cls).setUpClass()
         # create pg0 and pg1
         # create pg0 and pg1
-        self.create_pg_interfaces(range(2))
-        for i in self.pg_interfaces:
+        cls.create_pg_interfaces(range(2))
+        cmd = "set acl-plugin session table event-trace 1"
+        cls.logger.info(cls.vapi.cli(cmd))
+        for i in cls.pg_interfaces:
             i.admin_up()
             i.config_ip4()
             i.config_ip6()
             i.resolve_arp()
             i.resolve_ndp()
 
             i.admin_up()
             i.config_ip4()
             i.config_ip6()
             i.resolve_arp()
             i.resolve_ndp()
 
+    @classmethod
+    def tearDownClass(cls):
+        super(ACLPluginConnTestCase, cls).tearDownClass()
+
     def tearDown(self):
         """Run standard test teardown and log various show commands
         """
         super(ACLPluginConnTestCase, self).tearDown()
     def tearDown(self):
         """Run standard test teardown and log various show commands
         """
         super(ACLPluginConnTestCase, self).tearDown()
-        if not self.vpp_dead:
-            self.logger.info(self.vapi.cli("show ip arp"))
-            self.logger.info(self.vapi.cli("show ip6 neighbors"))
-            self.logger.info(self.vapi.cli("show acl-plugin sessions"))
-            self.logger.info(self.vapi.cli("show acl-plugin acl"))
-            self.logger.info(self.vapi.cli("show acl-plugin interface"))
-            self.logger.info(self.vapi.cli("show acl-plugin tables"))
-
-    def api_acl_add_replace(self, acl_index, r, count=-1, tag="",
-                            expected_retval=0):
-        """Add/replace an ACL
-
-        :param int acl_index: ACL index to replace, 4294967295 to create new.
-        :param acl_rule r: ACL rules array.
-        :param str tag: symbolic tag (description) for this ACL.
-        :param int count: number of rules.
-        """
-        if (count < 0):
-            count = len(r)
-        return self.vapi.api(self.vapi.papi.acl_add_replace,
-                             {'acl_index': acl_index,
-                              'r': r,
-                              'count': count,
-                              'tag': tag
-                              }, expected_retval=expected_retval)
-
-    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
-                                       expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
-                             {'sw_if_index': sw_if_index,
-                              'count': count,
-                              'n_input': n_input,
-                              'acls': acls
-                              }, expected_retval=expected_retval)
-
-    def api_acl_dump(self, acl_index, expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_dump,
-                             {'acl_index': acl_index},
-                             expected_retval=expected_retval)
+
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.cli("show ip neighbors"))
+        self.logger.info(self.vapi.cli("show ip6 neighbors"))
+        self.logger.info(self.vapi.cli("show acl-plugin sessions"))
+        self.logger.info(self.vapi.cli("show acl-plugin acl"))
+        self.logger.info(self.vapi.cli("show acl-plugin interface"))
+        self.logger.info(self.vapi.cli("show acl-plugin tables"))
+        self.logger.info(self.vapi.cli("show event-logger all"))
 
     def run_basic_conn_test(self, af, acl_side):
         """ Basic conn timeout test """
 
     def run_basic_conn_test(self, af, acl_side):
         """ Basic conn timeout test """