-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Classify utilities library."""
-import binascii
import re
from ipaddress import ip_address
from robot.api import logger
-from resources.libraries.python.topology import Topology
+from resources.libraries.python.Constants import Constants
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-class Classify(object):
+class Classify:
"""Classify utilities."""
@staticmethod
- def _build_mac_mask(dst_mac='', src_mac='', ether_type=''):
- """Build MAC ACL mask data in hexstring format.
+ def _build_mac_mask(dst_mac=u"", src_mac=u"", ether_type=u""):
+ """Build MAC ACL mask data in bytes format.
:param dst_mac: Source MAC address <0-ffffffffffff>.
:param src_mac: Destination MAC address <0-ffffffffffff>.
:type dst_mac: str
:type src_mac: str
:type ether_type: str
- :returns MAC ACL mask in hexstring format.
- :rtype: str
+ :returns MAC ACL mask in bytes format.
+ :rtype: bytes
"""
- if ether_type:
- end = 28
- elif src_mac:
- end = 24
- else:
- end = 12
-
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac.replace(':', ''), src_mac.replace(':', ''),
- ether_type))[0:end]
+ return bytes.fromhex(
+ f"{dst_mac.replace(u':', u'')!s:0>12}"
+ f"{src_mac.replace(u':', u'')!s:0>12}"
+ f"{ether_type!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _build_ip_mask(proto='', src_ip='', dst_ip='', src_port='',
- dst_port=''):
- """Build IP ACL mask data in hexstring format.
+ def _build_ip_mask(
+ proto=u"", src_ip=u"", dst_ip=u"", src_port=u"", dst_port=u""):
+ """Build IP ACL mask data in bytes format.
:param proto: Protocol number <0-ff>.
:param src_ip: Source ip address <0-ffffffff>.
:type dst_ip: str
:type src_port: str
:type dst_port:src
- :returns: IP mask in hexstring format.
- :rtype: str
+ :returns: IP mask in bytes format.
+ :rtype: bytes
"""
- if dst_port:
- end = 48
- elif src_port:
- end = 44
- elif dst_ip:
- end = 40
- elif src_ip:
- end = 32
- else:
- end = 20
-
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port))[0:end]
+ return bytes.fromhex(
+ f"{proto!s:0>20}{src_ip!s:0>12}{dst_ip!s:0>8}{src_port!s:0>4}"
+ f"{dst_port!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _build_ip6_mask(next_hdr='', src_ip='', dst_ip='', src_port='',
- dst_port=''):
- """Build IPv6 ACL mask data in hexstring format.
+ def _build_ip6_mask(
+ next_hdr=u"", src_ip=u"", dst_ip=u"", src_port=u"", dst_port=u""):
+ """Build IPv6 ACL mask data in bytes format.
:param next_hdr: Next header number <0-ff>.
:param src_ip: Source ip address <0-ffffffff>.
:type dst_ip: str
:type src_port: str
:type dst_port: str
- :returns: IPv6 ACL mask in hexstring format.
- :rtype: str
+ :returns: IPv6 ACL mask in bytes format.
+ :rtype: bytes
"""
- if dst_port:
- end = 88
- elif src_port:
- end = 84
- elif dst_ip:
- end = 80
- elif src_ip:
- end = 48
- else:
- end = 14
-
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- next_hdr, src_ip, dst_ip, src_port, dst_port))[0:end]
+ return bytes.fromhex(
+ f"{next_hdr!s:0>14}{src_ip!s:0>34}{dst_ip!s:0>32}{src_port!s:0>4}"
+ f"{dst_port!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _build_mac_match(dst_mac='', src_mac='', ether_type=''):
- """Build MAC ACL match data in hexstring format.
+ def _build_mac_match(dst_mac=u"", src_mac=u"", ether_type=u""):
+ """Build MAC ACL match data in bytes format.
:param dst_mac: Source MAC address <x:x:x:x:x:x>.
:param src_mac: Destination MAC address <x:x:x:x:x:x>.
:type dst_mac: str
:type src_mac: str
:type ether_type: str
- :returns: MAC ACL match data in hexstring format.
- :rtype: str
+ :returns: MAC ACL match data in bytes format.
+ :rtype: bytes
"""
- if ether_type:
- end = 28
- elif src_mac:
- end = 24
- else:
- end = 12
-
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac.replace(':', ''), src_mac.replace(':', ''),
- ether_type))[0:end]
+ return bytes.fromhex(
+ f"{dst_mac.replace(u':', u'')!s:0>12}"
+ f"{src_mac.replace(u':', u'')!s:0>12}"
+ f"{ether_type!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _build_ip_match(proto=0, src_ip='', dst_ip='', src_port=0, dst_port=0):
- """Build IP ACL match data in hexstring format.
+ def _build_ip_match(
+ proto=0, src_ip=4*b"\0", dst_ip=4*b"\0", src_port=0, dst_port=0):
+ """Build IP ACL match data in bytes format.
:param proto: Protocol number with valid option "x".
- :param src_ip: Source ip address with format of "x.x.x.x".
- :param dst_ip: Destination ip address with format of "x.x.x.x".
+ :param src_ip: Source ip address in packed format.
+ :param dst_ip: Destination ip address in packed format.
:param src_port: Source port number "x".
:param dst_port: Destination port number "x".
:type proto: int
- :type src_ip: str
- :type dst_ip: str
+ :type src_ip: bytes
+ :type dst_ip: bytes
:type src_port: int
:type dst_port: int
- :returns: IP ACL match data in hexstring format.
+ :returns: IP ACL match data in byte-string format.
:rtype: str
"""
- if src_ip:
- src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
- if dst_ip:
- dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
- if dst_port:
- end = 48
- elif src_port:
- end = 44
- elif dst_ip:
- end = 40
- elif src_ip:
- end = 32
- else:
- end = 20
-
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:]))[0:end]
+ return bytes.fromhex(
+ f"{hex(proto)[2:]!s:0>20}{src_ip.hex()!s:0>12}{dst_ip.hex()!s:0>8}"
+ f"{hex(src_port)[2:]!s:0>4}{hex(dst_port)[2:]!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _build_ip6_match(next_hdr=0, src_ip='', dst_ip='', src_port=0,
- dst_port=0):
- """Build IPv6 ACL match data in hexstring format.
+ def _build_ip6_match(
+ next_hdr=0, src_ip=16*b"\0", dst_ip=16*b"\0", src_port=0,
+ dst_port=0):
+ """Build IPv6 ACL match data in byte-string format.
:param next_hdr: Next header number with valid option "x".
- :param src_ip: Source ip6 address with format of "xxxx:xxxx::xxxx".
- :param dst_ip: Destination ip6 address with format of
- "xxxx:xxxx::xxxx".
+ :param src_ip: Source ip6 address in packed format.
+ :param dst_ip: Destination ip6 address in packed format.
:param src_port: Source port number "x".
:param dst_port: Destination port number "x".
:type next_hdr: int
- :type src_ip: str
- :type dst_ip: str
+ :type src_ip: bytes
+ :type dst_ip: bytes
:type src_port: int
:type dst_port: int
- :returns: IPv6 ACL match data in hexstring format.
- :rtype: str
+ :returns: IPv6 ACL match data in bytes format.
+ :rtype: bytes
"""
- if src_ip:
- src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
- if dst_ip:
- dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
- if dst_port:
- end = 88
- elif src_port:
- end = 84
- elif dst_ip:
- end = 80
- elif src_ip:
- end = 48
- else:
- end = 14
-
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- hex(next_hdr)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:]))[0:end]
+ return bytes.fromhex(
+ f"{hex(next_hdr)[2:]!s:0>14}{src_ip.hex()!s:0>34}"
+ f"{dst_ip.hex()!s:0>32}{hex(src_port)[2:]!s:0>4}"
+ f"{hex(dst_port)[2:]!s:0>4}"
+ ).rstrip(b'\0')
@staticmethod
- def _classify_add_del_table(node, is_add, mask, match_n_vectors=1,
- table_index=0xFFFFFFFF, nbuckets=2,
- memory_size=2097152, skip_n_vectors=0,
- next_table_index=0xFFFFFFFF,
- miss_next_index=0xFFFFFFFF, current_data_flag=0,
- current_data_offset=0):
+ def _classify_add_del_table(
+ node, is_add, mask, match_n_vectors=Constants.BITWISE_NON_ZERO,
+ table_index=Constants.BITWISE_NON_ZERO, nbuckets=2,
+ memory_size=2097152, skip_n_vectors=Constants.BITWISE_NON_ZERO,
+ next_table_index=Constants.BITWISE_NON_ZERO,
+ miss_next_index=Constants.BITWISE_NON_ZERO,
+ current_data_flag=0, current_data_offset=0):
"""Add or delete a classify table.
:param node: VPP node to create classify table.
- :param is_add: If 1 the table is added, if 0 the table is deleted.
+ :param is_add: If True the table is added, if False table is deleted.
:param mask: ACL mask in hexstring format.
- :param match_n_vectors: Number of vectors to match (Default value = 1).
- :param table_index: Index of the classify table.
- (Default value = 0xFFFFFFFF)
+ :param match_n_vectors: Number of vectors to match (Default value = ~0).
+ :param table_index: Index of the classify table. (Default value = ~0)
:param nbuckets: Number of buckets when adding a table.
(Default value = 2)
:param memory_size: Memory size when adding a table.
(Default value = 2097152)
- :param skip_n_vectors: Number of skip vectors (Default value = 0).
- :param next_table_index: Index of next table.
- (Default value = 0xFFFFFFFF)
- :param miss_next_index: Index of miss table.
- (Default value = 0xFFFFFFFF)
+ :param skip_n_vectors: Number of skip vectors (Default value = ~0).
+ :param next_table_index: Index of next table. (Default value = ~0)
+ :param miss_next_index: Index of miss table. (Default value = ~0)
:param current_data_flag: Option to use current node's packet payload
as the starting point from where packets are classified.
This option is only valid for L2/L3 input ACL for now.
This is valid only if current_data_flag is set to 1.
(Default value = 0)
:type node: dict
- :type is_add: int
+ :type is_add: bool
:type mask: str
:type match_n_vectors: int
:type table_index: int
match_n: Number of match vectors.
:rtype: tuple(int, int, int)
"""
- mask_len = ((len(mask) - 1) / 16 + 1) * 16
- mask = mask + '\0' * (mask_len - len(mask))
-
+ cmd = u"classify_add_del_table"
args = dict(
is_add=is_add,
+ del_chain=False,
table_index=table_index,
nbuckets=nbuckets,
memory_size=memory_size,
miss_next_index=miss_next_index,
current_data_flag=current_data_flag,
current_data_offset=current_data_offset,
- mask_len=mask_len,
+ mask_len=len(mask),
mask=mask
)
-
- cmd = 'classify_add_del_table'
- err_msg = "Failed to create a classify table on host {host}".format(
- host=node['host'])
+ err_msg = f"Failed to create a classify table on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd, **args).get_reply(err_msg)
- return int(reply["new_table_index"]), int(reply["skip_n_vectors"]),\
- int(reply["match_n_vectors"])
+ return int(reply[u"new_table_index"]), int(reply[u"skip_n_vectors"]),\
+ int(reply[u"match_n_vectors"])
@staticmethod
- def _classify_add_del_session(node, is_add, table_index, match,
- opaque_index=0xFFFFFFFF,
- hit_next_index=0xFFFFFFFF, advance=0,
- action=0, metadata=0):
+ def _classify_add_del_session(
+ node, is_add, table_index, match,
+ opaque_index=Constants.BITWISE_NON_ZERO,
+ hit_next_index=Constants.BITWISE_NON_ZERO, advance=0,
+ action=0, metadata=0):
"""Add or delete a classify session.
:param node: VPP node to create classify session.
- :param is_add: If 1 the session is added, if 0 the session is deleted.
+ :param is_add: If True the session is added, if False the session
+ is deleted.
:param table_index: Index of the table to add/del the session.
:param match: For add, match value for session, required, needs to
include bytes in front with length of skip_n_vectors of target table
times sizeof (u32x4) (values of those bytes will be ignored).
:param opaque_index: For add, opaque_index of new session.
- (Default value = 0xFFFFFFFF)
+ (Default value = ~0)
:param hit_next_index: For add, hit_next_index of new session.
- (Default value = 0xFFFFFFFF)
+ (Default value = ~0)
:param advance: For add, advance value for session. (Default value = 0)
:param action: 0: No action (by default) metadata is not used.
1: Classified IP packets will be looked up from the specified ipv4
- fib table (configured by metadata as VRF id).
- Only valid for L3 input ACL node
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
2: Classified IP packets will be looked up from the specified ipv6
- fib table (configured by metadata as VRF id).
- Only valid for L3 input ACL node
- 3: Classified packet will be steered to source routig policy of
- given index (in metadata).
- This is only valid for IPv6 packets redirected to a source
- routing node.
- :param metadata: Valid only if action != 0
- VRF id if action is 1 or 2. SR policy index if action is 3.
- (Default value = 0)
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
:type node: dict
- :type is_add: int
+ :type is_add: bool
:type table_index: int
- :type match: str
+ :type match: bytes
:type opaque_index: int
:type hit_next_index: int
:type advance: int
:type action: int
:type metadata: int
"""
-
- match_len = ((len(match) - 1) / 16 + 1) * 16
- match = match + '\0' * (match_len - len(match))
+ cmd = u"classify_add_del_session"
args = dict(
is_add=is_add,
table_index=table_index,
advance=advance,
action=action,
metadata=metadata,
- match_len=match_len,
+ match_len=len(match),
match=match
)
- cmd = 'classify_add_del_session'
- err_msg = "Failed to create a classify session on host {host}".format(
- host=node['host'])
+ err_msg = f"Failed to create a classify session on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
:type rules: list
:type tag: str
"""
- cmd = "macip_acl_add"
+ cmd = u"macip_acl_add"
args = dict(
r=rules,
count=len(rules),
tag=tag
)
- err_msg = "Failed to create a classify session on host {host}".format(
- host=node['host'])
+ err_msg = f"Failed to add MACIP ACL on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
:type acl_type: str
:type acls: list
"""
- cmd = "acl_interface_set_acl_list"
- n_input = len(acls) if acl_type == "input" else 0
+ cmd = u"acl_interface_set_acl_list"
args = dict(
sw_if_index=sw_if_index,
acls=acls,
- n_input=n_input,
+ n_input=len(acls) if acl_type == u"input" else 0,
count=len(acls)
)
- err_msg = "Failed to set acl list for interface {idx} on host {host}".\
- format(idx=sw_if_index, host=node['host'])
+ err_msg = f"Failed to set acl list for interface {sw_if_index} " \
+ f"on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
:type rules: list
:type tag: str
"""
- cmd = "acl_add_replace"
+ cmd = u"acl_add_replace"
args = dict(
- tag=tag.encode("utf-8"),
+ tag=tag,
acl_index=4294967295 if acl_idx is None else acl_idx,
count=len(rules),
r=rules
)
- err_msg = "Failed to add/replace acls on host {host}".format(
- host=node['host'])
+ err_msg = f"Failed to add/replace ACLs on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_creates_classify_table_l3(node, ip_version, direction, ip_addr):
+ def vpp_creates_classify_table_l3(node, ip_version, direction, netmask):
"""Create classify table for IP address filtering.
:param node: VPP node to create classify table.
:param ip_version: Version of IP protocol.
:param direction: Direction of traffic - src/dst.
- :param ip_addr: IPv4 or Ipv6 (depending on the parameter 'ip_version')
- address.
+ :param netmask: IPv4 or Ipv6 (depending on the parameter 'ip_version')
+ netmask (decimal, e.g. 255.255.255.255).
:type node: dict
:type ip_version: str
:type direction: str
- :type ip_addr: str
+ :type netmask: str
:returns: (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
ip4=Classify._build_ip_mask,
ip6=Classify._build_ip6_mask
)
- if ip_version == "ip4" or ip_version == "ip6":
- ip_addr = binascii.hexlify(ip_address(unicode(ip_addr)).packed)
- else:
- raise ValueError("IP version {ver} is not supported.".
- format(ver=ip_version))
- if direction == "src":
- mask = mask_f[ip_version](src_ip=ip_addr)
- elif direction == "dst":
- mask = mask_f[ip_version](dst_ip=ip_addr)
+ if ip_version in (u"ip4", u"ip6"):
+ netmask = ip_address(netmask).packed
else:
- raise ValueError("Direction {dir} is not supported.".
- format(dir=direction))
-
- return Classify._classify_add_del_table(
- node,
- is_add=1,
- mask=binascii.unhexlify(mask),
- match_n_vectors=(len(mask) - 1) // 32 + 1
- )
-
- @staticmethod
- def vpp_creates_classify_table_l2(node, direction, mac=""):
- """Create classify table for MAC address filtering.
+ raise ValueError(f"IP version {ip_version} is not supported.")
- :param node: VPP node to create classify table.
- :param direction: Direction of traffic - src/dst.
- :param mac: Source or destination (depending on the parameter
- 'direction') MAC address.
- :type node: dict
- :type direction: str
- :type mac: str
- :returns: (table_index, skip_n, match_n)
- table_index: Classify table index.
- skip_n: Number of skip vectors.
- match_n: Number of match vectors.
- :rtype: tuple(int, int, int)
- :raises ValueError: If the parameter 'direction' has incorrect value.
- """
- if direction == "src":
- mask = Classify._build_mac_mask(src_mac=mac)
- elif direction == "dst":
- mask = Classify._build_mac_mask(dst_mac=mac)
+ if direction == u"src":
+ mask = mask_f[ip_version](src_ip=netmask.hex())
+ elif direction == u"dst":
+ mask = mask_f[ip_version](dst_ip=netmask.hex())
else:
- raise ValueError("Direction {dir} is not supported.".
- format(dir=direction))
+ raise ValueError(f"Direction {direction} is not supported.")
- return Classify._classify_add_del_table(
- node,
- is_add=1,
- mask=binascii.unhexlify(mask),
- match_n_vectors=(len(mask) - 1) // 32 + 1
- )
+ # Add l2 ethernet header to mask
+ mask = 14 * b'\0' + mask
- @staticmethod
- def vpp_creates_classify_table_hex(node, hex_mask):
- """Create classify table with hex mask.
+ # Get index of the first significant mask octet
+ i = len(mask) - len(mask.lstrip(b'\0'))
+
+ # Compute skip_n parameter
+ skip_n = i // 16
+ # Remove octets to be skipped from the mask
+ mask = mask[skip_n*16:]
+ # Pad mask to an even multiple of the vector size
+ mask = mask + (16 - len(mask) % 16 if len(mask) % 16 else 0) * b'\0'
+ # Compute match_n parameter
+ match_n = len(mask) // 16
- :param node: VPP node to create classify table based on hex mask.
- :param hex_mask: Classify hex mask.
- :type node: dict
- :type hex_mask: str
- :returns: (table_index, skip_n, match_n)
- table_index: Classify table index.
- skip_n: Number of skip vectors.
- match_n: Number of match vectors.
- :rtype: tuple(int, int, int)
- """
return Classify._classify_add_del_table(
node,
- is_add=1,
- mask=binascii.unhexlify(hex_mask),
- match_n_vectors=(len(hex_mask) - 1) // 32 + 1
+ is_add=True,
+ mask=mask,
+ match_n_vectors=match_n,
+ skip_n_vectors=skip_n
)
@staticmethod
- def vpp_configures_classify_session_l3(node, acl_method, table_index,
- ip_version, direction, address):
+ def vpp_configures_classify_session_l3(
+ node, acl_method, table_index, skip_n, match_n, ip_version,
+ direction, address, hit_next_index=None,
+ opaque_index=Constants.BITWISE_NON_ZERO, action=0, metadata=0):
"""Configuration of classify session for IP address filtering.
:param node: VPP node to setup classify session.
:param acl_method: ACL method - deny/permit.
:param table_index: Classify table index.
+ :param skip_n: Number of skip vectors.
+ :param match_n: Number of vectors to match.
:param ip_version: Version of IP protocol.
:param direction: Direction of traffic - src/dst.
:param address: IPv4 or IPv6 address.
+ :param hit_next_index: hit_next_index of new session.
+ (Default value = None)
+ :param opaque_index: opaque_index of new session. (Default value = ~0)
+ :param action: 0: No action (by default) metadata is not used.
+ 1: Classified IP packets will be looked up from the specified ipv4
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 2: Classified IP packets will be looked up from the specified ipv6
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
:type node: dict
:type acl_method: str
:type table_index: int
+ :type skip_n: int
+ :type match_n: int
:type ip_version: str
:type direction: str
:type address: str
+ :type hit_next_index: int
+ :type opaque_index: int
+ :type action: int
+ :type metadata: int
:raises ValueError: If the parameter 'direction' has incorrect value.
"""
match_f = dict(
ip4=Classify._build_ip_match,
ip6=Classify._build_ip6_match
)
- if direction == "src":
- match = match_f[ip_version](src_ip=address)
- elif direction == "dst":
- match = match_f[ip_version](dst_ip=address)
- else:
- raise ValueError("Direction {dir} is not supported.".
- format(dir=direction))
- action = dict(
- permit=0,
- deny=1
+ acl_hit_next_index = dict(
+ permit=Constants.BITWISE_NON_ZERO,
+ deny=0
)
- Classify._classify_add_del_session(
- node,
- is_add=1,
- table_index=table_index,
- match=binascii.unhexlify(match),
- action=action[acl_method])
- @staticmethod
- def vpp_configures_classify_session_l2(node, acl_method, table_index,
- direction, address):
- """Configuration of classify session for MAC address filtering.
+ if ip_version in (u"ip4", u"ip6"):
+ address = ip_address(address).packed
+ else:
+ raise ValueError(f"IP version {ip_version} is not supported.")
- :param node: VPP node to setup classify session.
- :param acl_method: ACL method - deny/permit.
- :param table_index: Classify table index.
- :param direction: Direction of traffic - src/dst.
- :param address: MAC address.
- :type node: dict
- :type acl_method: str
- :type table_index: int
- :type direction: str
- :type address: str
- :raises ValueError: If the parameter 'direction' has incorrect value.
- """
- if direction == "src":
- match = Classify._build_mac_match(src_mac=address)
- elif direction == "dst":
- match = Classify._build_mac_match(dst_mac=address)
+ if direction == u"src":
+ match = match_f[ip_version](src_ip=address)
+ elif direction == u"dst":
+ match = match_f[ip_version](dst_ip=address)
else:
- raise ValueError("Direction {dir} is not supported.".
- format(dir=direction))
- action = dict(
- permit=0,
- deny=1
- )
- Classify._classify_add_del_session(
- node,
- is_add=1,
- table_index=table_index,
- match=binascii.unhexlify(match),
- action=action[acl_method])
+ raise ValueError(f"Direction {direction} is not supported.")
- @staticmethod
- def vpp_configures_classify_session_hex(node, acl_method, table_index,
- hex_value):
- """Configuration of classify session with hex value.
+ # Prepend match with l2 ethernet header part
+ match = 14 * b'\0' + match
+
+ # Pad match to match skip_n_vector + match_n_vector size
+ match = match + ((match_n + skip_n) * 16 - len(match)
+ if len(match) < (match_n + skip_n) * 16
+ else 0) * b'\0'
- :param node: VPP node to setup classify session.
- :param acl_method: ACL method - deny/permit.
- :param table_index: Classify table index.
- :param hex_value: Classify hex value.
- :type node: dict
- :type acl_method: str
- :type table_index: int
- :type hex_value: str
- """
- action = dict(
- permit=0,
- deny=1
- )
Classify._classify_add_del_session(
node,
- is_add=1,
+ is_add=True,
table_index=table_index,
- match=binascii.unhexlify(hex_value),
- action=action[acl_method])
-
- @staticmethod
- def compute_classify_hex_mask(ip_version, protocol, direction):
- """Compute classify hex mask for TCP or UDP packet matching.
-
- :param ip_version: Version of IP protocol.
- :param protocol: Type of protocol.
- :param direction: Traffic direction.
- :type ip_version: str
- :type protocol: str
- :type direction: str
- :returns: Classify hex mask.
- :rtype: str
- :raises ValueError: If protocol is not TCP or UDP.
- :raises ValueError: If direction is not source or destination or
- source + destination.
- """
- if protocol in ('TCP', 'UDP'):
- base_mask = Classify._compute_base_mask(ip_version)
-
- if direction == 'source':
- return base_mask + 'FFFF0000'
- elif direction == 'destination':
- return base_mask + '0000FFFF'
- elif direction == 'source + destination':
- return base_mask + 'FFFFFFFF'
- else:
- raise ValueError("Invalid direction!")
- else:
- raise ValueError("Invalid protocol!")
-
- @staticmethod
- def compute_classify_hex_value(hex_mask, source_port, destination_port):
- """Compute classify hex value for TCP or UDP packet matching.
-
- :param hex_mask: Classify hex mask.
- :param source_port: Source TCP/UDP port.
- :param destination_port: Destination TCP/UDP port.
- :type hex_mask: str
- :type source_port: str
- :type destination_port: str
- :returns: Classify hex value.
- :rtype: str
- """
- source_port_hex = Classify._port_convert(source_port)
- destination_port_hex = Classify._port_convert(destination_port)
-
- return hex_mask[:-8] + source_port_hex + destination_port_hex
-
- @staticmethod
- def _port_convert(port):
- """Convert port number for classify hex table format.
-
- :param port: TCP/UDP port number.
- :type port: str
- :returns: TCP/UDP port number in 4-digit hexadecimal format.
- :rtype: str
- """
- return '{0:04x}'.format(int(port))
-
- @staticmethod
- def _compute_base_mask(ip_version):
- """Compute base classify hex mask based on IP version.
-
- :param ip_version: Version of IP protocol.
- :type ip_version: str
- :returns: Base hex mask.
- :rtype: str
- """
- if ip_version == 'ip4':
- return 68 * '0'
- # base value of classify hex table for IPv4 TCP/UDP ports
- elif ip_version == 'ip6':
- return 108 * '0'
- # base value of classify hex table for IPv6 TCP/UDP ports
- else:
- raise ValueError("Invalid IP version!")
+ hit_next_index=hit_next_index if hit_next_index is not None
+ else acl_hit_next_index[acl_method],
+ opaque_index=opaque_index,
+ match=match,
+ action=action,
+ metadata=metadata
+ )
@staticmethod
def get_classify_table_data(node, table_index):
:returns: Classify table settings.
:rtype: dict
"""
- cmd = 'classify_table_info'
- err_msg = "Failed to get 'classify_table_info' on host {host}".format(
- host=node['host'])
+ cmd = u"classify_table_info"
+ err_msg = f"Failed to get 'classify_table_info' on host {node[u'host']}"
args = dict(
table_id=int(table_index)
)
:returns: List of classify session settings.
:rtype: list or dict
"""
- cmd = "classify_session_dump"
+ cmd = u"classify_session_dump"
args = dict(
table_id=int(table_index)
)
return details
+ @staticmethod
+ def show_classify_tables_verbose(node):
+ """Show classify tables verbose.
+
+ :param node: Topology node.
+ :type node: dict
+ :returns: Classify tables verbose data.
+ :rtype: str
+ """
+ return PapiSocketExecutor.run_cli_cmd(
+ node, u"show classify tables verbose"
+ )
+
@staticmethod
def vpp_log_plugin_acl_settings(node):
"""Retrieve configured settings from the ACL plugin and write to robot
:param node: VPP node.
:type node: dict
"""
- PapiSocketExecutor.dump_and_log(node, ["acl_dump", ])
+ PapiSocketExecutor.dump_and_log(node, [u"acl_dump", ])
@staticmethod
def vpp_log_plugin_acl_interface_assignment(node):
:param node: VPP node.
:type node: dict
"""
- PapiSocketExecutor.dump_and_log(node, ["acl_interface_list_dump", ])
+ PapiSocketExecutor.dump_and_log(node, [u"acl_interface_list_dump", ])
@staticmethod
def set_acl_list_for_interface(node, interface, acl_type, acl_idx=None):
:type acl_type: str
:type acl_idx: list
"""
- if isinstance(interface, basestring):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = int(interface)
-
- acls = acl_idx if isinstance(acl_idx, list) else list()
-
- Classify._acl_interface_set_acl_list(node=node,
- sw_if_index=sw_if_index,
- acl_type=acl_type,
- acls=acls)
+ Classify._acl_interface_set_acl_list(
+ node=node,
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
+ acl_type=acl_type,
+ acls=acl_idx if isinstance(acl_idx, list) else list()
+ )
@staticmethod
- def add_replace_acl_multi_entries(node, acl_idx=None, rules=None, tag=""):
+ def add_replace_acl_multi_entries(node, acl_idx=None, rules=None, tag=u""):
"""Add a new ACL or replace the existing one. To replace an existing
ACL, pass the ID of this ACL.
:type rules: str
:type tag: str
"""
- reg_ex_src_ip = re.compile(r'(src [0-9a-fA-F.:/\d{1,2}]*)')
- reg_ex_dst_ip = re.compile(r'(dst [0-9a-fA-F.:/\d{1,2}]*)')
- reg_ex_sport = re.compile(r'(sport \d{1,5})')
- reg_ex_dport = re.compile(r'(dport \d{1,5})')
- reg_ex_proto = re.compile(r'(proto \d{1,5})')
+ reg_ex_src_ip = re.compile(r"(src [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_dst_ip = re.compile(r"(dst [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_sport = re.compile(r"(sport \d{1,5})")
+ reg_ex_dport = re.compile(r"(dport \d{1,5})")
+ reg_ex_proto = re.compile(r"(proto \d{1,5})")
acl_rules = list()
- for rule in rules.split(", "):
- acl_rule = dict()
- acl_rule["is_permit"] = 1 if "permit" in rule else 0
- acl_rule["is_ipv6"] = 1 if "ipv6" in rule else 0
+ for rule in rules.split(u", "):
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_prefix=0,
+ dst_prefix=0,
+ proto=0,
+ srcport_or_icmptype_first=0,
+ srcport_or_icmptype_last=65535,
+ dstport_or_icmpcode_first=0,
+ dstport_or_icmpcode_last=65535,
+ tcp_flags_mask=0,
+ tcp_flags_value=0
+ )
groups = re.search(reg_ex_src_ip, rule)
if groups:
- grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
- acl_rule["src_ip_prefix_len"] = int(grp[1])
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
groups = re.search(reg_ex_dst_ip, rule)
if groups:
- grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["dst_ip_addr"] = ip_address(unicode(grp[0])).packed
- acl_rule["dst_ip_prefix_len"] = int(grp[1])
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"dst_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
groups = re.search(reg_ex_sport, rule)
if groups:
- port = int(groups.group(1).split(' ')[1])
- acl_rule["srcport_or_icmptype_first"] = port
- acl_rule["srcport_or_icmptype_last"] = port
- else:
- acl_rule["srcport_or_icmptype_first"] = 0
- acl_rule["srcport_or_icmptype_last"] = 65535
+ port = int(groups.group(1).split(u" ")[1])
+ acl_rule[u"srcport_or_icmptype_first"] = port
+ acl_rule[u"srcport_or_icmptype_last"] = port
groups = re.search(reg_ex_dport, rule)
if groups:
- port = int(groups.group(1).split(' ')[1])
- acl_rule["dstport_or_icmpcode_first"] = port
- acl_rule["dstport_or_icmpcode_last"] = port
- else:
- acl_rule["dstport_or_icmpcode_first"] = 0
- acl_rule["dstport_or_icmpcode_last"] = 65535
+ port = int(groups.group(1).split(u" ")[1])
+ acl_rule[u"dstport_or_icmpcode_first"] = port
+ acl_rule[u"dstport_or_icmpcode_last"] = port
groups = re.search(reg_ex_proto, rule)
if groups:
proto = int(groups.group(1).split(' ')[1])
- acl_rule["proto"] = proto
- else:
- acl_rule["proto"] = 0
+ acl_rule[u"proto"] = proto
acl_rules.append(acl_rule)
Classify._acl_add_replace(
- node, acl_idx=acl_idx, rules=acl_rules, tag=tag)
+ node, acl_idx=acl_idx, rules=acl_rules, tag=tag
+ )
@staticmethod
- def add_macip_acl_multi_entries(node, rules=""):
+ def add_macip_acl_multi_entries(node, rules=u""):
"""Add a new MACIP ACL.
:param node: VPP node to set MACIP ACL on.
:type node: dict
:type rules: str
"""
- reg_ex_ip = re.compile(r'(ip [0-9a-fA-F.:/\d{1,2}]*)')
- reg_ex_mac = re.compile(r'(mac \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)')
- reg_ex_mask = re.compile(r'(mask \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)')
+ reg_ex_ip = re.compile(r"(ip [0-9a-fA-F.:/\d{1,2}]*)")
+ reg_ex_mac = re.compile(r"(mac \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)")
+ reg_ex_mask = re.compile(r"(mask \S\S:\S\S:\S\S:\S\S:\S\S:\S\S)")
acl_rules = list()
- for rule in rules.split(", "):
- acl_rule = dict()
- acl_rule["is_permit"] = 1 if "permit" in rule else 0
- acl_rule["is_ipv6"] = 1 if "ipv6" in rule else 0
+ for rule in rules.split(u", "):
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_mac=6*b'0',
+ src_mac_mask=6*b'0',
+ prefix=0
+ )
groups = re.search(reg_ex_mac, rule)
if groups:
- mac = groups.group(1).split(' ')[1].replace(':', '')
- acl_rule["src_mac"] = unicode(mac)
+ mac = groups.group(1).split(u" ")[1].replace(u":", u"")
+ acl_rule[u"src_mac"] = bytes.fromhex(mac)
groups = re.search(reg_ex_mask, rule)
if groups:
- mask = groups.group(1).split(' ')[1].replace(':', '')
- acl_rule["src_mac_mask"] = unicode(mask)
+ mask = groups.group(1).split(u" ")[1].replace(u":", u"")
+ acl_rule[u"src_mac_mask"] = bytes.fromhex(mask)
groups = re.search(reg_ex_ip, rule)
if groups:
- grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
- acl_rule["src_ip_prefix_len"] = int(grp[1])
+ grp = groups.group(1).split(u" ")[1].split(u"/")
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address((grp[0])), int(grp[1])
+ )
acl_rules.append(acl_rule)
:param node: VPP node.
:type node: dict
"""
- PapiSocketExecutor.dump_and_log(node, ["macip_acl_dump", ])
+ PapiSocketExecutor.dump_and_log(node, [u"macip_acl_dump", ])
@staticmethod
def add_del_macip_acl_interface(node, interface, action, acl_idx):
:type acl_idx: str or int
:raises RuntimeError: If unable to set MACIP ACL for the interface.
"""
- if isinstance(interface, basestring):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = interface
-
- is_add = 1 if action == "add" else 0
-
- cmd = 'macip_acl_interface_add_del'
- err_msg = "Failed to get 'macip_acl_interface' on host {host}".format(
- host=node['host'])
+ cmd = u"macip_acl_interface_add_del"
+ err_msg = f"Failed to get 'macip_acl_interface' on host {node[u'host']}"
args = dict(
- is_add=is_add,
- sw_if_index=int(sw_if_index),
+ is_add=bool(action == u"add"),
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
acl_index=int(acl_idx)
)
with PapiSocketExecutor(node) as papi_exec:
:param node: VPP node.
:type node: dict
"""
- cmd = 'macip_acl_interface_get'
- err_msg = "Failed to get 'macip_acl_interface' on host {host}".format(
- host=node['host'])
+ cmd = u"macip_acl_interface_get"
+ err_msg = f"Failed to get 'macip_acl_interface' on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
logger.info(reply)