X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Fvpp_acl.py;h=958d6973f267443bb451c411c2b2ee8642b6d578;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=2d2f7ca257bfaff880ad8875a9876ea3e7d1af29;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/vpp_acl.py b/test/vpp_acl.py index 2d2f7ca257b..958d6973f26 100644 --- a/test/vpp_acl.py +++ b/test/vpp_acl.py @@ -7,7 +7,6 @@ from vpp_papi_provider import UnexpectedApiReturnValueError class VppAclPlugin(VppObject): - def __init__(self, test, enable_intf_counters=False): self._test = test self.enable_intf_counters = enable_intf_counters @@ -30,11 +29,11 @@ class VppAclPlugin(VppObject): pass def object_id(self): - return ("acl-plugin-%d" % (self._sw_if_index)) + return "acl-plugin-%d" % (self._sw_if_index) -class AclRule(): - """ ACL Rule """ +class AclRule: + """ACL Rule""" # port ranges PORTS_ALL = -1 @@ -70,10 +69,18 @@ class AclRule(): icmp6_code_from_2 = 8 icmp6_code_to_2 = 42 - def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'), - dst_prefix=IPv4Network('0.0.0.0/0'), - proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None, - dport_from=None, dport_to=None): + def __init__( + self, + is_permit, + src_prefix=IPv4Network("0.0.0.0/0"), + dst_prefix=IPv4Network("0.0.0.0/0"), + proto=0, + ports=PORTS_ALL, + sport_from=None, + sport_to=None, + dport_from=None, + dport_to=None, + ): self.is_permit = is_permit self.src_prefix = src_prefix self.dst_prefix = dst_prefix @@ -92,9 +99,17 @@ class AclRule(): self.dport_to = dport_to def __copy__(self): - new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix, - self._proto, self._ports, self.sport_from, - self.sport_to, self.dport_from, self.dport_to) + new_rule = AclRule( + self.is_permit, + self.src_prefix, + self.dst_prefix, + self._proto, + self._ports, + self.sport_from, + self.sport_to, + self.dport_from, + self.dport_to, + ) return new_rule def update_ports(self): @@ -172,17 +187,20 @@ class AclRule(): self.update_ports() def encode(self): - return {'is_permit': self.is_permit, 'proto': self.proto, - 'srcport_or_icmptype_first': self.sport_from, - 'srcport_or_icmptype_last': self.sport_to, - 'src_prefix': self.src_prefix, - 'dstport_or_icmpcode_first': self.dport_from, - 'dstport_or_icmpcode_last': self.dport_to, - 'dst_prefix': self.dst_prefix} + return { + "is_permit": self.is_permit, + "proto": self.proto, + "srcport_or_icmptype_first": self.sport_from, + "srcport_or_icmptype_last": self.sport_to, + "src_prefix": self.src_prefix, + "dstport_or_icmpcode_first": self.dport_from, + "dstport_or_icmpcode_last": self.dport_to, + "dst_prefix": self.dst_prefix, + } class VppAcl(VppObject): - """ VPP ACL """ + """VPP ACL""" def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): self._test = test @@ -211,8 +229,11 @@ class VppAcl(VppObject): def add_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_add_replace( - acl_index=self._acl_index, tag=self.tag, count=self.count, - r=self.encode_rules()) + acl_index=self._acl_index, + tag=self.tag, + count=self.count, + r=self.encode_rules(), + ) self._acl_index = reply.acl_index self._test.registry.register(self, self._test.logger) if expect_error: @@ -247,11 +268,11 @@ class VppAcl(VppObject): return False def object_id(self): - return ("acl-%s-%d" % (self.tag, self._acl_index)) + return "acl-%s-%d" % (self.tag, self._acl_index) class VppEtypeWhitelist(VppObject): - """ VPP Etype Whitelist """ + """VPP Etype Whitelist""" def __init__(self, test, sw_if_index, whitelist, n_input=0): self._test = test @@ -269,26 +290,31 @@ class VppEtypeWhitelist(VppObject): def add_vpp_config(self): self._test.vapi.acl_interface_set_etype_whitelist( - sw_if_index=self._sw_if_index, count=self.count, - n_input=self.n_input, whitelist=self.whitelist) + sw_if_index=self._sw_if_index, + count=self.count, + n_input=self.n_input, + whitelist=self.whitelist, + ) self._test.registry.register(self, self._test.logger) return self def remove_vpp_config(self): self._test.vapi.acl_interface_set_etype_whitelist( - sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]) + sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[] + ) def query_vpp_config(self): self._test.vapi.acl_interface_etype_whitelist_dump( - sw_if_index=self._sw_if_index) + sw_if_index=self._sw_if_index + ) return False def object_id(self): - return ("acl-etype_wl-%d" % (self._sw_if_index)) + return "acl-etype_wl-%d" % (self._sw_if_index) class VppAclInterface(VppObject): - """ VPP ACL Interface """ + """VPP ACL Interface""" def __init__(self, test, sw_if_index, acls, n_input=0): self._test = test @@ -313,8 +339,11 @@ class VppAclInterface(VppObject): def add_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_interface_set_acl_list( - sw_if_index=self._sw_if_index, n_input=self.n_input, - count=self.count, acls=self.encode_acls()) + sw_if_index=self._sw_if_index, + n_input=self.n_input, + count=self.count, + acls=self.encode_acls(), + ) self._test.registry.register(self, self._test.logger) if expect_error: self._test.fail("Unexpected api reply") @@ -327,7 +356,8 @@ class VppAclInterface(VppObject): def remove_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_interface_set_acl_list( - sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]) + sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[] + ) if expect_error: self._test.fail("Unexpected api reply") except UnexpectedApiReturnValueError: @@ -335,35 +365,38 @@ class VppAclInterface(VppObject): self._test.fail("Unexpected api reply") def query_vpp_config(self): - dump = self._test.vapi.acl_interface_list_dump( - sw_if_index=self._sw_if_index) + dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index) for acl_list in dump: if acl_list.count > 0: return True return False def object_id(self): - return ("acl-if-list-%d" % (self._sw_if_index)) + return "acl-if-list-%d" % (self._sw_if_index) -class MacipRule(): - """ Mac Ip rule """ +class MacipRule: + """Mac Ip rule""" - def __init__(self, is_permit, src_mac=0, src_mac_mask=0, - src_prefix=IPv4Network('0.0.0.0/0')): + def __init__( + self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0") + ): self.is_permit = is_permit self.src_mac = src_mac self.src_mac_mask = src_mac_mask self.src_prefix = src_prefix def encode(self): - return {'is_permit': self.is_permit, 'src_mac': self.src_mac, - 'src_mac_mask': self.src_mac_mask, - 'src_prefix': self.src_prefix} + return { + "is_permit": self.is_permit, + "src_mac": self.src_mac, + "src_mac_mask": self.src_mac_mask, + "src_prefix": self.src_prefix, + } class VppMacipAcl(VppObject): - """ Vpp Mac Ip ACL """ + """Vpp Mac Ip ACL""" def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): self._test = test @@ -392,8 +425,11 @@ class VppMacipAcl(VppObject): def add_vpp_config(self, expect_error=False): try: reply = self._test.vapi.macip_acl_add_replace( - acl_index=self._acl_index, tag=self.tag, count=self.count, - r=self.encode_rules()) + acl_index=self._acl_index, + tag=self.tag, + count=self.count, + r=self.encode_rules(), + ) self._acl_index = reply.acl_index self._test.registry.register(self, self._test.logger) if expect_error: @@ -428,11 +464,11 @@ class VppMacipAcl(VppObject): return False def object_id(self): - return ("macip-acl-%s-%d" % (self.tag, self._acl_index)) + return "macip-acl-%s-%d" % (self.tag, self._acl_index) class VppMacipAclInterface(VppObject): - """ VPP Mac Ip ACL Interface """ + """VPP Mac Ip ACL Interface""" def __init__(self, test, sw_if_index, acls): self._test = test @@ -450,19 +486,20 @@ class VppMacipAclInterface(VppObject): def add_vpp_config(self): for acl in self.acls: self._test.vapi.macip_acl_interface_add_del( - is_add=True, sw_if_index=self._sw_if_index, - acl_index=acl.acl_index) + is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index + ) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): for acl in self.acls: self._test.vapi.macip_acl_interface_add_del( - is_add=False, sw_if_index=self._sw_if_index, - acl_index=acl.acl_index) + is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index + ) def dump(self): return self._test.vapi.macip_acl_interface_list_dump( - sw_if_index=self._sw_if_index) + sw_if_index=self._sw_if_index + ) def query_vpp_config(self): dump = self.dump() @@ -473,4 +510,4 @@ class VppMacipAclInterface(VppObject): return False def object_id(self): - return ("macip-acl-if-list-%d" % (self._sw_if_index)) + return "macip-acl-if-list-%d" % (self._sw_if_index)