1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """NAT utilities library."""
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
20 from ipaddress import IPv4Address
21 from robot.api import logger
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
class NatConfigFlags(IntEnum):
    """NAT configuration flags used by the NAT plugin API messages.

    Members are looked up by name (``getattr(NatConfigFlags, name)``) by the
    NAT44 helpers in this file, so every flag name passed to those helpers
    has to be defined here. Values are single bits and mirror the VPP
    ``nat_config_flags`` API enum.
    """
    NAT_IS_NONE = 0x00
    NAT_IS_TWICE_NAT = 0x01
    NAT_IS_SELF_TWICE_NAT = 0x02
    NAT_IS_OUT2IN_ONLY = 0x04
    NAT_IS_ADDR_ONLY = 0x08
    # Interface role flags: the NAT44 interface setup code requests
    # u"NAT_IS_INSIDE" and u"NAT_IS_OUTSIDE" by name.
    NAT_IS_OUTSIDE = 0x10
    NAT_IS_INSIDE = 0x20
    NAT_IS_STATIC = 0x40
    NAT_IS_EXT_HOST_VALID = 0x80
class NatAddrPortAllocAlg(IntEnum):
    """Identifiers of the NAT address and port assignment algorithms.

    Each member is a plain integer constant (0, 1, 2).
    """
    NAT_ALLOC_ALG_DEFAULT = 0x0
    NAT_ALLOC_ALG_MAP_E = 0x1
    NAT_ALLOC_ALG_PORT_RANGE = 0x2
50 """This class defines the methods to set NAT."""
def set_nat44_interface(node, interface, flag):
    """Set a NAT44 role (given by a configuration flag) on one interface.

    Sends the nat44_interface_add_del_feature API message for the interface.

    :param node: DUT node.
    :param interface: NAT44 interface.
    :param flag: Interface NAT configuration flag name; has to be the name
        of a NatConfigFlags member.
    :type node: dict
    :type flag: str
    :raises AttributeError: If flag does not name a NatConfigFlags member.
    """
    cmd = u"nat44_interface_add_del_feature"
    err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
        f"on host {node[u'host']}"
    args_in = dict(
        sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
        # NOTE(review): is_add is not visible in the reviewed excerpt. It is
        # set explicitly because the message is an add/del toggle and this
        # helper is documented as "set" - confirm against the API definition.
        is_add=True,
        flags=getattr(NatConfigFlags, flag).value
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd, **args_in).get_reply(err_msg)
def set_nat44_interfaces(node, int_in, int_out):
    """Configure the inside and the outside interface for NAT44.

    The inside interface is configured first, then the outside one.

    :param node: DUT node.
    :param int_in: Inside interface.
    :param int_out: Outside interface.
    """
    role_by_interface = (
        (int_in, u"NAT_IS_INSIDE"),
        (int_out, u"NAT_IS_OUTSIDE"),
    )
    for iface, flag_name in role_by_interface:
        NATUtil.set_nat44_interface(node, iface, flag_name)
def set_nat44_address_range(
        node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
        flag=u"NAT_IS_NONE"):
    """Set NAT44 address range.

    Sends the nat44_add_del_address_range API message with the packed IPv4
    boundaries of the range.

    NOTE(review): the default of ``flag`` is not visible in the reviewed
    excerpt; u"NAT_IS_NONE" is assumed - confirm before merging.

    :param node: DUT node.
    :param start_ip: IP range start.
    :param end_ip: IP range end.
    :param vrf_id: VRF index (Optional).
    :param flag: NAT flag name; has to be the name of a NatConfigFlags
        member.
    :type node: dict
    :type flag: str
    :raises AttributeError: If flag does not name a NatConfigFlags member.
    :raises ipaddress.AddressValueError: If start_ip or end_ip is not a
        valid IPv4 address.
    """
    cmd = u"nat44_add_del_address_range"
    err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
    args_in = dict(
        # NOTE(review): is_add is not visible in the reviewed excerpt. It is
        # set explicitly because the message is an add/del toggle and this
        # helper is documented as "set" - confirm against the API definition.
        is_add=True,
        first_ip_address=IPv4Address(str(start_ip)).packed,
        last_ip_address=IPv4Address(str(end_ip)).packed,
        # Forward the caller's VRF; otherwise the parameter has no effect.
        vrf_id=vrf_id,
        flags=getattr(NatConfigFlags, flag).value
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd, **args_in).get_reply(err_msg)
def show_nat_config(node):
    """Fetch the NAT configuration and write it to the debug log.

    Issues the nat_show_config API message and logs the pretty-printed
    reply at debug level.

    :param node: DUT node.
    :type node: dict
    """
    failure_text = f"Failed to get NAT configuration on host {node[u'host']}"

    with PapiSocketExecutor(node) as papi_exec:
        nat_config = papi_exec.add(u"nat_show_config").get_reply(failure_text)

    logger.debug(f"NAT Configuration:\n{pformat(nat_config)}")
