-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""This module implements keywords to manipulate ACL data structures using
Honeycomb REST API."""
+
+from robot.api import logger
+
from resources.libraries.python.topology import Topology
from resources.libraries.python.HTTPRequest import HTTPCodes
from resources.libraries.python.honeycomb.HoneycombSetup import HoneycombError
:type node: dict
:type path: str
:type data: dict
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
:raises HoneycombError: If the status code in response to PUT is not
- 200 = OK.
+ 200 = OK.
"""
if data:
status_code, resp = HcUtil.\
delete_honeycomb_data(node, "config_classify_table", path)
- if status_code != HTTPCodes.OK:
- raise HoneycombError(
- "The configuration of classify table was not successful. "
- "Status code: {0}.".format(status_code))
+ if status_code not in (HTTPCodes.OK, HTTPCodes.ACCEPTED):
+ if data is None and '"error-tag":"data-missing"' in resp:
+ logger.debug("data does not exist in path.")
+ else:
+ raise HoneycombError(
+ "The configuration of classify table was not successful. "
+ "Status code: {0}.".format(status_code))
return resp
@staticmethod
:param table: Classify table to be added.
:type node: dict
:type table: dict
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
"""
:param node: Honeycomb node.
:type node: dict
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
"""
:param table_name: Name of the classify table to be removed.
:type node: dict
:type table_name: str
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
"""
:param node: Honeycomb node.
:type node: dict
- :return: List of classify tables.
+ :returns: List of classify tables.
:rtype: list
"""
raise HoneycombError(
"Not possible to get operational information about the "
"classify tables. Status code: {0}.".format(status_code))
- try:
- return resp["vpp-classifier"]["classify-table"]
- except (KeyError, TypeError):
- return []
+
+ return resp["vpp-classifier-state"]["classify-table"]
@staticmethod
def get_classify_table_oper_data(node, table_name):
:param table_name: Name of the classify table.
:type node: dict
:type table_name: str
- :return: Operational data about the given classify table.
+ :returns: Operational data about the given classify table.
:rtype: dict
"""
- path = "/classify-table/" + table_name
- status_code, resp = HcUtil.\
- get_honeycomb_data(node, "oper_classify_table", path)
-
- if status_code != HTTPCodes.OK:
- raise HoneycombError(
- "Not possible to get operational information about the "
- "classify tables. Status code: {0}.".format(status_code))
- try:
- return resp["classify-table"][0]
- except (KeyError, TypeError):
- return []
+ tables = ACLKeywords.get_all_classify_tables_oper_data(node)
+ for table in tables:
+ if table["name"] == table_name:
+ return table
+ raise HoneycombError("Table {0} not found in ACL table list.".format(
+ table_name))
@staticmethod
def get_all_classify_tables_cfg_data(node):
:param node: Honeycomb node.
:type node: dict
- :return: List of classify tables.
+ :returns: List of classify tables.
:rtype: list
"""
:type node: dict
:type table_name: str
:type session: dict
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
"""
:type node: dict
:type table_name: str
:type session_match: str
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
"""
:param table_name: Name of the classify table.
:type node: dict
:type table_name: str
- :return: List of classify sessions present in the classify table.
+ :returns: List of classify sessions present in the classify table.
:rtype: list
"""
table_data = ACLKeywords.get_classify_table_oper_data(node, table_name)
- try:
- return table_data["classify-table"][0]["classify-session"]
- except (KeyError, TypeError):
- return []
+
+ return table_data["classify-session"]
@staticmethod
def get_classify_session_oper_data(node, table_name, session_match):
:type node: dict
:type table_name: str
:type session_match: str
- :return: Classify session operational data.
+ :returns: Classify session operational data.
:rtype: dict
+ :raises HoneycombError: If no session the specified match Id is found.
"""
- path = "/classify-table/" + table_name + \
- "/classify-session/" + session_match
- status_code, resp = HcUtil.\
- get_honeycomb_data(node, "oper_classify_table", path)
-
- if status_code != HTTPCodes.OK:
- raise HoneycombError(
- "Not possible to get operational information about the "
- "classify tables. Status code: {0}.".format(status_code))
- try:
- return resp["classify-session"][0]
- except (KeyError, TypeError):
- return {}
+ sessions = ACLKeywords.get_all_classify_sessions_oper_data(
+ node, table_name)
+ for session in sessions:
+ if session["match"] == session_match:
+ return session
+ raise HoneycombError(
+ "Session with match value \"{0}\" not found"
+ " under ACL table {1}.".format(session_match, table_name))
@staticmethod
- def create_ietf_classify_chain(node, list_name, layer, data):
+ def create_acl_plugin_classify_chain(node, list_name, data):
"""Create classify chain using the ietf-acl node.
:param node: Honeycomb node.
:param list_name: Name for the classify list.
- :param layer: Network layer to classify on.
:param data: Dictionary of settings to send to Honeycomb.
:type node: dict
:type list_name: str
- :type layer: string
:type data: dict
-
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
:raises HoneycombError: If the operation fails.
"""
- layer = layer.lower()
- suffix_dict = {"l2": "eth",
- "l3_ip4": "ipv4",
- "l3_ip6": "ipv6",
- "mixed": "mixed"
- }
- if layer == "l4":
- raise NotImplementedError
- try:
- suffix = suffix_dict[layer]
- except KeyError:
- raise ValueError("Unexpected value of layer argument {0}."
- "Valid options are: {1}"
- .format(layer, suffix_dict.keys()))
-
- if layer == "mixed":
- path = "/acl/vpp-acl:{0}-acl/{1}"
- else:
- path = "/acl/ietf-access-control-list:{0}-acl/{1}"
- path = path.format(suffix, list_name)
+ path = "/acl/{0}".format(list_name)
status_code, resp = HcUtil.put_honeycomb_data(
- node, "config_ietf_classify_chain", data, path)
+ node, "config_plugin_acl", data, path)
- if status_code != HTTPCodes.OK:
+ if status_code not in (HTTPCodes.OK, HTTPCodes.ACCEPTED):
raise HoneycombError(
"Could not create classify chain."
"Status code: {0}.".format(status_code))
return resp
@staticmethod
- def set_ietf_interface_acl(node, interface, layer, direction, list_name,
- default_action, mode=None):
+ def set_acl_plugin_interface(node, interface, acl_name, direction):
"""Assign an interface to an ietf-acl classify chain.
:param node: Honeycomb node.
:param interface: Name of an interface on the node.
- :param layer: Network layer to classify packets on.
- Valid options are: L2, L3, L4. Mixed ACL not supported yet.
- :param direction: Classify incoming or outgiong packets.
- Valid options are: ingress, egress
- :param list_name: Name of an ietf-acl classify chain.
- :param default_action: Default classifier action: permit or deny.
- :param mode: When using mixed layers, this specifies operational mode
- of the interface - L2 or L3. If layer is not "mixed", this argument
- will be ignored.
+ :param acl_name: Name of an ACL chain configured through ACL-plugin.
+ :param direction: Classify incoming or outgoing packets.
+ Valid options are: ingress, egress
:type node: dict
:type interface: str or int
- :type layer: str
+ :type acl_name: str
:type direction: str
- :type list_name: str
- :type default_action: str
- :type mode: str
-
- :return: Content of response.
+ :returns: Content of response.
:rtype: bytearray
+ :raises ValueError: If the direction argument is incorrect.
:raises HoneycombError: If the operation fails.
"""
- layer = layer.lower()
- if mode is not None:
- mode = mode.lower()
interface = Topology.convert_interface_reference(
node, interface, "name")
"Valid options are: ingress, egress."
.format(direction))
- path = "/interface/{0}/ietf-acl/{1}/access-lists".format(
+ path = "/attachment-points/interface/{0}/{1}/acl-sets/".format(
interface, direction)
- types = {
- "ietf": "ietf-access-control-list:{0}-acl",
- "vpp": "vpp-acl:{0}-acl"}
- layers = {
- "l2": {"mode": "l2", "acl_type": types['ietf'].format("eth")},
- "l3_ip4": {"mode": "l3", "acl_type": types['ietf'].format("ipv4")},
- "l3_ip6": {"mode": "l3", "acl_type": types['ietf'].format("ipv6")},
- "mixed": {"mode": mode, "acl_type": types['vpp'].format("mixed")}
- }
-
- if layer == "L4":
- raise NotImplementedError
- else:
- try:
- data = {
- "access-lists": {
- "acl": [
- {
- "type": layers[layer]['acl_type'],
- "name": list_name
- }
- ],
- "default-action": default_action,
- "mode": layers[layer]['mode']
- }
+ data = {
+ "acl-sets": {
+ "acl-set": {
+ "name": acl_name
}
- except KeyError:
- raise ValueError("Unknown network layer {0}. "
- "Valid options are: {1}".format(
- layer, layers.keys()))
+ }
+ }
status_code, resp = HcUtil.put_honeycomb_data(
- node, "config_vpp_interfaces", data, path)
+ node, "config_plugin_acl", data, path)
- if status_code != HTTPCodes.OK:
+ if status_code not in (HTTPCodes.OK, HTTPCodes.ACCEPTED):
raise HoneycombError(
"Could not configure ACL on interface. "
"Status code: {0}.".format(status_code))
return resp
@staticmethod
- def delete_ietf_interface_acls(node, interface):
- """Remove all ietf-acl assignments from an interface.
+ def delete_interface_plugin_acls(node, interface):
+ """Remove all plugin-acl assignments from an interface.
:param node: Honeycomb node.
:param interface: Name of an interface on the node.
:type node: dict
- :type interface: str or int"""
+ :type interface: str or int
+ """
interface = Topology.convert_interface_reference(
node, interface, "name")
interface = interface.replace("/", "%2F")
- path = "/interface/{0}/ietf-acl/".format(interface)
+ path = "/attachment-points/interface/{0}/".format(interface)
status_code, _ = HcUtil.delete_honeycomb_data(
- node, "config_vpp_interfaces", path)
+ node, "config_plugin_acl", path)
if status_code != HTTPCodes.OK:
raise HoneycombError(
"Status code: {0}.".format(status_code))
@staticmethod
- def delete_ietf_classify_chains(node):
- """Remove all classify chains from the ietf-acl node.
+ def delete_acl_plugin_classify_chains(node):
+ """Remove all plugin-ACL classify chains.
:param node: Honeycomb node.
:type node: dict
"""
status_code, _ = HcUtil.delete_honeycomb_data(
- node, "config_ietf_classify_chain")
+ node, "config_plugin_acl")
if status_code != HTTPCodes.OK:
raise HoneycombError(
- "Could not remove ietf-acl chain. "
+ "Could not remove plugin-acl chain. "
"Status code: {0}.".format(status_code))