+ # A closure, accessing the variables above.
+ def resetter():
+ """Delete and re-add the NAT range setting."""
+ with PapiSocketExecutor(node) as papi_exec:
+ args_in[u"is_add"] = False
+ papi_exec.add(cmd, **args_in)
+ args_in[u"is_add"] = True
+ papi_exec.add(cmd, **args_in)
+ papi_exec.get_replies(err_msg)
+
+ return resetter
+
+ @staticmethod
+ def show_nat44_config(node):
+ """Show the NAT44 plugin running configuration.
+
+ :param node: DUT node.
+ :type node: dict
+ """
+ cmd = u"nat44_show_running_config"
+ err_msg = f"Failed to get NAT44 configuration on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ logger.debug(f"NAT44 Configuration:\n{pformat(reply)}")
+
+ @staticmethod
+ def show_nat44_summary(node):
+ """Show NAT44 summary on the specified topology node.
+
+ :param node: Topology node.
+ :type node: dict
+ :returns: NAT44 summary data.
+ :rtype: str
+ """
+ return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
+
+ @staticmethod
+ def show_nat_base_data(node):
+ """Show the NAT base data.
+
+ Used data sources:
+
+ nat_worker_dump
+ nat44_interface_addr_dump
+ nat44_address_dump
+ nat44_static_mapping_dump
+ nat44_interface_dump
+
+ :param node: DUT node.
+ :type node: dict
+ """
+ cmds = [
+ u"nat_worker_dump",
+ u"nat44_interface_addr_dump",
+ u"nat44_address_dump",
+ u"nat44_static_mapping_dump",
+ u"nat44_interface_dump",
+ ]
+ PapiSocketExecutor.dump_and_log(node, cmds)
+
+ @staticmethod
+ def show_nat_user_data(node):
+ """Show the NAT user data.
+
+ Used data sources:
+
+ nat44_user_dump
+ nat44_user_session_dump
+
+ :param node: DUT node.
+ :type node: dict
+ """
+ cmds = [
+ u"nat44_user_dump",
+ u"nat44_user_session_dump",
+ ]
+ PapiSocketExecutor.dump_and_log(node, cmds)
+
+ @staticmethod
+ def compute_max_translations_per_thread(sessions, threads):
+ """Compute value of max_translations_per_thread NAT44 parameter based on
+ total number of worker threads.
+
+ :param sessions: Required number of NAT44 sessions.
+ :param threads: Number of worker threads.
+ :type sessions: int
+ :type threads: int
+ :returns: Value of max_translations_per_thread NAT44 parameter.
+ :rtype: int
+ """
+ # vpp-device tests have not dedicated physical core so
+ # ${dp_count_int} == 0 but we need to use one thread
+ threads = 1 if not int(threads) else int(threads)
+ rest, mult = modf(log2(sessions/(10*threads)))
+ return 2 ** (int(mult) + (1 if rest else 0)) * 10
+
+ @staticmethod
+ def get_nat44_sessions_number(node, proto):
+ """Get number of established NAT44 sessions from actual NAT44 mapping
+ data.
+
+ :param node: DUT node.
+ :param proto: Required protocol - TCP/UDP/ICMP.
+ :type node: dict
+ :type proto: str
+ :returns: Number of established NAT44 sessions.
+ :rtype: int
+ :raises ValueError: If not supported protocol.
+ """
+ nat44_data = dict()
+ if proto in [u"UDP", u"TCP", u"ICMP"]:
+ for line in NATUtil.show_nat44_summary(node).splitlines():
+ sum_k, sum_v = line.split(u":") if u":" in line \
+ else (line, None)
+ nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
+ else sum_v
+ else:
+ raise ValueError(f"Unsupported protocol: {proto}!")
+ return nat44_data.get(f"total {proto.lower()} sessions", 0)
+
+ # DET44 PAPI calls
+ # DET44 means deterministic mode of NAT44
+ @staticmethod
+ def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
+ """Enable DET44 plugin.
+
+ :param node: DUT node.
+ :param inside_vrf: Inside VRF ID.
+ :param outside_vrf: Outside VRF ID.
+ :type node: dict
+ :type inside_vrf: str or int
+ :type outside_vrf: str or int
+ """
+ cmd = u"det44_plugin_enable_disable"
+ err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"