def show_nat44_summary(node):
    """Run the "show nat44 summary" CLI command on the given topology node.

    :param node: Topology node.
    :returns: NAT44 summary data, as returned by the CLI command runner.
    """
    summary = PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
    return summary
def show_nat_base_data(node):
    """Show the NAT base data.

    Used data sources:

        nat44_interface_addr_dump
        nat44_address_dump
        nat44_static_mapping_dump
        nat44_interface_dump

    :param node: DUT node.
    """
    # Dump API messages, issued and logged in this order.
    cmds = [
        u"nat44_interface_addr_dump",
        u"nat44_address_dump",
        u"nat44_static_mapping_dump",
        u"nat44_interface_dump",
    ]
    PapiSocketExecutor.dump_and_log(node, cmds)
def show_nat_user_data(node):
    """Show the NAT user data.

    Used data sources:

        nat44_user_session_dump

    :param node: DUT node.
    """
    # NOTE(review): only the data source visible in the reviewed excerpt is
    # listed here - confirm the list is complete.
    cmds = [
        u"nat44_user_session_dump",
    ]
    PapiSocketExecutor.dump_and_log(node, cmds)
def compute_max_translations_per_thread(sessions, threads):
    """Compute value of max_translations_per_thread NAT44 parameter based on
    total number of worker threads.

    The result is 10 * 2**n for the smallest non-negative integer n for
    which 10 * 2**n * threads >= sessions, so it is always an integer
    multiple of 10.

    :param sessions: Required number of NAT44 sessions.
    :param threads: Number of worker threads.
    :type sessions: int
    :type threads: int
    :returns: Value of max_translations_per_thread NAT44 parameter.
    :rtype: int
    :raises ValueError: If sessions is not positive (log2 domain error).
    :raises ZeroDivisionError: If threads is zero.
    """
    rest, mult = modf(log2(sessions / (10 * threads)))
    # modf() gives both parts the sign of its argument, so only a strictly
    # positive fractional part means the exponent has to be rounded up.
    exponent = int(mult) + (1 if rest > 0 else 0)
    # Fewer than 10 sessions per thread yields a negative exponent; clamp it
    # so the result never drops below 10 and never becomes a float.
    return 2 ** max(exponent, 0) * 10
def get_nat44_sessions_number(node, proto):
    """Get number of established NAT44 sessions from the NAT44 summary.

    The summary text is parsed line by line into "key: value" pairs and the
    value stored under "total <proto> sessions" is returned.

    :param node: DUT node.
    :param proto: Required protocol - TCP/UDP/ICMP.
    :type proto: str
    :returns: Number of established NAT44 sessions, as the text found in the
        summary; 0 if the summary has no matching line.
    :raises ValueError: If not supported protocol.
    """
    if proto not in (u"UDP", u"TCP", u"ICMP"):
        raise ValueError(f"Unsupported protocol: {proto}!")

    nat44_data = dict()
    for line in NATUtil.show_nat44_summary(node).splitlines():
        # partition() splits at the first colon only, so a value that itself
        # contains a colon no longer breaks the key/value unpacking.
        sum_k, separator, sum_v = line.partition(u":")
        # Lines without a colon are kept as keys with no value.
        nat44_data[sum_k] = sum_v.strip() if separator else None
    return nat44_data.get(f"total {proto.lower()} sessions", 0)
