import binascii
import re
-from socket import AF_INET, AF_INET6, inet_aton, inet_pton
+from ipaddress import ip_address
from robot.api import logger
from resources.libraries.python.topology import Topology
-from resources.libraries.python.PapiExecutor import PapiExecutor
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
class Classify(object):
:returns MAC ACL mask in hexstring format.
:rtype: str
"""
+ if ether_type:
+ end = 28
+ elif src_mac:
+ end = 24
+ else:
+ end = 12
+
return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac.replace(':', ''), src_mac.replace(':', ''), ether_type)).\
- rstrip('0')
+ dst_mac.replace(':', ''), src_mac.replace(':', ''),
+ ether_type))[0:end]
@staticmethod
def _build_ip_mask(proto='', src_ip='', dst_ip='', src_port='',
:returns: IP mask in hexstring format.
:rtype: str
"""
+ if dst_port:
+ end = 48
+ elif src_port:
+ end = 44
+ elif dst_ip:
+ end = 40
+ elif src_ip:
+ end = 32
+ else:
+ end = 20
+
return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ proto, src_ip, dst_ip, src_port, dst_port))[0:end]
@staticmethod
def _build_ip6_mask(next_hdr='', src_ip='', dst_ip='', src_port='',
:returns: IPv6 ACL mask in hexstring format.
:rtype: str
"""
+ if dst_port:
+ end = 88
+ elif src_port:
+ end = 84
+ elif dst_ip:
+ end = 80
+ elif src_ip:
+ end = 48
+ else:
+ end = 14
+
return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- next_hdr, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ next_hdr, src_ip, dst_ip, src_port, dst_port))[0:end]
@staticmethod
def _build_mac_match(dst_mac='', src_mac='', ether_type=''):
:returns: MAC ACL match data in hexstring format.
:rtype: str
"""
- if dst_mac:
- dst_mac = dst_mac.replace(':', '')
- if src_mac:
- src_mac = src_mac.replace(':', '')
+ if ether_type:
+ end = 28
+ elif src_mac:
+ end = 24
+ else:
+ end = 12
return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip('0')
+ dst_mac.replace(':', ''), src_mac.replace(':', ''),
+ ether_type))[0:end]
@staticmethod
def _build_ip_match(proto=0, src_ip='', dst_ip='', src_port=0, dst_port=0):
:rtype: str
"""
if src_ip:
- src_ip = binascii.hexlify(inet_aton(src_ip))
+ src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
if dst_ip:
- dst_ip = binascii.hexlify(inet_aton(dst_ip))
+ dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
+ if dst_port:
+ end = 48
+ elif src_port:
+ end = 44
+ elif dst_ip:
+ end = 40
+ elif src_ip:
+ end = 32
+ else:
+ end = 20
return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ hex(dst_port)[2:]))[0:end]
@staticmethod
def _build_ip6_match(next_hdr=0, src_ip='', dst_ip='', src_port=0,
"""Build IPv6 ACL match data in hexstring format.
:param next_hdr: Next header number with valid option "x".
- :param src_ip: Source ip6 address with format of "xxx:xxxx::xxxx".
+ :param src_ip: Source ip6 address with format of "xxxx:xxxx::xxxx".
:param dst_ip: Destination ip6 address with format of
- "xxx:xxxx::xxxx".
+ "xxxx:xxxx::xxxx".
:param src_port: Source port number "x".
:param dst_port: Destination port number "x".
:type next_hdr: int
:rtype: str
"""
if src_ip:
- src_ip = binascii.hexlify(inet_pton(AF_INET6, src_ip))
+ src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
if dst_ip:
- dst_ip = binascii.hexlify(inet_pton(AF_INET6, dst_ip))
+ dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
+ if dst_port:
+ end = 88
+ elif src_port:
+ end = 84
+ elif dst_ip:
+ end = 80
+ elif src_ip:
+ end = 48
+ else:
+ end = 14
return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
hex(next_hdr)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ hex(dst_port)[2:]))[0:end]
@staticmethod
def _classify_add_del_table(node, is_add, mask, match_n_vectors=1,
err_msg = "Failed to create a classify table on host {host}".format(
host=node['host'])
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(cmd, **args).get_replies(err_msg).\
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
- return int(data["new_table_index"]), int(data["skip_n_vectors"]),\
- int(data["match_n_vectors"])
+ return int(reply["new_table_index"]), int(reply["skip_n_vectors"]),\
+ int(reply["match_n_vectors"])
@staticmethod
def _classify_add_del_session(node, is_add, table_index, match,
err_msg = "Failed to create a classify session on host {host}".format(
host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def _macip_acl_add(node, rules, tag=""):
err_msg = "Failed to create a classify session on host {host}".format(
host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def _acl_interface_set_acl_list(node, sw_if_index, acl_type, acls):
err_msg = "Failed to set acl list for interface {idx} on host {host}".\
format(idx=sw_if_index, host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def _acl_add_replace(node, acl_idx, rules, tag=""):
"""
cmd = "acl_add_replace"
args = dict(
- tag=tag,
+ tag=tag.encode("utf-8"),
acl_index=4294967295 if acl_idx is None else acl_idx,
count=len(rules),
r=rules
err_msg = "Failed to add/replace acls on host {host}".format(
host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def vpp_creates_classify_table_l3(node, ip_version, direction, ip_addr):
ip4=Classify._build_ip_mask,
ip6=Classify._build_ip6_mask
)
- if ip_version == "ip4":
- ip_addr = binascii.hexlify(inet_aton(ip_addr))
- elif ip_version == "ip6":
- ip_addr = binascii.hexlify(inet_pton(AF_INET6, ip_addr))
+ if ip_version == "ip4" or ip_version == "ip6":
+ ip_addr = binascii.hexlify(ip_address(unicode(ip_addr)).packed)
else:
raise ValueError("IP version {ver} is not supported.".
format(ver=ip_version))
args = dict(
table_id=int(table_index)
)
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(cmd, **args).get_replies(err_msg).\
- verify_reply(err_msg=err_msg)
-
- return data
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
+ return reply
@staticmethod
def get_classify_session_data(node, table_index):
:returns: List of classify session settings.
:rtype: list or dict
"""
+ cmd = "classify_session_dump"
args = dict(
table_id=int(table_index)
)
- with PapiExecutor(node) as papi_exec:
- dump = papi_exec.add("classify_session_dump", **args).\
- get_dump().reply[0]["api_reply"]["classify_session_details"]
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details()
- return dump
+ return details
@staticmethod
def vpp_log_plugin_acl_settings(node):
:param node: VPP node.
:type node: dict
"""
- PapiExecutor.dump_and_log(node, ["acl_dump", ])
+ PapiSocketExecutor.dump_and_log(node, ["acl_dump", ])
@staticmethod
def vpp_log_plugin_acl_interface_assignment(node):
:param node: VPP node.
:type node: dict
"""
- PapiExecutor.dump_and_log(node, ["acl_interface_list_dump", ])
+ PapiSocketExecutor.dump_and_log(node, ["acl_interface_list_dump", ])
@staticmethod
def set_acl_list_for_interface(node, interface, acl_type, acl_idx=None):
acls=acls)
@staticmethod
- def add_replace_acl_multi_entries(node, acl_idx=None, rules=None):
+ def add_replace_acl_multi_entries(node, acl_idx=None, rules=None, tag=""):
"""Add a new ACL or replace the existing one. To replace an existing
ACL, pass the ID of this ACL.
:param node: VPP node to set ACL on.
:param acl_idx: ID of ACL. (Optional)
:param rules: Required rules. (Optional)
+ :param tag: ACL tag (Optional).
:type node: dict
:type acl_idx: int
:type rules: str
+ :type tag: str
"""
reg_ex_src_ip = re.compile(r'(src [0-9a-fA-F.:/\d{1,2}]*)')
reg_ex_dst_ip = re.compile(r'(dst [0-9a-fA-F.:/\d{1,2}]*)')
reg_ex_sport = re.compile(r'(sport \d{1,5})')
reg_ex_dport = re.compile(r'(dport \d{1,5})')
+ reg_ex_proto = re.compile(r'(proto \d{1,5})')
acl_rules = list()
for rule in rules.split(", "):
groups = re.search(reg_ex_src_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["src_ip_prefix_len"] = int(grp[1])
groups = re.search(reg_ex_dst_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["dst_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["dst_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["dst_ip_prefix_len"] = int(grp[1])
groups = re.search(reg_ex_sport, rule)
port = int(groups.group(1).split(' ')[1])
acl_rule["srcport_or_icmptype_first"] = port
acl_rule["srcport_or_icmptype_last"] = port
+ else:
+ acl_rule["srcport_or_icmptype_first"] = 0
+ acl_rule["srcport_or_icmptype_last"] = 65535
groups = re.search(reg_ex_dport, rule)
if groups:
port = int(groups.group(1).split(' ')[1])
acl_rule["dstport_or_icmpcode_first"] = port
acl_rule["dstport_or_icmpcode_last"] = port
+ else:
+ acl_rule["dstport_or_icmpcode_first"] = 0
+ acl_rule["dstport_or_icmpcode_last"] = 65535
- acl_rule["proto"] = 0
+ groups = re.search(reg_ex_proto, rule)
+ if groups:
+ proto = int(groups.group(1).split(' ')[1])
+ acl_rule["proto"] = proto
+ else:
+ acl_rule["proto"] = 0
acl_rules.append(acl_rule)
- Classify._acl_add_replace(node, acl_idx=acl_idx, rules=acl_rules)
+ Classify._acl_add_replace(
+ node, acl_idx=acl_idx, rules=acl_rules, tag=tag)
@staticmethod
def add_macip_acl_multi_entries(node, rules=""):
groups = re.search(reg_ex_mac, rule)
if groups:
mac = groups.group(1).split(' ')[1].replace(':', '')
- acl_rule["src_mac"] = unicode(mac)
+ acl_rule["src_mac"] = binascii.unhexlify(unicode(mac))
groups = re.search(reg_ex_mask, rule)
if groups:
mask = groups.group(1).split(' ')[1].replace(':', '')
- acl_rule["src_mac_mask"] = unicode(mask)
+ acl_rule["src_mac_mask"] = binascii.unhexlify(unicode(mask))
groups = re.search(reg_ex_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["src_ip_prefix_len"] = int(grp[1])
acl_rules.append(acl_rule)
:param node: VPP node.
:type node: dict
"""
- PapiExecutor.dump_and_log(node, ["macip_acl_dump", ])
+ PapiSocketExecutor.dump_and_log(node, ["macip_acl_dump", ])
@staticmethod
def add_del_macip_acl_interface(node, interface, action, acl_idx):
sw_if_index=int(sw_if_index),
acl_index=int(acl_idx)
)
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg).\
- verify_reply(err_msg=err_msg)
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def vpp_log_macip_acl_interface_assignment(node):
cmd = 'macip_acl_interface_get'
err_msg = "Failed to get 'macip_acl_interface' on host {host}".format(
host=node['host'])
- with PapiExecutor(node) as papi_exec:
- rpl = papi_exec.add(cmd).get_replies(err_msg).reply[0]["api_reply"]
- logger.info(rpl["macip_acl_interface_get_reply"])
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+ logger.info(reply)