X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FNATUtil.py;h=857870393e648bdefa1410632f2ce1a993c6a1d2;hb=3289b49cd2f08f9aae43959e0d5778617379388f;hp=ea13c9411fd58a7107e74a440c65f9d47e66bd1a;hpb=2bdb1dc59ba736546a6663cbaf55b6edbcac6eab;p=csit.git diff --git a/resources/libraries/python/NATUtil.py b/resources/libraries/python/NATUtil.py index ea13c9411f..857870393e 100644 --- a/resources/libraries/python/NATUtil.py +++ b/resources/libraries/python/NATUtil.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Cisco and/or its affiliates. +# Copyright (c) 2020 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,15 +13,68 @@ """NAT utilities library.""" -from resources.libraries.python.VatExecutor import VatTerminal, VatExecutor +from pprint import pformat +from enum import IntEnum +from ipaddress import IPv4Address +from robot.api import logger -class NATUtil(object): +from resources.libraries.python.Constants import Constants +from resources.libraries.python.InterfaceUtil import InterfaceUtil +from resources.libraries.python.topology import Topology +from resources.libraries.python.PapiExecutor import PapiSocketExecutor + + +class NatConfigFlags(IntEnum): + """Common NAT plugin APIs""" + NAT_IS_NONE = 0x00 + NAT_IS_TWICE_NAT = 0x01 + NAT_IS_SELF_TWICE_NAT = 0x02 + NAT_IS_OUT2IN_ONLY = 0x04 + NAT_IS_ADDR_ONLY = 0x08 + NAT_IS_OUTSIDE = 0x10 + NAT_IS_INSIDE = 0x20 + NAT_IS_STATIC = 0x40 + NAT_IS_EXT_HOST_VALID = 0x80 + + +class NatAddrPortAllocAlg(IntEnum): + """NAT Address and port assignment algorithms.""" + NAT_ALLOC_ALG_DEFAULT = 0 + NAT_ALLOC_ALG_MAP_E = 1 + NAT_ALLOC_ALG_PORT_RANGE = 2 + + +class NATUtil: """This class defines the methods to set NAT.""" def __init__(self): pass + @staticmethod + def set_nat44_interface(node, interface, flag): + """Set inside and outside interfaces for NAT44. + + :param node: DUT node. + :param interface: NAT44 interface. + :param flag: Interface NAT configuration flag name. + :type node: dict + :type interface: str + :type flag: str + """ + cmd = u"nat44_interface_add_del_feature" + + err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \ + f"on host {node[u'host']}" + args_in = dict( + sw_if_index=InterfaceUtil.get_sw_if_index(node, interface), + is_add=1, + flags=getattr(NatConfigFlags, flag).value + ) + + with PapiSocketExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_reply(err_msg) + @staticmethod def set_nat44_interfaces(node, int_in, int_out): """Set inside and outside interfaces for NAT44. @@ -32,230 +85,230 @@ class NATUtil(object): :type node: dict :type int_in: str :type int_out: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of inside and outside interfaces for - NAT44 fails. """ + NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE") + NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE") - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_set_interfaces.vat', - int_in=int_in, int_out=int_out) - return response - except: - raise RuntimeError("Setting of inside and outside interfaces for " - "NAT failed!") + @staticmethod + def set_nat44_address_range( + node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO, + flag=u"NAT_IS_NONE"): + """Set NAT44 address range. + + :param node: DUT node. + :param start_ip: IP range start. + :param end_ip: IP range end. + :param vrf_id: VRF index (Optional). + :param flag: NAT flag name. + :type node: dict + :type start_ip: str + :type end_ip: str + :type vrf_id: int + :type flag: str + """ + cmd = u"nat44_add_del_address_range" + err_msg = f"Failed to set NAT44 address range on host {node[u'host']}" + args_in = dict( + is_add=True, + first_ip_address=IPv4Address(str(start_ip)).packed, + last_ip_address=IPv4Address(str(end_ip)).packed, + vrf_id=vrf_id, + flags=getattr(NatConfigFlags, flag).value + ) + + with PapiSocketExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_reply(err_msg) @staticmethod - def set_nat44_deterministic(node, ip_in, subnet_in, ip_out, subnet_out): - """Set deterministic behaviour of NAT44. + def show_nat_config(node): + """Show the NAT configuration. :param node: DUT node. - :param ip_in: Inside IP. - :param subnet_in: Inside IP subnet. - :param ip_out: Outside IP. - :param subnet_out: Outside IP subnet. :type node: dict - :type ip_in: str - :type subnet_in: str or int - :type ip_out: str - :type subnet_out: str or int - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of deterministic behaviour of NAT44 - fails. """ + cmd = u"nat_show_config" + err_msg = f"Failed to get NAT configuration on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + logger.debug(f"NAT Configuration:\n{pformat(reply)}") - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_set_deterministic.vat', - ip_in=ip_in, subnet_in=subnet_in, - ip_out=ip_out, subnet_out=subnet_out) - return response - except: - raise RuntimeError("Setting of deterministic behaviour of NAT " - "failed!") + @staticmethod + def show_nat44_summary(node): + """Show NAT44 summary on the specified topology node. + + :param node: Topology node. + :type node: dict + """ + PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary") @staticmethod - def set_nat_workers(node, lcores): - """Set NAT workers. + def show_nat_base_data(node): + """Show the NAT base data. + + Used data sources: + + nat_worker_dump + nat44_interface_addr_dump + nat44_address_dump + nat44_static_mapping_dump + nat44_interface_dump :param node: DUT node. - :param lcores: list of cores, format: range e.g. 1-5 or list of ranges - e.g.: 1-5,18-22. :type node: dict - :type lcores: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of NAT workers fails. """ + cmds = [ + u"nat_worker_dump", + u"nat44_interface_addr_dump", + u"nat44_address_dump", + u"nat44_static_mapping_dump", + u"nat44_interface_dump", + ] + PapiSocketExecutor.dump_and_log(node, cmds) + + @staticmethod + def show_nat_user_data(node): + """Show the NAT user data. - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat_set_workers.vat', lcores=lcores) - return response - except: - raise RuntimeError("Setting of NAT workers failed!") + Used data sources: + nat44_user_dump + nat44_user_session_dump + + :param node: DUT node. + :type node: dict + """ + cmds = [ + u"nat44_user_dump", + u"nat44_user_session_dump", + ] + PapiSocketExecutor.dump_and_log(node, cmds) + + # DET44 PAPI calls + # DET44 means deterministic mode of NAT44 @staticmethod - def show_nat(node): - """Show the NAT settings. + def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0): + """Enable DET44 plugin. :param node: DUT node. + :param inside_vrf: Inside VRF ID. + :param outside_vrf: Outside VRF ID. :type node: dict - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If getting of NAT settings fails. + :type inside_vrf: str or int + :type outside_vrf: str or int """ + cmd = u"det44_plugin_enable_disable" + err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!" + args_in = dict( + enable=True, + inside_vrf=int(inside_vrf), + outside_vrf=int(outside_vrf) + ) - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat_show_nat.vat') - return response - except: - raise RuntimeError("Getting of NAT settings failed!") + with PapiSocketExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_reply(err_msg) @staticmethod - def show_nat44_deterministic_forward(node, ip_addr): - """Show forward IP address and port(s). + def set_det44_interface(node, if_key, is_inside): + """Enable DET44 feature on the interface. :param node: DUT node. - :param ip_addr: IP address. + :param if_key: Interface key from topology file of interface + to enable DET44 feature on. + :param is_inside: True if interface is inside, False if outside. :type node: dict - :type ip_addr: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If command 'exec snat deterministic forward' - fails. + :type if_key: str + :type is_inside: bool """ + cmd = u"det44_interface_add_del_feature" + err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \ + f"on the host {node[u'host']}!" + args_in = dict( + is_add=True, + is_inside=is_inside, + sw_if_index=Topology.get_interface_sw_index(node, if_key) + ) + + with PapiSocketExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_reply(err_msg) - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_deterministic_forward.vat', ip=ip_addr) - return response - except: - raise RuntimeError("Command 'exec nat44 deterministic forward {ip}'" - " failed!".format(ip=ip_addr)) + @staticmethod + def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out): + """Set DET44 mapping. + + :param node: DUT node. + :param ip_in: Inside IP. + :param subnet_in: Inside IP subnet. + :param ip_out: Outside IP. + :param subnet_out: Outside IP subnet. + :type node: dict + :type ip_in: str + :type subnet_in: str or int + :type ip_out: str + :type subnet_out: str or int + """ + cmd = u"det44_add_del_map" + err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!" + args_in = dict( + is_add=True, + in_addr=IPv4Address(str(ip_in)).packed, + in_plen=int(subnet_in), + out_addr=IPv4Address(str(ip_out)).packed, + out_plen=int(subnet_out) + ) + + with PapiSocketExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_reply(err_msg) @staticmethod - def show_nat44_deterministic_reverse(node, ip_addr, port): - """Show reverse IP address. + def get_det44_mapping(node): + """Get DET44 mapping data. :param node: DUT node. - :param ip_addr: IP address. - :param port: Port. :type node: dict - :type ip_addr: str - :type port: str or int - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If command 'exec snat deterministic reverse' - fails. + :returns: Dictionary of DET44 mapping data. + :rtype: dict """ + cmd = u"det44_map_dump" + err_msg = f"Failed to get DET44 mapping data on the host " \ + f"{node[u'host']}!" + args_in = dict() + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd, **args_in).get_reply(err_msg) - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_deterministic_reverse.vat', - ip=ip_addr, port=port) - return response - except: - raise RuntimeError( - "Command 'exec nat44 deterministic reverse {ip}:{port}'" - " failed!".format(ip=ip_addr, port=port)) + return details @staticmethod - def get_nat_static_mappings(node): - """Get NAT static mappings from VPP node. + def get_det44_sessions_number(node): + """Get number of established DET44 sessions from actual DET44 mapping + data. - :param node: VPP node. + :param node: DUT node. :type node: dict - :returns: List of static mappings. - :rtype: list - :raises RuntimeError: If the output is not as expected. + :returns: Number of established DET44 sessions. + :rtype: int """ + det44_data = NATUtil.get_det44_mapping(node) - vat = VatExecutor() - # JSON output not supported for this command - vat.execute_script('nat/snat_mapping_dump.vat', node, json_out=False) - - stdout = vat.get_script_stdout() - lines = stdout.split("\n") - - data = [] - # lines[0,1] are table and column headers - for line in lines[2::]: - # Ignore extra data after NAT table - if "snat_static_mapping_dump error: Misc" in line or "vat#" in line: - continue - items = line.split(" ") - while "" in items: - items.remove("") - if len(items) == 0: - continue - elif len(items) == 4: - # no ports were returned - data.append({ - "local_address": items[0], - "remote_address": items[1], - "vrf": items[2], - "protocol": items[3] - }) - elif len(items) == 6: - data.append({ - "local_address": items[0], - "local_port": items[1], - "remote_address": items[2], - "remote_port": items[3], - "vrf": items[4], - "protocol": items[5] - }) - else: - raise RuntimeError("Unexpected output from snat_mapping_dump.") - - return data + return det44_data.get(u"ses_num", 0) @staticmethod - def get_nat_interfaces(node): - """Get list of interfaces configured with NAT from VPP node. + def show_det44(node): + """Show DET44 data. + + Used data sources: - :param node: VPP node. + det44_interface_dump + det44_map_dump + det44_session_dump + + :param node: DUT node. :type node: dict - :returns: List of interfaces on the node that are configured with NAT. - :rtype: list - :raises RuntimeError: If the output is not as expected. """ - - vat = VatExecutor() - # JSON output not supported for this command - vat.execute_script('nat/snat_interface_dump.vat', node, - json_out=False) - - stdout = vat.get_script_stdout() - lines = stdout.split("\n") - - data = [] - for line in lines: - items = line.split(" ") - for trash in ("", "vat#"): - while trash in items: - items.remove(trash) - if len(items) == 0: - continue - elif len(items) == 3: - data.append({ - # items[0] is the table header - "sw_if_index" - "sw_if_index": items[1], - "direction": items[2] - }) - else: - raise RuntimeError( - "Unexpected output from snat_interface_dump.") - - return data + cmds = [ + u"det44_interface_dump", + u"det44_map_dump", + u"det44_session_dump", + ] + PapiSocketExecutor.dump_and_log(node, cmds)