231 # DET44 means deterministic mode of NAT44
def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
    """Enable DET44 plugin.

    Sends the det44_plugin_enable_disable API message; VRF IDs are converted
    to int before sending.

    :param node: DUT node.
    :param inside_vrf: Inside VRF ID.
    :param outside_vrf: Outside VRF ID.
    :type node: dict
    :type inside_vrf: str or int
    :type outside_vrf: str or int
    :raises ValueError: If a VRF ID cannot be converted to int.
    """
    cmd = u"det44_plugin_enable_disable"
    err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
    args_in = dict(
        # NOTE(review): the enable field is not visible in the reviewed
        # excerpt. It is set explicitly because the message is an
        # enable/disable toggle - confirm against the API definition.
        enable=True,
        inside_vrf=int(inside_vrf),
        outside_vrf=int(outside_vrf)
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd, **args_in).get_reply(err_msg)
def set_det44_interface(node, if_key, is_inside):
    """Enable DET44 feature on the interface.

    Sends the det44_interface_add_del_feature API message for the interface
    identified by its topology key.

    :param node: DUT node.
    :param if_key: Interface key from topology file of interface
        to enable DET44 feature on.
    :param is_inside: True if interface is inside, False if outside.
    :type node: dict
    :type is_inside: bool
    """
    cmd = u"det44_interface_add_del_feature"
    err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
        f"on the host {node[u'host']}!"
    args_in = dict(
        # NOTE(review): is_add is not visible in the reviewed excerpt. It is
        # set explicitly because the message is an add/del toggle and this
        # helper enables the feature - confirm against the API definition.
        is_add=True,
        # Forward the caller's choice; otherwise the parameter has no effect.
        is_inside=is_inside,
        sw_if_index=Topology.get_interface_sw_index(node, if_key)
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd, **args_in).get_reply(err_msg)
def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
    """Set DET44 mapping.

    Sends the det44_add_del_map API message with packed IPv4 addresses and
    integer prefix lengths.

    :param node: DUT node.
    :param ip_in: Inside IP.
    :param subnet_in: Inside IP subnet.
    :param ip_out: Outside IP.
    :param subnet_out: Outside IP subnet.
    :type node: dict
    :type subnet_in: str or int
    :type subnet_out: str or int
    :raises ipaddress.AddressValueError: If ip_in or ip_out is not a valid
        IPv4 address.
    :raises ValueError: If a subnet cannot be converted to int.
    """
    cmd = u"det44_add_del_map"
    err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
    args_in = dict(
        # NOTE(review): is_add is not visible in the reviewed excerpt. It is
        # set explicitly because the message is an add/del toggle and this
        # helper is documented as "set" - confirm against the API definition.
        is_add=True,
        in_addr=IPv4Address(str(ip_in)).packed,
        in_plen=int(subnet_in),
        out_addr=IPv4Address(str(ip_out)).packed,
        out_plen=int(subnet_out)
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd, **args_in).get_reply(err_msg)
def get_det44_mapping(node):
    """Get DET44 mapping data.

    Sends the det44_map_dump API message without arguments and returns the
    reply unchanged.

    :param node: DUT node.
    :type node: dict
    :returns: Dictionary of DET44 mapping data.
    """
    cmd = u"det44_map_dump"
    # NOTE(review): the tail of this message is not visible in the reviewed
    # excerpt; it follows the wording of the other DET44 error messages.
    err_msg = f"Failed to get DET44 mapping data on the host " \
        f"{node[u'host']}!"
    # The dump request takes no arguments.
    args_in = dict()

    with PapiSocketExecutor(node) as papi_exec:
        details = papi_exec.add(cmd, **args_in).get_reply(err_msg)

    return details
def get_det44_sessions_number(node):
    """Read the number of established DET44 sessions from the DET44 mapping
    data.

    :param node: DUT node.
    :returns: Number of established DET44 sessions; 0 if the mapping data
        has no u"ses_num" entry.
    """
    return NATUtil.get_det44_mapping(node).get(u"ses_num", 0)
def show_det44(node):
    """Show DET44 data.

    Used data sources:

        det44_interface_dump
        det44_session_dump

    :param node: DUT node.
    """
    # NOTE(review): only the data sources visible in the reviewed excerpt
    # are listed here - confirm the list is complete.
    cmds = [
        u"det44_interface_dump",
        u"det44_session_dump",
    ]
    PapiSocketExecutor.dump_and_log(node, cmds)