-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""NAT utilities library."""
-from resources.libraries.python.VatExecutor import VatTerminal, VatExecutor
+from math import log2, modf
+from pprint import pformat
+from enum import IntEnum
+from ipaddress import IPv4Address
+from robot.api import logger
-class NATUtil(object):
+from resources.libraries.python.Constants import Constants
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.topology import Topology
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+
+
+class NatConfigFlags(IntEnum):
+ """NAT plugin configuration flags"""
+ NAT_IS_NONE = 0x00
+ NAT_IS_TWICE_NAT = 0x01
+ NAT_IS_SELF_TWICE_NAT = 0x02
+ NAT_IS_OUT2IN_ONLY = 0x04
+ NAT_IS_ADDR_ONLY = 0x08
+ NAT_IS_OUTSIDE = 0x10
+ NAT_IS_INSIDE = 0x20
+ NAT_IS_STATIC = 0x40
+ NAT_IS_EXT_HOST_VALID = 0x80
+
+
+class Nat44ConfigFlags(IntEnum):
+ """NAT44 configuration flags"""
+ NAT44_IS_ENDPOINT_INDEPENDENT = 0x00
+ NAT44_IS_ENDPOINT_DEPENDENT = 0x01
+ NAT44_IS_STATIC_MAPPING_ONLY = 0x02
+ NAT44_IS_CONNECTION_TRACKING = 0x04
+ NAT44_IS_OUT2IN_DPO = 0x08
+
+
+class NatAddrPortAllocAlg(IntEnum):
+ """NAT Address and port assignment algorithms."""
+ NAT_ALLOC_ALG_DEFAULT = 0
+ NAT_ALLOC_ALG_MAP_E = 1
+ NAT_ALLOC_ALG_PORT_RANGE = 2
+
+
+class NATUtil:
"""This class defines the methods to set NAT."""
def __init__(self):
pass
+ @staticmethod
+ def enable_nat44_plugin(
+ node, inside_vrf=0, outside_vrf=0, users=0, user_memory=0,
+ sessions=0, session_memory=0, user_sessions=0, mode=u""):
+ """Enable NAT44 plugin.
+
+ :param node: DUT node.
+ :param inside_vrf: Inside VRF ID.
+ :param outside_vrf: Outside VRF ID.
+ :param users: Maximum number of users. Used only in endpoint-independent
+ mode.
+ :param user_memory: User memory size - overwrite auto calculated hash
+ allocation parameter if non-zero.
+ :param sessions: Maximum number of sessions.
+ :param session_memory: Session memory size - overwrite auto calculated
+ hash allocation parameter if non-zero.
+ :param user_sessions: Maximum number of sessions per user. Used only in
+ endpoint-independent mode.
+ :param mode: NAT44 mode. Valid values:
+ - endpoint-independent
+ - endpoint-dependent
+ - static-mapping-only
+ - connection-tracking
+ - out2in-dpo
+ :type node: dict
+ :type inside_vrf: str or int
+ :type outside_vrf: str or int
+ :type users: str or int
+ :type user_memory: str or int
+ :type sessions: str or int
+ :type session_memory: str or int
+ :type user_sessions: str or int
+ :type mode: str
+ """
+ cmd = u"nat44_plugin_enable_disable"
+ err_msg = f"Failed to enable NAT44 plugin on the host {node[u'host']}!"
+ args_in = dict(
+ enable=True,
+ inside_vrf=int(inside_vrf),
+ outside_vrf=int(outside_vrf),
+ users=int(users),
+ user_memory=int(user_memory),
+ sessions=int(sessions),
+ session_memory=int(session_memory),
+ user_sessions=int(user_sessions),
+ flags=getattr(
+ Nat44ConfigFlags,
+ f"NAT44_IS_{mode.replace(u'-', u'_').upper()}"
+ ).value
+ )
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
+
+ @staticmethod
+ def set_nat44_interface(node, interface, flag):
+ """Set inside and outside interfaces for NAT44.
+
+ :param node: DUT node.
+ :param interface: NAT44 interface.
+ :param flag: Interface NAT configuration flag name.
+ :type node: dict
+ :type interface: str
+ :type flag: str
+ """
+ cmd = u"nat44_interface_add_del_feature"
+
+ err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
+ f"on host {node[u'host']}"
+ args_in = dict(
+ sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
+ is_add=1,
+ flags=getattr(NatConfigFlags, flag).value
+ )
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
+
@staticmethod
def set_nat44_interfaces(node, int_in, int_out):
"""Set inside and outside interfaces for NAT44.
:type node: dict
:type int_in: str
:type int_out: str
- :returns: Response of the command.
- :rtype: str
- :raises RuntimeError: If setting of inside and outside interfaces for
- NAT44 fails.
"""
+ NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
+ NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
+
+ @staticmethod
+ def set_nat44_address_range(
+ node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
+ flag=u"NAT_IS_NONE"):
+ """Set NAT44 address range.
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat44_set_interfaces.vat',
- int_in=int_in, int_out=int_out)
- return response
- except:
- raise RuntimeError("Setting of inside and outside interfaces for "
- "NAT failed!")
+ :param node: DUT node.
+ :param start_ip: IP range start.
+ :param end_ip: IP range end.
+ :param vrf_id: VRF index (Optional).
+ :param flag: NAT flag name.
+ :type node: dict
+ :type start_ip: str
+ :type end_ip: str
+ :type vrf_id: int
+ :type flag: str
+ """
+ cmd = u"nat44_add_del_address_range"
+ err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
+ args_in = dict(
+ is_add=True,
+ first_ip_address=IPv4Address(str(start_ip)).packed,
+ last_ip_address=IPv4Address(str(end_ip)).packed,
+ vrf_id=vrf_id,
+ flags=getattr(NatConfigFlags, flag).value
+ )
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
@staticmethod
- def set_nat44_deterministic(node, ip_in, subnet_in, ip_out, subnet_out):
- """Set deterministic behaviour of NAT44.
+ def show_nat_config(node):
+ """Show the NAT configuration.
:param node: DUT node.
- :param ip_in: Inside IP.
- :param subnet_in: Inside IP subnet.
- :param ip_out: Outside IP.
- :param subnet_out: Outside IP subnet.
:type node: dict
- :type ip_in: str
- :type subnet_in: str or int
- :type ip_out: str
- :type subnet_out: str or int
- :returns: Response of the command.
- :rtype: str
- :raises RuntimeError: If setting of deterministic behaviour of NAT44
- fails.
"""
+ cmd = u"nat_show_config"
+ err_msg = f"Failed to get NAT configuration on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat44_set_deterministic.vat',
- ip_in=ip_in, subnet_in=subnet_in,
- ip_out=ip_out, subnet_out=subnet_out)
- return response
- except:
- raise RuntimeError("Setting of deterministic behaviour of NAT "
- "failed!")
+ logger.debug(f"NAT Configuration:\n{pformat(reply)}")
@staticmethod
- def set_nat_workers(node, lcores):
- """Set NAT workers.
+ def show_nat44_summary(node):
+ """Show NAT44 summary on the specified topology node.
- :param node: DUT node.
- :param lcores: List of cores, format: range e.g. 1-5 or list of ranges
- e.g.: 1-5,18-22.
+ :param node: Topology node.
:type node: dict
- :type lcores: str
- :returns: Response of the command.
+ :returns: NAT44 summary data.
:rtype: str
- :raises RuntimeError: If setting of NAT workers fails.
"""
+ return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
+
+ @staticmethod
+ def show_nat_base_data(node):
+ """Show the NAT base data.
+
+ Used data sources:
+
+ nat_worker_dump
+ nat44_interface_addr_dump
+ nat44_address_dump
+ nat44_static_mapping_dump
+ nat44_interface_dump
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat_set_workers.vat', lcores=lcores)
- return response
- except:
- raise RuntimeError("Setting of NAT workers failed!")
+ :param node: DUT node.
+ :type node: dict
+ """
+ cmds = [
+ u"nat_worker_dump",
+ u"nat44_interface_addr_dump",
+ u"nat44_address_dump",
+ u"nat44_static_mapping_dump",
+ u"nat44_interface_dump",
+ ]
+ PapiSocketExecutor.dump_and_log(node, cmds)
@staticmethod
- def show_nat(node):
- """Show the NAT settings.
+ def show_nat_user_data(node):
+ """Show the NAT user data.
+
+ Used data sources:
+
+ nat44_user_dump
+ nat44_user_session_dump
:param node: DUT node.
:type node: dict
- :returns: Response of the command.
- :rtype: str
- :raises RuntimeError: If getting of NAT settings fails.
"""
+ cmds = [
+ u"nat44_user_dump",
+ u"nat44_user_session_dump",
+ ]
+ PapiSocketExecutor.dump_and_log(node, cmds)
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat_show_nat.vat')
- return response
- except:
- raise RuntimeError("Getting of NAT settings failed!")
+ @staticmethod
+ def compute_max_translations_per_thread(sessions, threads):
+ """Compute value of max_translations_per_thread NAT44 parameter based on
+ total number of worker threads.
+
+ :param sessions: Required number of NAT44 sessions.
+ :param threads: Number of worker threads.
+ :type sessions: int
+ :type threads: int
+ :returns: Value of max_translations_per_thread NAT44 parameter.
+ :rtype: int
+ """
+ # vpp-device tests have not dedicated physical core so
+ # ${thr_count_int} == 0 but we need to use one thread
+ threads = 1 if not int(threads) else int(threads)
+ rest, mult = modf(log2(sessions/(10*threads)))
+ return 2 ** (int(mult) + (1 if rest else 0)) * 10
@staticmethod
- def show_nat44_deterministic_forward(node, ip_addr):
- """Show forward IP address and port(s).
+ def get_nat44_sessions_number(node, proto):
+ """Get number of established NAT44 sessions from actual NAT44 mapping
+ data.
:param node: DUT node.
- :param ip_addr: IP address.
+ :param proto: Required protocol - TCP/UDP/ICMP.
:type node: dict
- :type ip_addr: str
- :returns: Response of the command.
- :rtype: str
- :raises RuntimeError: If command 'exec snat deterministic forward'
- fails.
+ :type proto: str
+ :returns: Number of established NAT44 sessions.
+ :rtype: int
+ :raises ValueError: If not supported protocol.
+ """
+ nat44_data = dict()
+ if proto in [u"UDP", u"TCP", u"ICMP"]:
+ for line in NATUtil.show_nat44_summary(node).splitlines():
+ sum_k, sum_v = line.split(u":") if u":" in line \
+ else (line, None)
+ nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
+ else sum_v
+ else:
+ raise ValueError(f"Unsupported protocol: {proto}!")
+ return nat44_data.get(f"total {proto.lower()} sessions", 0)
+
+ # DET44 PAPI calls
+ # DET44 means deterministic mode of NAT44
+ @staticmethod
+ def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
+ """Enable DET44 plugin.
+
+ :param node: DUT node.
+ :param inside_vrf: Inside VRF ID.
+ :param outside_vrf: Outside VRF ID.
+ :type node: dict
+ :type inside_vrf: str or int
+ :type outside_vrf: str or int
"""
+ cmd = u"det44_plugin_enable_disable"
+ err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
+ args_in = dict(
+ enable=True,
+ inside_vrf=int(inside_vrf),
+ outside_vrf=int(outside_vrf)
+ )
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat44_deterministic_forward.vat', ip=ip_addr)
- return response
- except:
- raise RuntimeError("Command 'exec nat44 deterministic forward {ip}'"
- " failed!".format(ip=ip_addr))
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
@staticmethod
- def show_nat44_deterministic_reverse(node, ip_addr, port):
- """Show reverse IP address.
+ def set_det44_interface(node, if_key, is_inside):
+ """Enable DET44 feature on the interface.
:param node: DUT node.
- :param ip_addr: IP address.
- :param port: Port.
+ :param if_key: Interface key from topology file of interface
+ to enable DET44 feature on.
+ :param is_inside: True if interface is inside, False if outside.
:type node: dict
- :type ip_addr: str
- :type port: str or int
- :returns: Response of the command.
- :rtype: str
- :raises RuntimeError: If command 'exec snat deterministic reverse'
- fails.
+ :type if_key: str
+ :type is_inside: bool
"""
+ cmd = u"det44_interface_add_del_feature"
+ err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
+ f"on the host {node[u'host']}!"
+ args_in = dict(
+ is_add=True,
+ is_inside=is_inside,
+ sw_if_index=Topology.get_interface_sw_index(node, if_key)
+ )
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
- try:
- with VatTerminal(node, json_param=False) as vat:
- response = vat.vat_terminal_exec_cmd_from_template(
- 'nat/nat44_deterministic_reverse.vat',
- ip=ip_addr, port=port)
- return response
- except:
- raise RuntimeError(
- "Command 'exec nat44 deterministic reverse {ip}:{port}'"
- " failed!".format(ip=ip_addr, port=port))
+ @staticmethod
+ def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
+ """Set DET44 mapping.
+
+ :param node: DUT node.
+ :param ip_in: Inside IP.
+ :param subnet_in: Inside IP subnet.
+ :param ip_out: Outside IP.
+ :param subnet_out: Outside IP subnet.
+ :type node: dict
+ :type ip_in: str
+ :type subnet_in: str or int
+ :type ip_out: str
+ :type subnet_out: str or int
+ """
+ cmd = u"det44_add_del_map"
+ err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
+ args_in = dict(
+ is_add=True,
+ in_addr=IPv4Address(str(ip_in)).packed,
+ in_plen=int(subnet_in),
+ out_addr=IPv4Address(str(ip_out)).packed,
+ out_plen=int(subnet_out)
+ )
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args_in).get_reply(err_msg)
@staticmethod
- def get_nat_static_mappings(node):
- """Get NAT static mappings from VPP node.
+ def get_det44_mapping(node):
+ """Get DET44 mapping data.
- :param node: VPP node.
+ :param node: DUT node.
:type node: dict
- :returns: List of static mappings.
- :rtype: list
- :raises RuntimeError: If the output is not as expected.
+ :returns: Dictionary of DET44 mapping data.
+ :rtype: dict
"""
+ cmd = u"det44_map_dump"
+ err_msg = f"Failed to get DET44 mapping data on the host " \
+ f"{node[u'host']}!"
+ args_in = dict()
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
- vat = VatExecutor()
- # JSON output not supported for this command
- vat.execute_script('nat/snat_mapping_dump.vat', node, json_out=False)
-
- stdout = vat.get_script_stdout()
- lines = stdout.split("\n")
-
- data = []
- # lines[0,1] are table and column headers
- for line in lines[2::]:
- # Ignore extra data after NAT table
- if "snat_static_mapping_dump error: Misc" in line or "vat#" in line:
- continue
- items = line.split(" ")
- while "" in items:
- items.remove("")
- if not items:
- continue
- if len(items) == 4:
- # no ports were returned
- data.append({
- "local_address": items[0],
- "remote_address": items[1],
- "vrf": items[2],
- "protocol": items[3]
- })
- elif len(items) == 6:
- data.append({
- "local_address": items[0],
- "local_port": items[1],
- "remote_address": items[2],
- "remote_port": items[3],
- "vrf": items[4],
- "protocol": items[5]
- })
- else:
- raise RuntimeError("Unexpected output from snat_mapping_dump.")
-
- return data
+ return details
@staticmethod
- def get_nat_interfaces(node):
- """Get list of interfaces configured with NAT from VPP node.
+ def get_det44_sessions_number(node):
+ """Get number of established DET44 sessions from actual DET44 mapping
+ data.
- :param node: VPP node.
+ :param node: DUT node.
:type node: dict
- :returns: List of interfaces on the node that are configured with NAT.
- :rtype: list
- :raises RuntimeError: If the output is not as expected.
+ :returns: Number of established DET44 sessions.
+ :rtype: int
"""
+ det44_data = NATUtil.get_det44_mapping(node)
+
+ return det44_data.get(u"ses_num", 0)
+
+ @staticmethod
+ def show_det44(node):
+ """Show DET44 data.
- vat = VatExecutor()
- # JSON output not supported for this command
- vat.execute_script('nat/snat_interface_dump.vat', node,
- json_out=False)
-
- stdout = vat.get_script_stdout()
- lines = stdout.split("\n")
-
- data = []
- for line in lines:
- items = line.split(" ")
- for trash in ("", "vat#"):
- while trash in items:
- items.remove(trash)
- if not items:
- continue
- if len(items) == 3:
- data.append({
- # items[0] is the table header - "sw_if_index"
- "sw_if_index": items[1],
- "direction": items[2]
- })
- else:
- raise RuntimeError(
- "Unexpected output from snat_interface_dump.")
-
- return data
+ Used data sources:
+
+ det44_interface_dump
+ det44_map_dump
+ det44_session_dump
+
+ :param node: DUT node.
+ :type node: dict
+ """
+ cmds = [
+ u"det44_interface_dump",
+ u"det44_map_dump",
+ u"det44_session_dump",
+ ]
+ PapiSocketExecutor.dump_and_log(node, cmds)