tests: move test source to vpp/test
[vpp.git] / src / plugins / acl / test / vpp_acl.py
diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py
deleted file mode 100644 (file)
index 2d2f7ca..0000000
+++ /dev/null
@@ -1,476 +0,0 @@
-from ipaddress import IPv4Network
-
-from vpp_object import VppObject
-from vpp_papi import VppEnum
-from vpp_ip import INVALID_INDEX
-from vpp_papi_provider import UnexpectedApiReturnValueError
-
-
-class VppAclPlugin(VppObject):
-
-    def __init__(self, test, enable_intf_counters=False):
-        self._test = test
-        self.enable_intf_counters = enable_intf_counters
-
-    @property
-    def enable_intf_counters(self):
-        return self._enable_intf_counters
-
-    @enable_intf_counters.setter
-    def enable_intf_counters(self, enable):
-        self.vapi.acl_stats_intf_counters_enable(enable=enable)
-
-    def add_vpp_config(self):
-        pass
-
-    def remove_vpp_config(self):
-        pass
-
-    def query_vpp_config(self):
-        pass
-
-    def object_id(self):
-        return ("acl-plugin-%d" % (self._sw_if_index))
-
-
-class AclRule():
-    """ ACL Rule """
-
-    # port ranges
-    PORTS_ALL = -1
-    PORTS_RANGE = 0
-    PORTS_RANGE_2 = 1
-    udp_sport_from = 10
-    udp_sport_to = udp_sport_from + 5
-    udp_dport_from = 20000
-    udp_dport_to = udp_dport_from + 5000
-    tcp_sport_from = 30
-    tcp_sport_to = tcp_sport_from + 5
-    tcp_dport_from = 40000
-    tcp_dport_to = tcp_dport_from + 5000
-
-    udp_sport_from_2 = 90
-    udp_sport_to_2 = udp_sport_from_2 + 5
-    udp_dport_from_2 = 30000
-    udp_dport_to_2 = udp_dport_from_2 + 5000
-    tcp_sport_from_2 = 130
-    tcp_sport_to_2 = tcp_sport_from_2 + 5
-    tcp_dport_from_2 = 20000
-    tcp_dport_to_2 = tcp_dport_from_2 + 5000
-
-    icmp4_type = 8  # echo request
-    icmp4_code = 3
-    icmp6_type = 128  # echo request
-    icmp6_code = 3
-
-    icmp4_type_2 = 8
-    icmp4_code_from_2 = 5
-    icmp4_code_to_2 = 20
-    icmp6_type_2 = 128
-    icmp6_code_from_2 = 8
-    icmp6_code_to_2 = 42
-
-    def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
-                 dst_prefix=IPv4Network('0.0.0.0/0'),
-                 proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
-                 dport_from=None, dport_to=None):
-        self.is_permit = is_permit
-        self.src_prefix = src_prefix
-        self.dst_prefix = dst_prefix
-        self._proto = proto
-        self._ports = ports
-        # assign ports by range
-        self.update_ports()
-        # assign specified ports
-        if sport_from:
-            self.sport_from = sport_from
-        if sport_to:
-            self.sport_to = sport_to
-        if dport_from:
-            self.dport_from = dport_from
-        if dport_to:
-            self.dport_to = dport_to
-
-    def __copy__(self):
-        new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
-                           self._proto, self._ports, self.sport_from,
-                           self.sport_to, self.dport_from, self.dport_to)
-        return new_rule
-
-    def update_ports(self):
-        if self._ports == self.PORTS_ALL:
-            self.sport_from = 0
-            self.dport_from = 0
-            self.sport_to = 65535
-            if self._proto == 1 or self._proto == 58:
-                self.sport_to = 255
-            self.dport_to = self.sport_to
-        elif self._ports == self.PORTS_RANGE:
-            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
-                self.sport_from = self.icmp4_type
-                self.sport_to = self.icmp4_type
-                self.dport_from = self.icmp4_code
-                self.dport_to = self.icmp4_code
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
-                self.sport_from = self.icmp6_type
-                self.sport_to = self.icmp6_type
-                self.dport_from = self.icmp6_code
-                self.dport_to = self.icmp6_code
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
-                self.sport_from = self.tcp_sport_from
-                self.sport_to = self.tcp_sport_to
-                self.dport_from = self.tcp_dport_from
-                self.dport_to = self.tcp_dport_to
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
-                self.sport_from = self.udp_sport_from
-                self.sport_to = self.udp_sport_to
-                self.dport_from = self.udp_dport_from
-                self.dport_to = self.udp_dport_to
-        elif self._ports == self.PORTS_RANGE_2:
-            if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
-                self.sport_from = self.icmp4_type_2
-                self.sport_to = self.icmp4_type_2
-                self.dport_from = self.icmp4_code_from_2
-                self.dport_to = self.icmp4_code_to_2
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
-                self.sport_from = self.icmp6_type_2
-                self.sport_to = self.icmp6_type_2
-                self.dport_from = self.icmp6_code_from_2
-                self.dport_to = self.icmp6_code_to_2
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
-                self.sport_from = self.tcp_sport_from_2
-                self.sport_to = self.tcp_sport_to_2
-                self.dport_from = self.tcp_dport_from_2
-                self.dport_to = self.tcp_dport_to_2
-            elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
-                self.sport_from = self.udp_sport_from_2
-                self.sport_to = self.udp_sport_to_2
-                self.dport_from = self.udp_dport_from_2
-                self.dport_to = self.udp_dport_to_2
-        else:
-            self.sport_from = self._ports
-            self.sport_to = self._ports
-            self.dport_from = self._ports
-            self.dport_to = self._ports
-
-    @property
-    def proto(self):
-        return self._proto
-
-    @proto.setter
-    def proto(self, proto):
-        self._proto = proto
-        self.update_ports()
-
-    @property
-    def ports(self):
-        return self._ports
-
-    @ports.setter
-    def ports(self, ports):
-        self._ports = ports
-        self.update_ports()
-
-    def encode(self):
-        return {'is_permit': self.is_permit, 'proto': self.proto,
-                'srcport_or_icmptype_first': self.sport_from,
-                'srcport_or_icmptype_last': self.sport_to,
-                'src_prefix': self.src_prefix,
-                'dstport_or_icmpcode_first': self.dport_from,
-                'dstport_or_icmpcode_last': self.dport_to,
-                'dst_prefix': self.dst_prefix}
-
-
-class VppAcl(VppObject):
-    """ VPP ACL """
-
-    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
-        self._test = test
-        self._acl_index = acl_index
-        self.tag = tag
-        self._rules = rules
-
-    @property
-    def rules(self):
-        return self._rules
-
-    @property
-    def acl_index(self):
-        return self._acl_index
-
-    @property
-    def count(self):
-        return len(self._rules)
-
-    def encode_rules(self):
-        rules = []
-        for rule in self._rules:
-            rules.append(rule.encode())
-        return rules
-
-    def add_vpp_config(self, expect_error=False):
-        try:
-            reply = self._test.vapi.acl_add_replace(
-                acl_index=self._acl_index, tag=self.tag, count=self.count,
-                r=self.encode_rules())
-            self._acl_index = reply.acl_index
-            self._test.registry.register(self, self._test.logger)
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-            return self
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-        return None
-
-    def modify_vpp_config(self, rules):
-        self._rules = rules
-        self.add_vpp_config()
-
-    def remove_vpp_config(self, expect_error=False):
-        try:
-            self._test.vapi.acl_del(acl_index=self._acl_index)
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-
-    def dump(self):
-        return self._test.vapi.acl_dump(acl_index=self._acl_index)
-
-    def query_vpp_config(self):
-        dump = self.dump()
-        for rule in dump:
-            if rule.acl_index == self._acl_index:
-                return True
-        return False
-
-    def object_id(self):
-        return ("acl-%s-%d" % (self.tag, self._acl_index))
-
-
-class VppEtypeWhitelist(VppObject):
-    """ VPP Etype Whitelist """
-
-    def __init__(self, test, sw_if_index, whitelist, n_input=0):
-        self._test = test
-        self.whitelist = whitelist
-        self.n_input = n_input
-        self._sw_if_index = sw_if_index
-
-    @property
-    def sw_if_index(self):
-        return self._sw_if_index
-
-    @property
-    def count(self):
-        return len(self.whitelist)
-
-    def add_vpp_config(self):
-        self._test.vapi.acl_interface_set_etype_whitelist(
-            sw_if_index=self._sw_if_index, count=self.count,
-            n_input=self.n_input, whitelist=self.whitelist)
-        self._test.registry.register(self, self._test.logger)
-        return self
-
-    def remove_vpp_config(self):
-        self._test.vapi.acl_interface_set_etype_whitelist(
-            sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
-
-    def query_vpp_config(self):
-        self._test.vapi.acl_interface_etype_whitelist_dump(
-            sw_if_index=self._sw_if_index)
-        return False
-
-    def object_id(self):
-        return ("acl-etype_wl-%d" % (self._sw_if_index))
-
-
-class VppAclInterface(VppObject):
-    """ VPP ACL Interface """
-
-    def __init__(self, test, sw_if_index, acls, n_input=0):
-        self._test = test
-        self._sw_if_index = sw_if_index
-        self.n_input = n_input
-        self.acls = acls
-
-    @property
-    def sw_if_index(self):
-        return self._sw_if_index
-
-    @property
-    def count(self):
-        return len(self.acls)
-
-    def encode_acls(self):
-        acls = []
-        for acl in self.acls:
-            acls.append(acl.acl_index)
-        return acls
-
-    def add_vpp_config(self, expect_error=False):
-        try:
-            reply = self._test.vapi.acl_interface_set_acl_list(
-                sw_if_index=self._sw_if_index, n_input=self.n_input,
-                count=self.count, acls=self.encode_acls())
-            self._test.registry.register(self, self._test.logger)
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-            return self
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-        return None
-
-    def remove_vpp_config(self, expect_error=False):
-        try:
-            reply = self._test.vapi.acl_interface_set_acl_list(
-                sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-
-    def query_vpp_config(self):
-        dump = self._test.vapi.acl_interface_list_dump(
-            sw_if_index=self._sw_if_index)
-        for acl_list in dump:
-            if acl_list.count > 0:
-                return True
-        return False
-
-    def object_id(self):
-        return ("acl-if-list-%d" % (self._sw_if_index))
-
-
-class MacipRule():
-    """ Mac Ip rule """
-
-    def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
-                 src_prefix=IPv4Network('0.0.0.0/0')):
-        self.is_permit = is_permit
-        self.src_mac = src_mac
-        self.src_mac_mask = src_mac_mask
-        self.src_prefix = src_prefix
-
-    def encode(self):
-        return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
-                'src_mac_mask': self.src_mac_mask,
-                'src_prefix': self.src_prefix}
-
-
-class VppMacipAcl(VppObject):
-    """ Vpp Mac Ip ACL """
-
-    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
-        self._test = test
-        self._acl_index = acl_index
-        self.tag = tag
-        self._rules = rules
-
-    @property
-    def acl_index(self):
-        return self._acl_index
-
-    @property
-    def rules(self):
-        return self._rules
-
-    @property
-    def count(self):
-        return len(self._rules)
-
-    def encode_rules(self):
-        rules = []
-        for rule in self._rules:
-            rules.append(rule.encode())
-        return rules
-
-    def add_vpp_config(self, expect_error=False):
-        try:
-            reply = self._test.vapi.macip_acl_add_replace(
-                acl_index=self._acl_index, tag=self.tag, count=self.count,
-                r=self.encode_rules())
-            self._acl_index = reply.acl_index
-            self._test.registry.register(self, self._test.logger)
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-            return self
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-        return None
-
-    def modify_vpp_config(self, rules):
-        self._rules = rules
-        self.add_vpp_config()
-
-    def remove_vpp_config(self, expect_error=False):
-        try:
-            self._test.vapi.macip_acl_del(acl_index=self._acl_index)
-            if expect_error:
-                self._test.fail("Unexpected api reply")
-        except UnexpectedApiReturnValueError:
-            if not expect_error:
-                self._test.fail("Unexpected api reply")
-
-    def dump(self):
-        return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
-
-    def query_vpp_config(self):
-        dump = self.dump()
-        for rule in dump:
-            if rule.acl_index == self._acl_index:
-                return True
-        return False
-
-    def object_id(self):
-        return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
-
-
-class VppMacipAclInterface(VppObject):
-    """ VPP Mac Ip ACL Interface """
-
-    def __init__(self, test, sw_if_index, acls):
-        self._test = test
-        self._sw_if_index = sw_if_index
-        self.acls = acls
-
-    @property
-    def sw_if_index(self):
-        return self._sw_if_index
-
-    @property
-    def count(self):
-        return len(self.acls)
-
-    def add_vpp_config(self):
-        for acl in self.acls:
-            self._test.vapi.macip_acl_interface_add_del(
-                is_add=True, sw_if_index=self._sw_if_index,
-                acl_index=acl.acl_index)
-        self._test.registry.register(self, self._test.logger)
-
-    def remove_vpp_config(self):
-        for acl in self.acls:
-            self._test.vapi.macip_acl_interface_add_del(
-                is_add=False, sw_if_index=self._sw_if_index,
-                acl_index=acl.acl_index)
-
-    def dump(self):
-        return self._test.vapi.macip_acl_interface_list_dump(
-            sw_if_index=self._sw_if_index)
-
-    def query_vpp_config(self):
-        dump = self.dump()
-        for acl_list in dump:
-            for acl_index in acl_list.acls:
-                if acl_index != INVALID_INDEX:
-                    return True
-        return False
-
-    def object_id(self):
-        return ("macip-acl-if-list-%d" % (self._sw_if_index))