-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Classify utilities library."""
+import re
+
+from ipaddress import ip_address
+
from robot.api import logger
-from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
-from resources.libraries.python.topology import Topology
+from resources.libraries.python.Constants import Constants
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.IPUtil import IPUtil
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-class Classify(object):
+class Classify:
"""Classify utilities."""
@staticmethod
- def vpp_creates_classify_table_l3(node, ip_version, direction):
- """Create classify table for IP address filtering.
+ def _build_mac_mask(dst_mac=u"", src_mac=u"", ether_type=u""):
+ """Build MAC ACL mask data in bytes format.
- :param node: VPP node to create classify table.
- :param ip_version: Version of IP protocol.
- :param direction: Direction of traffic - src/dst.
- :type node: dict
- :type ip_version: str
- :type direction: str
- :returns: (table_index, skip_n, match_n)
- table_index: Classify table index.
- skip_n: Number of skip vectors.
- match_n: Number of match vectors.
- :rtype: tuple(int, int, int)
- :raises RuntimeError: If VPP can't create table.
+ :param dst_mac: Source MAC address <0-ffffffffffff>.
+ :param src_mac: Destination MAC address <0-ffffffffffff>.
+ :param ether_type: Ethernet type <0-ffff>.
+ :type dst_mac: str
+ :type src_mac: str
+ :type ether_type: str
+ :returns MAC ACL mask in bytes format.
+ :rtype: bytes
"""
+ return bytes.fromhex(
+ f"{dst_mac.replace(u':', u'')!s:0>12}"
+ f"{src_mac.replace(u':', u'')!s:0>12}"
+ f"{ether_type!s:0>4}"
+ ).rstrip(b'\0')
- output = VatExecutor.cmd_from_template(node, "classify_add_table.vat",
- ip_version=ip_version,
- direction=direction)
-
- if output[0]["retval"] == 0:
- table_index = output[0]["new_table_index"]
- skip_n = output[0]["skip_n_vectors"]
- match_n = output[0]["match_n_vectors"]
- logger.trace('Classify table with table_index {} created on node {}'
- .format(table_index, node['host']))
- else:
- raise RuntimeError('Unable to create classify table on node {}'
- .format(node['host']))
+ @staticmethod
+ def _build_ip_mask(
+ proto=u"", src_ip=u"", dst_ip=u"", src_port=u"", dst_port=u""):
+ """Build IP ACL mask data in bytes format.
+
+ :param proto: Protocol number <0-ff>.
+ :param src_ip: Source ip address <0-ffffffff>.
+ :param dst_ip: Destination ip address <0-ffffffff>.
+ :param src_port: Source port number <0-ffff>.
+ :param str dst_port: Destination port number <0-ffff>.
+ :type proto: str
+ :type src_ip: str
+ :type dst_ip: str
+ :type src_port: str
+ :type dst_port:src
+ :returns: IP mask in bytes format.
+ :rtype: bytes
+ """
+ return bytes.fromhex(
+ f"{proto!s:0>20}{src_ip!s:0>12}{dst_ip!s:0>8}{src_port!s:0>4}"
+ f"{dst_port!s:0>4}"
+ ).rstrip(b'\0')
- return table_index, skip_n, match_n
+ @staticmethod
+ def _build_ip6_mask(
+ next_hdr=u"", src_ip=u"", dst_ip=u"", src_port=u"", dst_port=u""):
+ """Build IPv6 ACL mask data in bytes format.
+
+ :param next_hdr: Next header number <0-ff>.
+ :param src_ip: Source ip address <0-ffffffff>.
+ :param dst_ip: Destination ip address <0-ffffffff>.
+ :param src_port: Source port number <0-ffff>.
+ :param dst_port: Destination port number <0-ffff>.
+ :type next_hdr: str
+ :type src_ip: str
+ :type dst_ip: str
+ :type src_port: str
+ :type dst_port: str
+ :returns: IPv6 ACL mask in bytes format.
+ :rtype: bytes
+ """
+ return bytes.fromhex(
+ f"{next_hdr!s:0>14}{src_ip!s:0>34}{dst_ip!s:0>32}{src_port!s:0>4}"
+ f"{dst_port!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def vpp_creates_classify_table_l2(node, direction):
- """Create classify table for MAC address filtering.
+ def _build_mac_match(dst_mac=u"", src_mac=u"", ether_type=u""):
+ """Build MAC ACL match data in bytes format.
- :param node: VPP node to create classify table.
- :param direction: Direction of traffic - src/dst.
- :type node: dict
- :type direction: str
- :returns: (table_index, skip_n, match_n)
- table_index: Classify table index.
- skip_n: Number of skip vectors.
- match_n: Number of match vectors.
- :rtype: tuple(int, int, int)
- :raises RuntimeError: If VPP can't create table.
+ :param dst_mac: Source MAC address <x:x:x:x:x:x>.
+ :param src_mac: Destination MAC address <x:x:x:x:x:x>.
+ :param ether_type: Ethernet type <0-ffff>.
+ :type dst_mac: str
+ :type src_mac: str
+ :type ether_type: str
+ :returns: MAC ACL match data in bytes format.
+ :rtype: bytes
"""
- output = VatExecutor.cmd_from_template(node,
- "classify_add_table_l2.vat",
- direction=direction)
-
- if output[0]["retval"] == 0:
- table_index = output[0]["new_table_index"]
- skip_n = output[0]["skip_n_vectors"]
- match_n = output[0]["match_n_vectors"]
- logger.trace('Classify table with table_index {} created on node {}'
- .format(table_index, node['host']))
- else:
- raise RuntimeError('Unable to create classify table on node {}'
- .format(node['host']))
+ return bytes.fromhex(
+ f"{dst_mac.replace(u':', u'')!s:0>12}"
+ f"{src_mac.replace(u':', u'')!s:0>12}"
+ f"{ether_type!s:0>4}"
+ ).rstrip(b'\0')
+
+ @staticmethod
+ def _build_ip_match(
+ proto=0, src_ip=4*b"\0", dst_ip=4*b"\0", src_port=0, dst_port=0):
+ """Build IP ACL match data in bytes format.
+
+ :param proto: Protocol number with valid option "x".
+ :param src_ip: Source ip address in packed format.
+ :param dst_ip: Destination ip address in packed format.
+ :param src_port: Source port number "x".
+ :param dst_port: Destination port number "x".
+ :type proto: int
+ :type src_ip: bytes
+ :type dst_ip: bytes
+ :type src_port: int
+ :type dst_port: int
+ :returns: IP ACL match data in byte-string format.
+ :rtype: str
+ """
+ return bytes.fromhex(
+ f"{hex(proto)[2:]!s:0>20}{src_ip.hex()!s:0>12}{dst_ip.hex()!s:0>8}"
+ f"{hex(src_port)[2:]!s:0>4}{hex(dst_port)[2:]!s:0>4}"
+ ).rstrip(b'\0')
- return table_index, skip_n, match_n
+ @staticmethod
+ def _build_ip6_match(
+ next_hdr=0, src_ip=16*b"\0", dst_ip=16*b"\0", src_port=0,
+ dst_port=0):
+ """Build IPv6 ACL match data in byte-string format.
+
+ :param next_hdr: Next header number with valid option "x".
+ :param src_ip: Source ip6 address in packed format.
+ :param dst_ip: Destination ip6 address in packed format.
+ :param src_port: Source port number "x".
+ :param dst_port: Destination port number "x".
+ :type next_hdr: int
+ :type src_ip: bytes
+ :type dst_ip: bytes
+ :type src_port: int
+ :type dst_port: int
+ :returns: IPv6 ACL match data in bytes format.
+ :rtype: bytes
+ """
+ return bytes.fromhex(
+ f"{hex(next_hdr)[2:]!s:0>14}{src_ip.hex()!s:0>34}"
+ f"{dst_ip.hex()!s:0>32}{hex(src_port)[2:]!s:0>4}"
+ f"{hex(dst_port)[2:]!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def vpp_creates_classify_table_hex(node, hex_mask):
- """Create classify table with hex mask.
+ def _classify_add_del_table(
+ node, is_add, mask, match_n_vectors=Constants.BITWISE_NON_ZERO,
+ table_index=Constants.BITWISE_NON_ZERO, nbuckets=2,
+ memory_size=2097152, skip_n_vectors=Constants.BITWISE_NON_ZERO,
+ next_table_index=Constants.BITWISE_NON_ZERO,
+ miss_next_index=Constants.BITWISE_NON_ZERO,
+ current_data_flag=0, current_data_offset=0):
+ """Add or delete a classify table.
- :param node: VPP node to create classify table based on hex mask.
- :param hex_mask: Classify hex mask.
+ :param node: VPP node to create classify table.
+ :param is_add: If True the table is added, if False table is deleted.
+ :param mask: ACL mask in hexstring format.
+ :param match_n_vectors: Number of vectors to match (Default value = ~0).
+ :param table_index: Index of the classify table. (Default value = ~0)
+ :param nbuckets: Number of buckets when adding a table.
+ (Default value = 2)
+ :param memory_size: Memory size when adding a table.
+ (Default value = 2097152)
+ :param skip_n_vectors: Number of skip vectors (Default value = ~0).
+ :param next_table_index: Index of next table. (Default value = ~0)
+ :param miss_next_index: Index of miss table. (Default value = ~0)
+ :param current_data_flag: Option to use current node's packet payload
+ as the starting point from where packets are classified.
+ This option is only valid for L2/L3 input ACL for now.
+ 0: by default, classify data from the buffer's start location
+ 1: classify packets from VPP node's current data pointer.
+ :param current_data_offset: A signed value to shift the start location
+ of the packet to be classified.
+ For example, if input IP ACL node is used, L2 header's first byte
+ can be accessible by configuring current_data_offset to -14
+ if there is no vlan tag.
+ This is valid only if current_data_flag is set to 1.
+ (Default value = 0)
:type node: dict
- :type hex_mask: str
+ :type is_add: bool
+ :type mask: str
+ :type match_n_vectors: int
+ :type table_index: int
+ :type nbuckets: int
+ :type memory_size: int
+ :type skip_n_vectors: int
+ :type next_table_index: int
+ :type miss_next_index: int
+ :type current_data_flag: int
+ :type current_data_offset: int
:returns: (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:rtype: tuple(int, int, int)
- :raises RuntimeError: If VPP can't create table.
"""
- output = VatExecutor.cmd_from_template(node,
- "classify_add_table_hex.vat",
- hex_mask=hex_mask)
-
- if output[0]["retval"] == 0:
- table_index = output[0]["new_table_index"]
- skip_n = output[0]["skip_n_vectors"]
- match_n = output[0]["match_n_vectors"]
- logger.trace('Classify table with table_index {} created on node {}'
- .format(table_index, node['host']))
- else:
- raise RuntimeError('Unable to create classify table on node {}'
- .format(node['host']))
-
- return table_index, skip_n, match_n
+ cmd = u"classify_add_del_table"
+ args = dict(
+ is_add=is_add,
+ del_chain=False,
+ table_index=table_index,
+ nbuckets=nbuckets,
+ memory_size=memory_size,
+ skip_n_vectors=skip_n_vectors,
+ match_n_vectors=match_n_vectors,
+ next_table_index=next_table_index,
+ miss_next_index=miss_next_index,
+ current_data_flag=current_data_flag,
+ current_data_offset=current_data_offset,
+ mask_len=len(mask),
+ mask=mask
+ )
+ err_msg = f"Failed to create a classify table on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
+
+ return int(reply[u"new_table_index"]), int(reply[u"skip_n_vectors"]),\
+ int(reply[u"match_n_vectors"])
@staticmethod
- def vpp_configures_classify_session_l3(node, acl_method, table_index,
- skip_n, match_n, ip_version,
- direction, address):
- """Configuration of classify session for IP address filtering.
-
- :param node: VPP node to setup classify session.
- :param acl_method: ACL method - deny/permit.
- :param table_index: Classify table index.
- :param skip_n: Number of skip vectors based on mask.
- :param match_n: Number of match vectors based on mask.
- :param ip_version: Version of IP protocol.
- :param direction: Direction of traffic - src/dst.
- :param address: IPv4 or IPv6 address.
+ def _classify_add_del_session(
+ node, is_add, table_index, match,
+ opaque_index=Constants.BITWISE_NON_ZERO,
+ hit_next_index=Constants.BITWISE_NON_ZERO, advance=0,
+ action=0, metadata=0):
+ """Add or delete a classify session.
+
+ :param node: VPP node to create classify session.
+ :param is_add: If True the session is added, if False the session
+ is deleted.
+ :param table_index: Index of the table to add/del the session.
+ :param match: For add, match value for session, required, needs to
+ include bytes in front with length of skip_n_vectors of target table
+ times sizeof (u32x4) (values of those bytes will be ignored).
+ :param opaque_index: For add, opaque_index of new session.
+ (Default value = ~0)
+ :param hit_next_index: For add, hit_next_index of new session.
+ (Default value = ~0)
+ :param advance: For add, advance value for session. (Default value = 0)
+ :param action: 0: No action (by default) metadata is not used.
+ 1: Classified IP packets will be looked up from the specified ipv4
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 2: Classified IP packets will be looked up from the specified ipv6
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
:type node: dict
- :type acl_method: str
+ :type is_add: bool
:type table_index: int
- :type skip_n: int
- :type match_n: int
- :type ip_version: str
- :type direction: str
- :type address: str
+ :type match: bytes
+ :type opaque_index: int
+ :type hit_next_index: int
+ :type advance: int
+ :type action: int
+ :type metadata: int
"""
- with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template("classify_add_session.vat",
- acl_method=acl_method,
- table_index=table_index,
- skip_n=skip_n,
- match_n=match_n,
- ip_version=ip_version,
- direction=direction,
- address=address)
+ cmd = u"classify_add_del_session"
+ args = dict(
+ is_add=is_add,
+ table_index=table_index,
+ hit_next_index=hit_next_index,
+ opaque_index=opaque_index,
+ advance=advance,
+ action=action,
+ metadata=metadata,
+ match_len=len(match),
+ match=match
+ )
+ err_msg = f"Failed to create a classify session on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_configures_classify_session_l2(node, acl_method, table_index,
- skip_n, match_n, direction, address):
- """Configuration of classify session for MAC address filtering.
+ def _macip_acl_add(node, rules, tag=""):
+ """Add MACIP ACL.
- :param node: VPP node to setup classify session.
- :param acl_method: ACL method - deny/permit.
- :param table_index: Classify table index.
- :param skip_n: Number of skip vectors based on mask.
- :param match_n: Number of match vectors based on mask.
- :param direction: Direction of traffic - src/dst.
- :param address: IPv4 or IPv6 address.
+ :param node: VPP node to add MACIP ACL.
+ :param rules: List of rules for given ACL.
+ :param tag: ACL tag.
:type node: dict
- :type acl_method: str
- :type table_index: int
- :type skip_n: int
- :type match_n: int
- :type direction: str
- :type address: str
+ :type rules: list
+ :type tag: str
"""
- with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "classify_add_session_l2.vat",
- acl_method=acl_method,
- table_index=table_index,
- skip_n=skip_n,
- match_n=match_n,
- direction=direction,
- address=address)
+ cmd = u"macip_acl_add"
+ args = dict(
+ r=rules,
+ count=len(rules),
+ tag=tag
+ )
+
+ err_msg = f"Failed to add MACIP ACL on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_configures_classify_session_hex(node, acl_method, table_index,
- skip_n, match_n, hex_value):
- """Configuration of classify session with hex value.
+ def _acl_interface_set_acl_list(node, sw_if_index, acl_type, acls):
+ """Set ACL list for interface.
- :param node: VPP node to setup classify session.
- :param acl_method: ACL method - deny/permit.
- :param table_index: Classify table index.
- :param skip_n: Number of skip vectors based on mask.
- :param match_n: Number of match vectors based on mask.
- :param hex_value: Classify hex value.
+ :param node: VPP node to set ACL list for interface.
+ :param sw_if_index: sw_if_index of the used interface.
+ :param acl_type: Type of ACL(s) - input or output.
+ :param acls: List of ACLs.
:type node: dict
- :type acl_method: str
- :type table_index: int
- :type skip_n: int
- :type match_n: int
- :type hex_value: str
+ :type sw_if_index: int
+ :type acl_type: str
+ :type acls: list
"""
- with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "classify_add_session_hex.vat",
- acl_method=acl_method,
- table_index=table_index,
- skip_n=skip_n,
- match_n=match_n,
- hex_value=hex_value)
+ cmd = u"acl_interface_set_acl_list"
+ args = dict(
+ sw_if_index=sw_if_index,
+ acls=acls,
+ n_input=len(acls) if acl_type == u"input" else 0,
+ count=len(acls)
+ )
+
+ err_msg = f"Failed to set acl list for interface {sw_if_index} " \
+ f"on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_configures_classify_session_generic(node, session_type, table_index,
- skip_n, match_n, match,
- match2=''):
- """Configuration of classify session.
+ def _acl_add_replace(node, acl_idx, rules, tag=""):
+ """ Add/replace ACLs.
- :param node: VPP node to setup classify session.
- :param session_type: Session type - hit-next, l2-hit-next, acl-hit-next
- or policer-hit-next, and their respective parameters.
- :param table_index: Classify table index.
- :param skip_n: Number of skip vectors based on mask.
- :param match_n: Number of match vectors based on mask.
- :param match: Match value - l2, l3, l4 or hex, and their
- respective parameters.
- :param match2: Additional match values, to avoid using overly long
- variables in RobotFramework.
+ :param node: VPP node to add MACIP ACL.
+ :param acl_idx: ACL index.
+ :param rules: List of rules for given ACL.
+ :param tag: ACL tag.
:type node: dict
- :type session_type: str
- :type table_index: int
- :type skip_n: int
- :type match_n: int
- :type match: str
- :type match2: str
+ :type acl_idx: int
+ :type rules: list
+ :type tag: str
"""
+ cmd = u"acl_add_replace"
+ args = dict(
+ tag=tag,
+ acl_index=4294967295 if acl_idx is None else acl_idx,
+ count=len(rules),
+ r=rules
+ )
- match = ' '.join((match, match2))
+ err_msg = f"Failed to add/replace ACLs on host {node[u'host']}"
- with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "classify_add_session_generic.vat",
- type=session_type,
- table_index=table_index,
- skip_n=skip_n,
- match_n=match_n,
- match=match,
- )
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def compute_classify_hex_mask(ip_version, protocol, direction):
- """Compute classify hex mask for TCP or UDP packet matching.
+ def vpp_creates_classify_table_l3(node, ip_version, direction, netmask):
+ """Create classify table for IP address filtering.
+ :param node: VPP node to create classify table.
:param ip_version: Version of IP protocol.
- :param protocol: Type of protocol.
- :param direction: Traffic direction.
+ :param direction: Direction of traffic - src/dst.
+ :param netmask: IPv4 or Ipv6 (depending on the parameter 'ip_version')
+ netmask (decimal, e.g. 255.255.255.255).
+ :type node: dict
:type ip_version: str
- :type protocol: str
:type direction: str
- :returns: Classify hex mask.
- :rtype: str
- :raises ValueError: If protocol is not TCP or UDP.
- :raises ValueError: If direction is not source or destination or
- source + destination.
- """
- if protocol == 'TCP' or protocol == 'UDP':
- base_mask = Classify._compute_base_mask(ip_version)
-
- if direction == 'source':
- return base_mask + 'FFFF0000'
- elif direction == 'destination':
- return base_mask + '0000FFFF'
- elif direction == 'source + destination':
- return base_mask + 'FFFFFFFF'
- else:
- raise ValueError("Invalid direction!")
- else:
- raise ValueError("Invalid protocol!")
-
- @staticmethod
- def compute_classify_hex_value(hex_mask, source_port, destination_port):
- """Compute classify hex value for TCP or UDP packet matching.
-
- :param hex_mask: Classify hex mask.
- :param source_port: Source TCP/UDP port.
- :param destination_port: Destination TCP/UDP port.
- :type hex_mask: str
- :type source_port: str
- :type destination_port: str
- :returns: Classify hex value.
- :rtype: str
+ :type netmask: str
+ :returns: (table_index, skip_n, match_n)
+ table_index: Classify table index.
+ skip_n: Number of skip vectors.
+ match_n: Number of match vectors.
+ :rtype: tuple(int, int, int)
+ :raises ValueError: If the parameters 'ip_version' or 'direction' have
+ incorrect values.
"""
- source_port_hex = Classify._port_convert(source_port)
- destination_port_hex = Classify._port_convert(destination_port)
-
- return hex_mask[:-8] + source_port_hex + destination_port_hex
+ mask_f = dict(
+ ip4=Classify._build_ip_mask,
+ ip6=Classify._build_ip6_mask
+ )
- @staticmethod
- def _port_convert(port):
- """Convert port number for classify hex table format.
+ if ip_version in (u"ip4", u"ip6"):
+ netmask = ip_address(netmask).packed
+ else:
+ raise ValueError(f"IP version {ip_version} is not supported.")
- :param port: TCP/UDP port number.
- :type port: str
- :returns: TCP/UDP port number in 4-digit hexadecimal format.
- :rtype: str
- """
- return '{0:04x}'.format(int(port))
+ if direction == u"src":
+ mask = mask_f[ip_version](src_ip=netmask.hex())
+ elif direction == u"dst":
+ mask = mask_f[ip_version](dst_ip=netmask.hex())
+ else:
+ raise ValueError(f"Direction {direction} is not supported.")
+
+ # Add l2 ethernet header to mask
+ mask = 14 * b'\0' + mask
+
+ # Get index of the first significant mask octet
+ i = len(mask) - len(mask.lstrip(b'\0'))
+
+ # Compute skip_n parameter
+ skip_n = i // 16
+ # Remove octets to be skipped from the mask
+ mask = mask[skip_n*16:]
+ # Pad mask to an even multiple of the vector size
+ mask = mask + (16 - len(mask) % 16 if len(mask) % 16 else 0) * b'\0'
+ # Compute match_n parameter
+ match_n = len(mask) // 16
+
+ return Classify._classify_add_del_table(
+ node,
+ is_add=True,
+ mask=mask,
+ match_n_vectors=match_n,
+ skip_n_vectors=skip_n
+ )
@staticmethod
- def _compute_base_mask(ip_version):
- """Compute base classify hex mask based on IP version.
+ def vpp_configures_classify_session_l3(
+ node, acl_method, table_index, skip_n, match_n, ip_version,
+ direction, address, hit_next_index=None,
+ opaque_index=Constants.BITWISE_NON_ZERO, action=0, metadata=0):
+ """Configuration of classify session for IP address filtering.
+ :param node: VPP node to setup classify session.
+ :param acl_method: ACL method - deny/permit.
+ :param table_index: Classify table index.
+ :param skip_n: Number of skip vectors.
+ :param match_n: Number of vectors to match.
:param ip_version: Version of IP protocol.
+ :param direction: Direction of traffic - src/dst.
+ :param address: IPv4 or IPv6 address.
+ :param hit_next_index: hit_next_index of new session.
+ (Default value = None)
+ :param opaque_index: opaque_index of new session. (Default value = ~0)
+ :param action: 0: No action (by default) metadata is not used.
+ 1: Classified IP packets will be looked up from the specified ipv4
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 2: Classified IP packets will be looked up from the specified ipv6
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
+ :type node: dict
+ :type acl_method: str
+ :type table_index: int
+ :type skip_n: int
+ :type match_n: int
:type ip_version: str
- :returns: Base hex mask.
- :rtype: str
+ :type direction: str
+ :type address: str
+ :type hit_next_index: int
+ :type opaque_index: int
+ :type action: int
+ :type metadata: int
+ :raises ValueError: If the parameter 'direction' has incorrect value.
"""
- if ip_version == 'ip4':
- return 68 * '0'
- # base value of classify hex table for IPv4 TCP/UDP ports
- elif ip_version == 'ip6':
- return 108 * '0'
- # base value of classify hex table for IPv6 TCP/UDP ports
+ match_f = dict(
+ ip4=Classify._build_ip_match,
+ ip6=Classify._build_ip6_match
+ )
+ acl_hit_next_index = dict(
+ permit=Constants.BITWISE_NON_ZERO,
+ deny=0
+ )
+
+ if ip_version in (u"ip4", u"ip6"):
+ address = ip_address(address).packed
else:
- raise ValueError("Invalid IP version!")
+ raise ValueError(f"IP version {ip_version} is not supported.")
+
+ if direction == u"src":
+ match = match_f[ip_version](src_ip=address)
+ elif direction == u"dst":
+ match = match_f[ip_version](dst_ip=address)
+ else:
+ raise ValueError(f"Direction {direction} is not supported.")
+
+ # Prepend match with l2 ethernet header part
+ match = 14 * b'\0' + match
+
+ # Pad match to match skip_n_vector + match_n_vector size
+ match = match + ((match_n + skip_n) * 16 - len(match)
+ if len(match) < (match_n + skip_n) * 16
+ else 0) * b'\0'
+
+ Classify._classify_add_del_session(
+ node,
+ is_add=True,
+ table_index=table_index,
+ hit_next_index=hit_next_index if hit_next_index is not None
+ else acl_hit_next_index[acl_method],
+ opaque_index=opaque_index,
+ match=match,
+ action=action,
+ metadata=metadata
+ )
@staticmethod
def get_classify_table_data(node, table_index):
:returns: Classify table settings.
:rtype: dict
"""
- with VatTerminal(node) as vat:
- data = vat.vat_terminal_exec_cmd_from_template(
- "classify_table_info.vat",
- table_id=table_index
- )
- return data[0]
+ cmd = u"classify_table_info"
+ err_msg = f"Failed to get 'classify_table_info' on host {node[u'host']}"
+ args = dict(
+ table_id=int(table_index)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
+ return reply
@staticmethod
- def get_classify_session_data(node, table_index, session_index=None):
- """Retrieve settings for all classify sessions in a table,
- or for a specific classify session.
+ def get_classify_session_data(node, table_index):
+ """Retrieve settings for all classify sessions in a table.
:param node: VPP node to retrieve classify data from.
:param table_index: Index of a classify table.
- :param session_index: Index of a specific classify session. (Optional)
:type node: dict
:type table_index: int
- :type session_index: int
- :returns: List of classify session settings, or a dictionary of settings
- for a specific classify session.
+ :returns: List of classify session settings.
:rtype: list or dict
"""
- with VatTerminal(node) as vat:
- data = vat.vat_terminal_exec_cmd_from_template(
- "classify_session_dump.vat",
- table_id=table_index
- )
- if session_index is not None:
- return data[0][session_index]
- else:
- return data[0]
+ cmd = u"classify_session_dump"
+ args = dict(
+ table_id=int(table_index)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details()
+
+ return details
+
+ @staticmethod
+ def show_classify_tables_verbose(node):
+ """Show classify tables verbose.
+
+ :param node: Topology node.
+ :type node: dict
+ :returns: Classify tables verbose data.
+ :rtype: str
+ """
+ return PapiSocketExecutor.run_cli_cmd(
+ node, u"show classify tables verbose"
+ )
@staticmethod
def vpp_log_plugin_acl_settings(node):
- """Retrieve configured settings from the ACL plugin
- and write to robot log.
+ """Retrieve configured settings from the ACL plugin and write to robot
+ log.
:param node: VPP node.
:type node: dict
"""
- try:
- VatExecutor.cmd_from_template(
- node, "acl_plugin/acl_dump.vat")
- except (ValueError, RuntimeError):
- # Fails to parse JSON data in response, but it is still logged
- pass
+ PapiSocketExecutor.dump_and_log(node, [u"acl_dump", ])
@staticmethod
def vpp_log_plugin_acl_interface_assignment(node):
- """Retrieve interface assignment from the ACL plugin
- and write to robot log.
+ """Retrieve interface assignment from the ACL plugin and write to robot
+ log.
:param node: VPP node.
:type node: dict
"""
- try:
- VatExecutor.cmd_from_template(
- node, "acl_plugin/acl_interface_dump.vat", json_out=False)
- except RuntimeError:
- # Fails to parse response, but it is still logged
- pass
+ PapiSocketExecutor.dump_and_log(node, [u"acl_interface_list_dump", ])
@staticmethod
def set_acl_list_for_interface(node, interface, acl_type, acl_idx=None):
:type interface: str or int
:type acl_type: str
:type acl_idx: list
- :raises RuntimeError: If unable to set ACL list for the interface.
"""
- if isinstance(interface, basestring):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = interface
-
- acl_list = acl_type + ' ' + ' '.join(str(idx) for idx in acl_idx) \
- if acl_idx else acl_type
-
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/acl_interface_set_acl_list.vat",
- interface=sw_if_index, acl_list=acl_list)
- except RuntimeError:
- raise RuntimeError("Setting of ACL list for interface {0} failed "
- "on node {1}".format(interface, node['host']))
+ Classify._acl_interface_set_acl_list(
+ node=node,
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
+ acl_type=acl_type,
+ acls=acl_idx if isinstance(acl_idx, list) else list()
+ )
@staticmethod
- def add_replace_acl(node, acl_idx=None, ip_ver="ipv4", action="permit",
- src=None, dst=None, sport=None, dport=None, proto=None,
- tcpflg_val=None, tcpflg_mask=None):
- """Add a new ACL or replace the existing one. To replace an existing
- ACL, pass the ID of this ACL.
-
- :param node: VPP node to set ACL on.
- :param acl_idx: ID of ACL. (Optional)
- :param ip_ver: IP version. (Optional)
- :param action: ACL action. (Optional)
- :param src: Source IP in format IP/plen. (Optional)
- :param dst: Destination IP in format IP/plen. (Optional)
- :param sport: Source port or ICMP4/6 type - range format X-Y allowed.
- (Optional)
- :param dport: Destination port or ICMP4/6 code - range format X-Y
- allowed. (Optional)
- :param proto: L4 protocol (http://www.iana.org/assignments/protocol-
- numbers/protocol-numbers.xhtml). (Optional)
- :param tcpflg_val: TCP flags value. (Optional)
- :param tcpflg_mask: TCP flags mask. (Optional)
- :type node: dict
- :type acl_idx: int
- :type ip_ver: str
- :type action: str
- :type src: str
- :type dst: str
- :type sport: str or int
- :type dport: str or int
- :type proto: int
- :type tcpflg_val: int
- :type tcpflg_mask: int
- :raises RuntimeError: If unable to add or replace ACL.
- """
- acl_idx = '{0}'.format(acl_idx) if acl_idx else ''
-
- src = 'src {0}'.format(src) if src else ''
-
- dst = 'dst {0}'.format(dst) if dst else ''
-
- sport = 'sport {0}'.format(sport) if sport else ''
-
- dport = 'dport {0}'.format(dport) if dport else ''
-
- proto = 'proto {0}'.format(proto) if proto else ''
-
- tcpflags = 'tcpflags {0} {1}'.format(tcpflg_val, tcpflg_mask) \
- if tcpflg_val and tcpflg_mask else ''
-
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/acl_add_replace.vat", acl_idx=acl_idx,
- ip_ver=ip_ver, action=action, src=src, dst=dst, sport=sport,
- dport=dport, proto=proto, tcpflags=tcpflags)
- except RuntimeError:
- raise RuntimeError("Adding or replacing of ACL failed on "
- "node {0}".format(node['host']))
-
- @staticmethod
- def add_replace_acl_multi_entries(node, acl_idx=None, rules=None):
+ def add_replace_acl_multi_entries(node, acl_idx=None, rules=None, tag=u""):
"""Add a new ACL or replace the existing one. To replace an existing
ACL, pass the ID of this ACL.
:param node: VPP node to set ACL on.
:param acl_idx: ID of ACL. (Optional)
:param rules: Required rules. (Optional)
+ :param tag: ACL tag (Optional).
:type node: dict
:type acl_idx: int
:type rules: str
- :raises RuntimeError: If unable to add or replace ACL.
- """
- acl_idx = '{0}'.format(acl_idx) if acl_idx else ''
-
- rules = '{0}'.format(rules) if rules else ''
-
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/acl_add_replace.vat", acl_idx=acl_idx,
- ip_ver=rules, action='', src='', dst='', sport='',
- dport='', proto='', tcpflags='')
- except RuntimeError:
- raise RuntimeError("Adding or replacing of ACL failed on "
- "node {0}".format(node['host']))
-
- @staticmethod
- def delete_acl(node, idx):
- """Delete required ACL.
-
- :param node: VPP node to delete ACL on.
- :param idx: Index of ACL to be deleted.
- :type node: dict
- :type idx: int or str
- :raises RuntimeError: If unable to delete ACL.
+ :type tag: str
"""
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/acl_delete.vat", idx=idx)
- except RuntimeError:
- raise RuntimeError("Deletion of ACL failed on node {0}".
- format(node['host']))
-
- @staticmethod
- def cli_show_acl(node, acl_idx=None):
- """Show ACLs.
-
- :param node: VPP node to show ACL on.
- :param acl_idx: Index of ACL to be shown.
- :type node: dict
- :type acl_idx: int or str
- :raises RuntimeError: If unable to delete ACL.
- """
- acl_idx = '{0}'.format(acl_idx) if acl_idx else ''
+ reg_ex_src_ip = re.compile(r"(src [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_dst_ip = re.compile(r"(dst [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_sport = re.compile(r"(sport \d{1,5})")
+ reg_ex_dport = re.compile(r"(dport \d{1,5})")
+ reg_ex_proto = re.compile(r"(proto \d{1,5})")
+
+ acl_rules = list()
+ for rule in rules.split(u", "):
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_prefix=0,
+ dst_prefix=0,
+ proto=0,
+ srcport_or_icmptype_first=0,
+ srcport_or_icmptype_last=65535,
+ dstport_or_icmpcode_first=0,
+ dstport_or_icmpcode_last=65535,
+ tcp_flags_mask=0,
+ tcp_flags_value=0
+ )
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/show_acl.vat", idx=acl_idx)
- except RuntimeError:
- raise RuntimeError("Failed to show ACL on node {0}".
- format(node['host']))
+ groups = re.search(reg_ex_src_ip, rule)
+ if groups:
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
+
+ groups = re.search(reg_ex_dst_ip, rule)
+ if groups:
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"dst_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
+
+ groups = re.search(reg_ex_sport, rule)
+ if groups:
+ port = int(groups.group(1).split(u" ")[1])
+ acl_rule[u"srcport_or_icmptype_first"] = port
+ acl_rule[u"srcport_or_icmptype_last"] = port
+
+ groups = re.search(reg_ex_dport, rule)
+ if groups:
+ port = int(groups.group(1).split(u" ")[1])
+ acl_rule[u"dstport_or_icmpcode_first"] = port
+ acl_rule[u"dstport_or_icmpcode_last"] = port
+
+ groups = re.search(reg_ex_proto, rule)
+ if groups:
+ proto = int(groups.group(1).split(' ')[1])
+ acl_rule[u"proto"] = proto
+
+ acl_rules.append(acl_rule)
+
+ Classify._acl_add_replace(
+ node, acl_idx=acl_idx, rules=acl_rules, tag=tag
+ )
@staticmethod
- def add_macip_acl(node, ip_ver="ipv4", action="permit", src_ip=None,
- src_mac=None, src_mac_mask=None):
+ def add_macip_acl_multi_entries(node, rules=u""):
"""Add a new MACIP ACL.
:param node: VPP node to set MACIP ACL on.
- :param ip_ver: IP version. (Optional)
- :param action: ACL action. (Optional)
- :param src_ip: Source IP in format IP/plen. (Optional)
- :param src_mac: Source MAC address in format with colons. (Optional)
- :param src_mac_mask: Source MAC address mask in format with colons.
- 00:00:00:00:00:00 is a wildcard mask. (Optional)
+ :param rules: Required MACIP rules.
:type node: dict
- :type ip_ver: str
- :type action: str
- :type src_ip: str
- :type src_mac: str
- :type src_mac_mask: str
- :raises RuntimeError: If unable to add MACIP ACL.
+ :type rules: str
"""
- src_ip = 'ip {0}'.format(src_ip) if src_ip else ''
-
- src_mac = 'mac {0}'.format(src_mac) if src_mac else ''
-
- src_mac_mask = 'mask {0}'.format(src_mac_mask) if src_mac_mask else ''
-
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/macip_acl_add.vat", ip_ver=ip_ver,
- action=action, src_ip=src_ip, src_mac=src_mac,
- src_mac_mask=src_mac_mask)
- except RuntimeError:
- raise RuntimeError("Adding of MACIP ACL failed on node {0}".
- format(node['host']))
+ reg_ex_ip = re.compile(r"(ip [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_mac = re.compile(r"(mac \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)")
+ reg_ex_mask = re.compile(r"(mask \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)")
+
+ acl_rules = list()
+ for rule in rules.split(u", "):
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_mac=6*b'0',
+ src_mac_mask=6*b'0',
+ prefix=0
+ )
- @staticmethod
- def add_macip_acl_multi_entries(node, rules=None):
- """Add a new MACIP ACL.
+ groups = re.search(reg_ex_mac, rule)
+ if groups:
+ mac = groups.group(1).split(u" ")[1].replace(u":", u"")
+ acl_rule[u"src_mac"] = bytes.fromhex(mac)
- :param node: VPP node to set MACIP ACL on.
- :param rules: Required MACIP rules. (Optional)
- :type node: dict
- :type rules: str
- :raises RuntimeError: If unable to add MACIP ACL.
- """
- rules = '{0}'.format(rules) if rules else ''
+ groups = re.search(reg_ex_mask, rule)
+ if groups:
+ mask = groups.group(1).split(u" ")[1].replace(u":", u"")
+ acl_rule[u"src_mac_mask"] = bytes.fromhex(mask)
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/macip_acl_add.vat", ip_ver=rules, action='',
- src_ip='', src_mac='', src_mac_mask='')
- except RuntimeError:
- raise RuntimeError("Adding of MACIP ACL failed on node {0}".
- format(node['host']))
+ groups = re.search(reg_ex_ip, rule)
+ if groups:
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address((grp[0])), int(grp[1])
+ )
- @staticmethod
- def delete_macip_acl(node, idx):
- """Delete required MACIP ACL.
+ acl_rules.append(acl_rule)
- :param node: VPP node to delete MACIP ACL on.
- :param idx: Index of ACL to be deleted.
- :type node: dict
- :type idx: int or str
- :raises RuntimeError: If unable to delete MACIP ACL.
- """
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/macip_acl_delete.vat", idx=idx)
- except RuntimeError:
- raise RuntimeError("Deletion of MACIP ACL failed on node {0}".
- format(node['host']))
+ Classify._macip_acl_add(node=node, rules=acl_rules)
@staticmethod
def vpp_log_macip_acl_settings(node):
- """Retrieve configured MACIP settings from the ACL plugin
- and write to robot log.
+ """Retrieve configured MACIP settings from the ACL plugin and write to
+ robot log.
:param node: VPP node.
:type node: dict
"""
- try:
- VatExecutor.cmd_from_template(
- node, "acl_plugin/macip_acl_dump.vat")
- except (ValueError, RuntimeError):
- # Fails to parse JSON data in response, but it is still logged
- pass
+ PapiSocketExecutor.dump_and_log(node, [u"macip_acl_dump", ])
@staticmethod
def add_del_macip_acl_interface(node, interface, action, acl_idx):
:type acl_idx: str or int
:raises RuntimeError: If unable to set MACIP ACL for the interface.
"""
- if isinstance(interface, basestring):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = interface
-
- try:
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- "acl_plugin/macip_acl_interface_add_del.vat",
- sw_if_index=sw_if_index, action=action, acl_idx=acl_idx)
- except RuntimeError:
- raise RuntimeError("Setting of MACIP ACL index for interface {0} "
- "failed on node {1}".
- format(interface, node['host']))
+ cmd = u"macip_acl_interface_add_del"
+ err_msg = f"Failed to get 'macip_acl_interface' on host {node[u'host']}"
+ args = dict(
+ is_add=bool(action == u"add"),
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
+ acl_index=int(acl_idx)
+ )
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def vpp_log_macip_acl_interface_assignment(node):
:param node: VPP node.
:type node: dict
"""
- try:
- VatExecutor.cmd_from_template(
- node, "acl_plugin/macip_acl_interface_get.vat", json_out=False)
- except RuntimeError:
- # Fails to parse response, but it is still logged
- pass
+ cmd = u"macip_acl_interface_get"
+ err_msg = f"Failed to get 'macip_acl_interface' on host {node[u'host']}"
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+ logger.info(reply)