620e14aea166e12ef7339bdaca8ed5b909605d77
[csit.git] / resources / libraries / python / NATUtil.py
1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """NAT utilities library."""
15
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
19
20 from ipaddress import IPv4Address
21 from robot.api import logger
22
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
27
28
29 class NatConfigFlags(IntEnum):
30     """Common NAT plugin APIs"""
31     NAT_IS_NONE = 0x00
32     NAT_IS_TWICE_NAT = 0x01
33     NAT_IS_SELF_TWICE_NAT = 0x02
34     NAT_IS_OUT2IN_ONLY = 0x04
35     NAT_IS_ADDR_ONLY = 0x08
36     NAT_IS_OUTSIDE = 0x10
37     NAT_IS_INSIDE = 0x20
38     NAT_IS_STATIC = 0x40
39     NAT_IS_EXT_HOST_VALID = 0x80
40
41
42 class NatAddrPortAllocAlg(IntEnum):
43     """NAT Address and port assignment algorithms."""
44     NAT_ALLOC_ALG_DEFAULT = 0
45     NAT_ALLOC_ALG_MAP_E = 1
46     NAT_ALLOC_ALG_PORT_RANGE = 2
47
48
49 class NATUtil:
50     """This class defines the methods to set NAT."""
51
52     def __init__(self):
53         pass
54
55     @staticmethod
56     def set_nat44_interface(node, interface, flag):
57         """Set inside and outside interfaces for NAT44.
58
59         :param node: DUT node.
60         :param interface: NAT44 interface.
61         :param flag: Interface NAT configuration flag name.
62         :type node: dict
63         :type interface: str
64         :type flag: str
65         """
66         cmd = u"nat44_interface_add_del_feature"
67
68         err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
69             f"on host {node[u'host']}"
70         args_in = dict(
71             sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
72             is_add=1,
73             flags=getattr(NatConfigFlags, flag).value
74         )
75
76         with PapiSocketExecutor(node) as papi_exec:
77             papi_exec.add(cmd, **args_in).get_reply(err_msg)
78
79     @staticmethod
80     def set_nat44_interfaces(node, int_in, int_out):
81         """Set inside and outside interfaces for NAT44.
82
83         :param node: DUT node.
84         :param int_in: Inside interface.
85         :param int_out: Outside interface.
86         :type node: dict
87         :type int_in: str
88         :type int_out: str
89         """
90         NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
91         NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
92
93     @staticmethod
94     def set_nat44_address_range(
95             node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
96             flag=u"NAT_IS_NONE"):
97         """Set NAT44 address range.
98
99         :param node: DUT node.
100         :param start_ip: IP range start.
101         :param end_ip: IP range end.
102         :param vrf_id: VRF index (Optional).
103         :param flag: NAT flag name.
104         :type node: dict
105         :type start_ip: str
106         :type end_ip: str
107         :type vrf_id: int
108         :type flag: str
109         """
110         cmd = u"nat44_add_del_address_range"
111         err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
112         args_in = dict(
113             is_add=True,
114             first_ip_address=IPv4Address(str(start_ip)).packed,
115             last_ip_address=IPv4Address(str(end_ip)).packed,
116             vrf_id=vrf_id,
117             flags=getattr(NatConfigFlags, flag).value
118         )
119
120         with PapiSocketExecutor(node) as papi_exec:
121             papi_exec.add(cmd, **args_in).get_reply(err_msg)
122
123     @staticmethod
124     def show_nat_config(node):
125         """Show the NAT configuration.
126
127         :param node: DUT node.
128         :type node: dict
129         """
130         cmd = u"nat_show_config"
131         err_msg = f"Failed to get NAT configuration on host {node[u'host']}"
132
133         with PapiSocketExecutor(node) as papi_exec:
134             reply = papi_exec.add(cmd).get_reply(err_msg)
135
136         logger.debug(f"NAT Configuration:\n{pformat(reply)}")
137
138     @staticmethod
139     def show_nat44_summary(node):
140         """Show NAT44 summary on the specified topology node.
141
142         :param node: Topology node.
143         :type node: dict
144         :returns: NAT44 summary data.
145         :rtype: str
146         """
147         return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
148
149     @staticmethod
150     def show_nat_base_data(node):
151         """Show the NAT base data.
152
153         Used data sources:
154
155             nat_worker_dump
156             nat44_interface_addr_dump
157             nat44_address_dump
158             nat44_static_mapping_dump
159             nat44_interface_dump
160
161         :param node: DUT node.
162         :type node: dict
163         """
164         cmds = [
165             u"nat_worker_dump",
166             u"nat44_interface_addr_dump",
167             u"nat44_address_dump",
168             u"nat44_static_mapping_dump",
169             u"nat44_interface_dump",
170         ]
171         PapiSocketExecutor.dump_and_log(node, cmds)
172
173     @staticmethod
174     def show_nat_user_data(node):
175         """Show the NAT user data.
176
177         Used data sources:
178
179             nat44_user_dump
180             nat44_user_session_dump
181
182         :param node: DUT node.
183         :type node: dict
184         """
185         cmds = [
186             u"nat44_user_dump",
187             u"nat44_user_session_dump",
188         ]
189         PapiSocketExecutor.dump_and_log(node, cmds)
190
191     @staticmethod
192     def compute_max_translations_per_thread(sessions, threads):
193         """Compute value of max_translations_per_thread NAT44 parameter based on
194         total number of worker threads.
195
196         :param sessions: Required number of NAT44 sessions.
197         :param threads: Number of worker threads.
198         :type sessions: int
199         :type threads: int
200         :returns: Value of max_translations_per_thread NAT44 parameter.
201         :rtype: int
202         """
203         rest, mult = modf(log2(sessions/(10*threads)))
204         return 2 ** (int(mult) + (1 if rest else 0)) * 10
205
206     @staticmethod
207     def get_nat44_sessions_number(node, proto):
208         """Get number of established NAT44 sessions from actual NAT44 mapping
209         data.
210
211         :param node: DUT node.
212         :param proto: Required protocol - TCP/UDP/ICMP.
213         :type node: dict
214         :type proto: str
215         :returns: Number of established NAT44 sessions.
216         :rtype: int
217         :raises ValueError: If not supported protocol.
218         """
219         nat44_data = dict()
220         if proto in [u"UDP", u"TCP", u"ICMP"]:
221             for line in NATUtil.show_nat44_summary(node).splitlines():
222                 sum_k, sum_v = line.split(u":") if u":" in line \
223                     else (line, None)
224                 nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
225                     else sum_v
226         else:
227             raise ValueError(f"Unsupported protocol: {proto}!")
228         return nat44_data.get(f"total {proto.lower()} sessions", 0)
229
230     # DET44 PAPI calls
231     # DET44 means deterministic mode of NAT44
232     @staticmethod
233     def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
234         """Enable DET44 plugin.
235
236         :param node: DUT node.
237         :param inside_vrf: Inside VRF ID.
238         :param outside_vrf: Outside VRF ID.
239         :type node: dict
240         :type inside_vrf: str or int
241         :type outside_vrf: str or int
242         """
243         cmd = u"det44_plugin_enable_disable"
244         err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
245         args_in = dict(
246             enable=True,
247             inside_vrf=int(inside_vrf),
248             outside_vrf=int(outside_vrf)
249         )
250
251         with PapiSocketExecutor(node) as papi_exec:
252             papi_exec.add(cmd, **args_in).get_reply(err_msg)
253
254     @staticmethod
255     def set_det44_interface(node, if_key, is_inside):
256         """Enable DET44 feature on the interface.
257
258         :param node: DUT node.
259         :param if_key: Interface key from topology file of interface
260             to enable DET44 feature on.
261         :param is_inside: True if interface is inside, False if outside.
262         :type node: dict
263         :type if_key: str
264         :type is_inside: bool
265         """
266         cmd = u"det44_interface_add_del_feature"
267         err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
268             f"on the host {node[u'host']}!"
269         args_in = dict(
270             is_add=True,
271             is_inside=is_inside,
272             sw_if_index=Topology.get_interface_sw_index(node, if_key)
273         )
274
275         with PapiSocketExecutor(node) as papi_exec:
276             papi_exec.add(cmd, **args_in).get_reply(err_msg)
277
278     @staticmethod
279     def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
280         """Set DET44 mapping.
281
282         :param node: DUT node.
283         :param ip_in: Inside IP.
284         :param subnet_in: Inside IP subnet.
285         :param ip_out: Outside IP.
286         :param subnet_out: Outside IP subnet.
287         :type node: dict
288         :type ip_in: str
289         :type subnet_in: str or int
290         :type ip_out: str
291         :type subnet_out: str or int
292         """
293         cmd = u"det44_add_del_map"
294         err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
295         args_in = dict(
296             is_add=True,
297             in_addr=IPv4Address(str(ip_in)).packed,
298             in_plen=int(subnet_in),
299             out_addr=IPv4Address(str(ip_out)).packed,
300             out_plen=int(subnet_out)
301         )
302
303         with PapiSocketExecutor(node) as papi_exec:
304             papi_exec.add(cmd, **args_in).get_reply(err_msg)
305
306     @staticmethod
307     def get_det44_mapping(node):
308         """Get DET44 mapping data.
309
310         :param node: DUT node.
311         :type node: dict
312         :returns: Dictionary of DET44 mapping data.
313         :rtype: dict
314         """
315         cmd = u"det44_map_dump"
316         err_msg = f"Failed to get DET44 mapping data on the host " \
317             f"{node[u'host']}!"
318         args_in = dict()
319         with PapiSocketExecutor(node) as papi_exec:
320             details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
321
322         return details
323
324     @staticmethod
325     def get_det44_sessions_number(node):
326         """Get number of established DET44 sessions from actual DET44 mapping
327         data.
328
329         :param node: DUT node.
330         :type node: dict
331         :returns: Number of established DET44 sessions.
332         :rtype: int
333         """
334         det44_data = NATUtil.get_det44_mapping(node)
335
336         return det44_data.get(u"ses_num", 0)
337
338     @staticmethod
339     def show_det44(node):
340         """Show DET44 data.
341
342         Used data sources:
343
344             det44_interface_dump
345             det44_map_dump
346             det44_session_dump
347
348         :param node: DUT node.
349         :type node: dict
350         """
351         cmds = [
352             u"det44_interface_dump",
353             u"det44_map_dump",
354             u"det44_session_dump",
355         ]
356         PapiSocketExecutor.dump_and_log(node, cmds)