acl: API cleanup
[vpp.git] / src / plugins / acl / test / vpp_acl.py
diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py
new file mode 100644 (file)
index 0000000..d4ed167
--- /dev/null
@@ -0,0 +1,460 @@
+from ipaddress import IPv4Network
+
+from vpp_object import VppObject
+from vpp_papi import VppEnum
+from vpp_ip import INVALID_INDEX
+from vpp_papi_provider import UnexpectedApiReturnValueError
+
+
+class VppAclPlugin(VppObject):
+
+    def __init__(self, test, enable_intf_counters=False):
+        self._test = test
+        self.enable_intf_counters = enable_intf_counters
+
+    @property
+    def enable_intf_counters(self):
+        return self._enable_intf_counters
+
+    @enable_intf_counters.setter
+    def enable_intf_counters(self, enable):
+        self.vapi.acl_stats_intf_counters_enable(enable=enable)
+
+    def add_vpp_config(self):
+        pass
+
+    def remove_vpp_config(self):
+        pass
+
+    def query_vpp_config(self):
+        pass
+
+    def object_id(self):
+        return ("acl-plugin-%d" % (self._sw_if_index))
+
+
+class AclRule():
+    """ ACL Rule """
+
+    # port ranges
+    PORTS_ALL = -1
+    PORTS_RANGE = 0
+    PORTS_RANGE_2 = 1
+    udp_sport_from = 10
+    udp_sport_to = udp_sport_from + 5
+    udp_dport_from = 20000
+    udp_dport_to = udp_dport_from + 5000
+    tcp_sport_from = 30
+    tcp_sport_to = tcp_sport_from + 5
+    tcp_dport_from = 40000
+    tcp_dport_to = tcp_dport_from + 5000
+
+    udp_sport_from_2 = 90
+    udp_sport_to_2 = udp_sport_from_2 + 5
+    udp_dport_from_2 = 30000
+    udp_dport_to_2 = udp_dport_from_2 + 5000
+    tcp_sport_from_2 = 130
+    tcp_sport_to_2 = tcp_sport_from_2 + 5
+    tcp_dport_from_2 = 20000
+    tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
+    icmp4_type = 8  # echo request
+    icmp4_code = 3
+    icmp6_type = 128  # echo request
+    icmp6_code = 3
+
+    icmp4_type_2 = 8
+    icmp4_code_from_2 = 5
+    icmp4_code_to_2 = 20
+    icmp6_type_2 = 128
+    icmp6_code_from_2 = 8
+    icmp6_code_to_2 = 42
+
+    def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
+                 dst_prefix=IPv4Network('0.0.0.0/0'),
+                 proto=0, ports=PORTS_ALL):
+        self.is_permit = is_permit
+        self.src_prefix = src_prefix
+        self.dst_prefix = dst_prefix
+        self._proto = proto
+        self._ports = ports
+        self.sport_from = 0
+        self.sport_to = 0
+        self.dport_from = 0
+        self.dport_to = 0
+        self.update_ports()
+
+    def __copy__(self):
+        """
+        ports are assigned implicitly based on _proto and _ports values,
+        so we need to set them manually in case they were user defined
+        """
+        new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
+                           self._proto, self._ports)
+        new_rule.sport_from = self.sport_from
+        new_rule.sport_to = self.sport_to
+        new_rule.dport_from = self.dport_from
+        new_rule.dport_to = self.dport_to
+        return new_rule
+
+    def update_ports(self):
+        if self._ports == self.PORTS_ALL:
+            self.sport_from = 0
+            self.dport_from = 0
+            self.sport_to = 65535
+            if self._proto == 1 or self._proto == 58:
+                self.sport_to = 255
+            self.dport_to = self.sport_to
+        elif self._ports == self.PORTS_RANGE:
+            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+                self.sport_from = self.icmp4_type
+                self.sport_to = self.icmp4_type
+                self.dport_from = self.icmp4_code
+                self.dport_to = self.icmp4_code
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+                self.sport_from = self.icmp6_type
+                self.sport_to = self.icmp6_type
+                self.dport_from = self.icmp6_code
+                self.dport_to = self.icmp6_code
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+                self.sport_from = self.tcp_sport_from
+                self.sport_to = self.tcp_sport_to
+                self.dport_from = self.tcp_dport_from
+                self.dport_to = self.tcp_dport_to
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+                self.sport_from = self.udp_sport_from
+                self.sport_to = self.udp_sport_to
+                self.dport_from = self.udp_dport_from
+                self.dport_to = self.udp_dport_to
+        elif self._ports == self.PORTS_RANGE_2:
+            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+                self.sport_from = self.icmp4_type_2
+                self.sport_to = self.icmp4_type_2
+                self.dport_from = self.icmp4_code_from_2
+                self.dport_to = self.icmp4_code_to_2
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+                self.sport_from = self.icmp6_type_2
+                self.sport_to = self.icmp6_type_2
+                self.dport_from = self.icmp6_code_from_2
+                self.dport_to = self.icmp6_code_to_2
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+                self.sport_from = self.tcp_sport_from_2
+                self.sport_to = self.tcp_sport_to_2
+                self.dport_from = self.tcp_dport_from_2
+                self.dport_to = self.tcp_dport_to_2
+            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+                self.sport_from = self.udp_sport_from_2
+                self.sport_to = self.udp_sport_to_2
+                self.dport_from = self.udp_dport_from_2
+                self.dport_to = self.udp_dport_to_2
+        else:
+            self.sport_from = self._ports
+            self.sport_to = self._ports
+            self.dport_from = self._ports
+            self.dport_to = self._ports
+
+    @property
+    def proto(self):
+        return self._proto
+
+    @proto.setter
+    def proto(self, proto):
+        self._proto = proto
+        self.update_ports()
+
+    @property
+    def ports(self):
+        return self._ports
+
+    @ports.setter
+    def ports(self, ports):
+        self._ports = ports
+        self.update_ports()
+
+    def encode(self):
+        return {'is_permit': self.is_permit, 'proto': self.proto,
+                'srcport_or_icmptype_first': self.sport_from,
+                'srcport_or_icmptype_last': self.sport_to,
+                'src_prefix': self.src_prefix,
+                'dstport_or_icmpcode_first': self.dport_from,
+                'dstport_or_icmpcode_last': self.dport_to,
+                'dst_prefix': self.dst_prefix}
+
+
+class VppAcl(VppObject):
+    """ VPP ACL """
+
+    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+        self._test = test
+        self._acl_index = acl_index
+        self.tag = tag
+        self.rules = rules
+
+    @property
+    def acl_index(self):
+        return self._acl_index
+
+    @property
+    def count(self):
+        return len(self.rules)
+
+    def encode_rules(self):
+        rules = []
+        for rule in self.rules:
+            rules.append(rule.encode())
+        return rules
+
+    def add_vpp_config(self, expect_error=False):
+        try:
+            reply = self._test.vapi.acl_add_replace(
+                acl_index=self._acl_index, tag=self.tag, count=self.count,
+                r=self.encode_rules())
+            self._acl_index = reply.acl_index
+            self._test.registry.register(self, self._test.logger)
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+            return self
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+        return None
+
+    def remove_vpp_config(self, expect_error=False):
+        try:
+            self._test.vapi.acl_del(acl_index=self._acl_index)
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+
+    def dump(self):
+        return self._test.vapi.acl_dump(acl_index=self._acl_index)
+
+    def query_vpp_config(self):
+        dump = self.dump()
+        for rule in dump:
+            if rule.acl_index == self._acl_index:
+                return True
+        return False
+
+    def object_id(self):
+        return ("acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppEtypeWhitelist(VppObject):
+    """ VPP Etype Whitelist """
+
+    def __init__(self, test, sw_if_index, whitelist, n_input=0):
+        self._test = test
+        self.whitelist = whitelist
+        self.n_input = n_input
+        self._sw_if_index = sw_if_index
+
+    @property
+    def sw_if_index(self):
+        return self._sw_if_index
+
+    @property
+    def count(self):
+        return len(self.whitelist)
+
+    def add_vpp_config(self):
+        self._test.vapi.acl_interface_set_etype_whitelist(
+            sw_if_index=self._sw_if_index, count=self.count,
+            n_input=self.n_input, whitelist=self.whitelist)
+        self._test.registry.register(self, self._test.logger)
+        return self
+
+    def remove_vpp_config(self):
+        self._test.vapi.acl_interface_set_etype_whitelist(
+            sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
+
+    def query_vpp_config(self):
+        self._test.vapi.acl_interface_etype_whitelist_dump(
+            sw_if_index=self._sw_if_index)
+        return False
+
+    def object_id(self):
+        return ("acl-etype_wl-%d" % (self._sw_if_index))
+
+
+class VppAclInterface(VppObject):
+    """ VPP ACL Interface """
+
+    def __init__(self, test, sw_if_index, acls, n_input=0):
+        self._test = test
+        self._sw_if_index = sw_if_index
+        self.n_input = n_input
+        self.acls = acls
+
+    @property
+    def sw_if_index(self):
+        return self._sw_if_index
+
+    @property
+    def count(self):
+        return len(self.acls)
+
+    def encode_acls(self):
+        acls = []
+        for acl in self.acls:
+            acls.append(acl.acl_index)
+        return acls
+
+    def add_vpp_config(self, expect_error=False):
+        try:
+            reply = self._test.vapi.acl_interface_set_acl_list(
+                sw_if_index=self._sw_if_index, n_input=self.n_input,
+                count=self.count, acls=self.encode_acls())
+            self._test.registry.register(self, self._test.logger)
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+            return self
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+        return None
+
+    def remove_vpp_config(self, expect_error=False):
+        try:
+            reply = self._test.vapi.acl_interface_set_acl_list(
+                sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+
+    def query_vpp_config(self):
+        dump = self._test.vapi.acl_interface_list_dump(
+            sw_if_index=self._sw_if_index)
+        for acl_list in dump:
+            if acl_list.count > 0:
+                return True
+        return False
+
+    def object_id(self):
+        return ("acl-if-list-%d" % (self._sw_if_index))
+
+
+class MacipRule():
+    """ Mac Ip rule """
+
+    def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
+                 src_prefix=IPv4Network('0.0.0.0/0')):
+        self.is_permit = is_permit
+        self.src_mac = src_mac
+        self.src_mac_mask = src_mac_mask
+        self.src_prefix = src_prefix
+
+    def encode(self):
+        return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
+                'src_mac_mask': self.src_mac_mask,
+                'src_prefix': self.src_prefix}
+
+
+class VppMacipAcl(VppObject):
+    """ Vpp Mac Ip ACL """
+
+    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+        self._test = test
+        self._acl_index = acl_index
+        self.tag = tag
+        self.rules = rules
+
+    @property
+    def acl_index(self):
+        return self._acl_index
+
+    @property
+    def count(self):
+        return len(self.rules)
+
+    def encode_rules(self):
+        rules = []
+        for rule in self.rules:
+            rules.append(rule.encode())
+        return rules
+
+    def add_vpp_config(self, expect_error=False):
+        try:
+            reply = self._test.vapi.macip_acl_add_replace(
+                acl_index=self._acl_index, tag=self.tag, count=self.count,
+                r=self.encode_rules())
+            self._acl_index = reply.acl_index
+            self._test.registry.register(self, self._test.logger)
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+            return self
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+        return None
+
+    def remove_vpp_config(self, expect_error=False):
+        try:
+            self._test.vapi.macip_acl_del(acl_index=self._acl_index)
+            if expect_error:
+                self._test.fail("Unexpected api reply")
+        except UnexpectedApiReturnValueError:
+            if not expect_error:
+                self._test.fail("Unexpected api reply")
+
+    def dump(self):
+        return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
+
+    def query_vpp_config(self):
+        dump = self.dump()
+        for rule in dump:
+            if rule.acl_index == self._acl_index:
+                return True
+        return False
+
+    def object_id(self):
+        return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppMacipAclInterface(VppObject):
+    """ VPP Mac Ip ACL Interface """
+
+    def __init__(self, test, sw_if_index, acls):
+        self._test = test
+        self._sw_if_index = sw_if_index
+        self.acls = acls
+
+    @property
+    def sw_if_index(self):
+        return self._sw_if_index
+
+    @property
+    def count(self):
+        return len(self.acls)
+
+    def add_vpp_config(self):
+        for acl in self.acls:
+            self._test.vapi.macip_acl_interface_add_del(
+                is_add=True, sw_if_index=self._sw_if_index,
+                acl_index=acl.acl_index)
+        self._test.registry.register(self, self._test.logger)
+
+    def remove_vpp_config(self):
+        for acl in self.acls:
+            self._test.vapi.macip_acl_interface_add_del(
+                is_add=False, sw_if_index=self._sw_if_index,
+                acl_index=acl.acl_index)
+
+    def dump(self):
+        return self._test.vapi.macip_acl_interface_list_dump(
+            sw_if_index=self._sw_if_index)
+
+    def query_vpp_config(self):
+        dump = self.dump()
+        for acl_list in dump:
+            for acl_index in acl_list.acls:
+                if acl_index != INVALID_INDEX:
+                    return True
+        return False
+
+    def object_id(self):
+        return ("macip-acl-if-list-%d" % (self._sw_if_index))