+
+ @staticmethod
+ def get_classify_table_data(node, table_index):
+ """Retrieve settings for classify table by ID.
+
+ :param node: VPP node to retrieve classify data from.
+ :param table_index: Index of a specific classify table.
+ :type node: dict
+ :type table_index: int
+ :returns: Classify table settings.
+ :rtype: dict
+ """
+ cmd = 'classify_table_info'
+ err_msg = "Failed to get 'classify_table_info' on host {host}".format(
+ host=node['host'])
+ args = dict(
+ table_id=int(table_index)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
+ return reply
+
+ @staticmethod
+ def get_classify_session_data(node, table_index):
+ """Retrieve settings for all classify sessions in a table.
+
+ :param node: VPP node to retrieve classify data from.
+ :param table_index: Index of a classify table.
+ :type node: dict
+ :type table_index: int
+ :returns: List of classify session settings.
+ :rtype: list or dict
+ """
+ cmd = "classify_session_dump"
+ args = dict(
+ table_id=int(table_index)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details()
+
+ return details
+
+ @staticmethod
+ def show_classify_tables_verbose(node):
+ """Show classify tables verbose.
+
+ :param node: Topology node.
+ :type node: dict
+ :returns: Classify tables verbose data.
+ :rtype: str
+ """
+ return PapiSocketExecutor.run_cli_cmd(
+ node, "show classify tables verbose")
+
+ @staticmethod
+ def vpp_log_plugin_acl_settings(node):
+ """Retrieve configured settings from the ACL plugin and write to robot
+ log.
+
+ :param node: VPP node.
+ :type node: dict
+ """
+ PapiSocketExecutor.dump_and_log(node, ["acl_dump", ])
+
+ @staticmethod
+ def vpp_log_plugin_acl_interface_assignment(node):
+ """Retrieve interface assignment from the ACL plugin and write to robot
+ log.
+
+ :param node: VPP node.
+ :type node: dict
+ """
+ PapiSocketExecutor.dump_and_log(node, ["acl_interface_list_dump", ])
+
+ @staticmethod
+ def set_acl_list_for_interface(node, interface, acl_type, acl_idx=None):
+ """Set the list of input or output ACLs applied to the interface. It
+ unapplies any previously applied ACLs.
+
+ :param node: VPP node to set ACL on.
+ :param interface: Interface name or sw_if_index.
+ :param acl_type: Type of ACL(s) - input or output.
+ :param acl_idx: Index(ies) of ACLs to be applied on the interface.
+ :type node: dict
+ :type interface: str or int
+ :type acl_type: str
+ :type acl_idx: list
+ """
+ if isinstance(interface, basestring):
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ else:
+ sw_if_index = int(interface)
+
+ acls = acl_idx if isinstance(acl_idx, list) else list()
+
+ Classify._acl_interface_set_acl_list(node=node,
+ sw_if_index=sw_if_index,
+ acl_type=acl_type,
+ acls=acls)
+
+ @staticmethod
+ def add_replace_acl_multi_entries(node, acl_idx=None, rules=None, tag=""):
+ """Add a new ACL or replace the existing one. To replace an existing
+ ACL, pass the ID of this ACL.
+
+ :param node: VPP node to set ACL on.
+ :param acl_idx: ID of ACL. (Optional)
+ :param rules: Required rules. (Optional)
+ :param tag: ACL tag (Optional).
+ :type node: dict
+ :type acl_idx: int
+ :type rules: str
+ :type tag: str
+ """
+ reg_ex_src_ip = re.compile(r'(src [0-9a-fA-F.:/\d{1,2}]*)')
+ reg_ex_dst_ip = re.compile(r'(dst [0-9a-fA-F.:/\d{1,2}]*)')
+ reg_ex_sport = re.compile(r'(sport \d{1,5})')
+ reg_ex_dport = re.compile(r'(dport \d{1,5})')
+ reg_ex_proto = re.compile(r'(proto \d{1,5})')
+
+ acl_rules = list()
+ for rule in rules.split(", "):
+ acl_rule = dict()
+ acl_rule["is_permit"] = 1 if "permit" in rule else 0
+ acl_rule["is_ipv6"] = 1 if "ipv6" in rule else 0
+
+ groups = re.search(reg_ex_src_ip, rule)
+ if groups:
+ grp = groups.group(1).split(' ')[1].split('/')
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
+ acl_rule["src_ip_prefix_len"] = int(grp[1])
+
+ groups = re.search(reg_ex_dst_ip, rule)
+ if groups:
+ grp = groups.group(1).split(' ')[1].split('/')
+ acl_rule["dst_ip_addr"] = ip_address(unicode(grp[0])).packed
+ acl_rule["dst_ip_prefix_len"] = int(grp[1])
+
+ groups = re.search(reg_ex_sport, rule)
+ if groups:
+ port = int(groups.group(1).split(' ')[1])
+ acl_rule["srcport_or_icmptype_first"] = port
+ acl_rule["srcport_or_icmptype_last"] = port
+ else:
+ acl_rule["srcport_or_icmptype_first"] = 0
+ acl_rule["srcport_or_icmptype_last"] = 65535
+
+ groups = re.search(reg_ex_dport, rule)
+ if groups:
+ port = int(groups.group(1).split(' ')[1])
+ acl_rule["dstport_or_icmpcode_first"] = port
+ acl_rule["dstport_or_icmpcode_last"] = port
+ else:
+ acl_rule["dstport_or_icmpcode_first"] = 0
+ acl_rule["dstport_or_icmpcode_last"] = 65535
+
+ groups = re.search(reg_ex_proto, rule)
+ if groups:
+ proto = int(groups.group(1).split(' ')[1])
+ acl_rule["proto"] = proto
+ else:
+ acl_rule["proto"] = 0
+
+ acl_rules.append(acl_rule)
+
+ Classify._acl_add_replace(
+ node, acl_idx=acl_idx, rules=acl_rules, tag=tag)
+
+ @staticmethod
+ def add_macip_acl_multi_entries(node, rules=""):
+ """Add a new MACIP ACL.
+
+ :param node: VPP node to set MACIP ACL on.
+ :param rules: Required MACIP rules.
+ :type node: dict
+ :type rules: str
+ """
+ reg_ex_ip = re.compile(r'(ip [0-9a-fA-F.:/\d{1,2}]*)')
+ reg_ex_mac = re.compile(r'(mac \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)')
+ reg_ex_mask = re.compile(r'(mask \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)')
+
+ acl_rules = list()
+ for rule in rules.split(", "):
+ acl_rule = dict()
+ acl_rule["is_permit"] = 1 if "permit" in rule else 0
+ acl_rule["is_ipv6"] = 1 if "ipv6" in rule else 0
+
+ groups = re.search(reg_ex_mac, rule)
+ if groups:
+ mac = groups.group(1).split(' ')[1].replace(':', '')
+ acl_rule["src_mac"] = binascii.unhexlify(unicode(mac))
+
+ groups = re.search(reg_ex_mask, rule)
+ if groups:
+ mask = groups.group(1).split(' ')[1].replace(':', '')
+ acl_rule["src_mac_mask"] = binascii.unhexlify(unicode(mask))
+
+ groups = re.search(reg_ex_ip, rule)
+ if groups:
+ grp = groups.group(1).split(' ')[1].split('/')
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
+ acl_rule["src_ip_prefix_len"] = int(grp[1])
+
+ acl_rules.append(acl_rule)
+
+ Classify._macip_acl_add(node=node, rules=acl_rules)
+
+ @staticmethod
+ def vpp_log_macip_acl_settings(node):
+ """Retrieve configured MACIP settings from the ACL plugin and write to
+ robot log.
+
+ :param node: VPP node.
+ :type node: dict
+ """
+ PapiSocketExecutor.dump_and_log(node, ["macip_acl_dump", ])
+
+ @staticmethod
+ def add_del_macip_acl_interface(node, interface, action, acl_idx):
+ """Apply/un-apply the MACIP ACL to/from a given interface.
+
+ :param node: VPP node to set MACIP ACL on.
+ :param interface: Interface name or sw_if_index.
+ :param action: Required action - add or del.
+ :param acl_idx: ACL index to be applied on the interface.
+ :type node: dict
+ :type interface: str or int
+ :type action: str
+ :type acl_idx: str or int
+ :raises RuntimeError: If unable to set MACIP ACL for the interface.
+ """
+ if isinstance(interface, basestring):
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ else:
+ sw_if_index = interface
+
+ is_add = 1 if action == "add" else 0
+
+ cmd = 'macip_acl_interface_add_del'
+ err_msg = "Failed to get 'macip_acl_interface' on host {host}".format(
+ host=node['host'])
+ args = dict(
+ is_add=is_add,
+ sw_if_index=int(sw_if_index),
+ acl_index=int(acl_idx)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+
+ @staticmethod
+ def vpp_log_macip_acl_interface_assignment(node):
+ """Get interface list and associated MACIP ACLs and write to robot log.
+
+ :param node: VPP node.
+ :type node: dict
+ """
+ cmd = 'macip_acl_interface_get'
+ err_msg = "Failed to get 'macip_acl_interface' on host {host}".format(
+ host=node['host'])
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+ logger.info(reply)