1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """NAT utilities library."""
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
20 from ipaddress import IPv4Address
21 from robot.api import logger
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
29 class NatConfigFlags(IntEnum):
30 """Common NAT plugin APIs"""
32 NAT_IS_TWICE_NAT = 0x01
33 NAT_IS_SELF_TWICE_NAT = 0x02
34 NAT_IS_OUT2IN_ONLY = 0x04
35 NAT_IS_ADDR_ONLY = 0x08
39 NAT_IS_EXT_HOST_VALID = 0x80
42 class NatAddrPortAllocAlg(IntEnum):
43 """NAT Address and port assignment algorithms."""
44 NAT_ALLOC_ALG_DEFAULT = 0
45 NAT_ALLOC_ALG_MAP_E = 1
46 NAT_ALLOC_ALG_PORT_RANGE = 2
50 """This class defines the methods to set NAT."""
56 def set_nat44_interface(node, interface, flag):
57 """Set inside and outside interfaces for NAT44.
59 :param node: DUT node.
60 :param interface: NAT44 interface.
61 :param flag: Interface NAT configuration flag name.
66 cmd = u"nat44_interface_add_del_feature"
68 err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
69 f"on host {node[u'host']}"
71 sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
73 flags=getattr(NatConfigFlags, flag).value
76 with PapiSocketExecutor(node) as papi_exec:
77 papi_exec.add(cmd, **args_in).get_reply(err_msg)
80 def set_nat44_interfaces(node, int_in, int_out):
81 """Set inside and outside interfaces for NAT44.
83 :param node: DUT node.
84 :param int_in: Inside interface.
85 :param int_out: Outside interface.
90 NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
91 NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
94 def set_nat44_address_range(
95 node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
97 """Set NAT44 address range.
99 The return value is a callable (zero argument Python function)
100 which can be used to reset NAT state, so repeated trial measurements
101 hit the same slow path.
103 :param node: DUT node.
104 :param start_ip: IP range start.
105 :param end_ip: IP range end.
106 :param vrf_id: VRF index (Optional).
107 :param flag: NAT flag name.
113 :returns: Resetter of the NAT state.
114 :rtype: Callable[[], None]
116 cmd = u"nat44_add_del_address_range"
117 err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
120 first_ip_address=IPv4Address(str(start_ip)).packed,
121 last_ip_address=IPv4Address(str(end_ip)).packed,
123 flags=getattr(NatConfigFlags, flag).value
126 with PapiSocketExecutor(node) as papi_exec:
127 papi_exec.add(cmd, **args_in).get_reply(err_msg)
129 # A closure, accessing the variables above.
131 """Delete and re-add the NAT range setting."""
132 with PapiSocketExecutor(node) as papi_exec:
133 args_in[u"is_add"] = False
134 papi_exec.add(cmd, **args_in)
135 args_in[u"is_add"] = True
136 papi_exec.add(cmd, **args_in)
137 papi_exec.get_replies(err_msg)
142 def show_nat_config(node):
143 """Show the NAT configuration.
145 :param node: DUT node.
148 cmd = u"nat_show_config"
149 err_msg = f"Failed to get NAT configuration on host {node[u'host']}"
151 with PapiSocketExecutor(node) as papi_exec:
152 reply = papi_exec.add(cmd).get_reply(err_msg)
154 logger.debug(f"NAT Configuration:\n{pformat(reply)}")
157 def show_nat44_summary(node):
158 """Show NAT44 summary on the specified topology node.
160 :param node: Topology node.
162 :returns: NAT44 summary data.
165 return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
168 def show_nat_base_data(node):
169 """Show the NAT base data.
174 nat44_interface_addr_dump
176 nat44_static_mapping_dump
179 :param node: DUT node.
184 u"nat44_interface_addr_dump",
185 u"nat44_address_dump",
186 u"nat44_static_mapping_dump",
187 u"nat44_interface_dump",
189 PapiSocketExecutor.dump_and_log(node, cmds)
192 def show_nat_user_data(node):
193 """Show the NAT user data.
198 nat44_user_session_dump
200 :param node: DUT node.
205 u"nat44_user_session_dump",
207 PapiSocketExecutor.dump_and_log(node, cmds)
210 def compute_max_translations_per_thread(sessions, threads):
211 """Compute value of max_translations_per_thread NAT44 parameter based on
212 total number of worker threads.
214 :param sessions: Required number of NAT44 sessions.
215 :param threads: Number of worker threads.
218 :returns: Value of max_translations_per_thread NAT44 parameter.
221 rest, mult = modf(log2(sessions/(10*threads)))
222 return 2 ** (int(mult) + (1 if rest else 0)) * 10
225 def get_nat44_sessions_number(node, proto):
226 """Get number of established NAT44 sessions from actual NAT44 mapping
229 :param node: DUT node.
230 :param proto: Required protocol - TCP/UDP/ICMP.
233 :returns: Number of established NAT44 sessions.
235 :raises ValueError: If not supported protocol.
238 if proto in [u"UDP", u"TCP", u"ICMP"]:
239 for line in NATUtil.show_nat44_summary(node).splitlines():
240 sum_k, sum_v = line.split(u":") if u":" in line \
242 nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
245 raise ValueError(f"Unsupported protocol: {proto}!")
246 return nat44_data.get(f"total {proto.lower()} sessions", 0)
249 # DET44 means deterministic mode of NAT44
251 def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
252 """Enable DET44 plugin.
254 :param node: DUT node.
255 :param inside_vrf: Inside VRF ID.
256 :param outside_vrf: Outside VRF ID.
258 :type inside_vrf: str or int
259 :type outside_vrf: str or int
261 cmd = u"det44_plugin_enable_disable"
262 err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
265 inside_vrf=int(inside_vrf),
266 outside_vrf=int(outside_vrf)
269 with PapiSocketExecutor(node) as papi_exec:
270 papi_exec.add(cmd, **args_in).get_reply(err_msg)
273 def set_det44_interface(node, if_key, is_inside):
274 """Enable DET44 feature on the interface.
276 :param node: DUT node.
277 :param if_key: Interface key from topology file of interface
278 to enable DET44 feature on.
279 :param is_inside: True if interface is inside, False if outside.
282 :type is_inside: bool
284 cmd = u"det44_interface_add_del_feature"
285 err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
286 f"on the host {node[u'host']}!"
290 sw_if_index=Topology.get_interface_sw_index(node, if_key)
293 with PapiSocketExecutor(node) as papi_exec:
294 papi_exec.add(cmd, **args_in).get_reply(err_msg)
297 def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
298 """Set DET44 mapping.
300 The return value is a callable (zero argument Python function)
301 which can be used to reset NAT state, so repeated trial measurements
302 hit the same slow path.
304 :param node: DUT node.
305 :param ip_in: Inside IP.
306 :param subnet_in: Inside IP subnet.
307 :param ip_out: Outside IP.
308 :param subnet_out: Outside IP subnet.
311 :type subnet_in: str or int
313 :type subnet_out: str or int
315 cmd = u"det44_add_del_map"
316 err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
319 in_addr=IPv4Address(str(ip_in)).packed,
320 in_plen=int(subnet_in),
321 out_addr=IPv4Address(str(ip_out)).packed,
322 out_plen=int(subnet_out)
325 with PapiSocketExecutor(node) as papi_exec:
326 papi_exec.add(cmd, **args_in).get_reply(err_msg)
328 # A closure, accessing the variables above.
330 """Delete and re-add the deterministic NAT mapping."""
331 with PapiSocketExecutor(node) as papi_exec:
332 args_in[u"is_add"] = False
333 papi_exec.add(cmd, **args_in)
334 args_in[u"is_add"] = True
335 papi_exec.add(cmd, **args_in)
336 papi_exec.get_replies(err_msg)
341 def get_det44_mapping(node):
342 """Get DET44 mapping data.
344 :param node: DUT node.
346 :returns: Dictionary of DET44 mapping data.
349 cmd = u"det44_map_dump"
350 err_msg = f"Failed to get DET44 mapping data on the host " \
353 with PapiSocketExecutor(node) as papi_exec:
354 details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
359 def get_det44_sessions_number(node):
360 """Get number of established DET44 sessions from actual DET44 mapping
362 :param node: DUT node.
364 :returns: Number of established DET44 sessions.
367 det44_data = NATUtil.get_det44_mapping(node)
368 return det44_data.get(u"ses_num", 0)
371 def show_det44(node):
380 :param node: DUT node.
384 u"det44_interface_dump",
386 u"det44_session_dump",
388 PapiSocketExecutor.dump_and_log(node, cmds)