Support existing test types with ASTF
[csit.git] / resources / libraries / python / NATUtil.py
1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """NAT utilities library."""
15
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
19
20 from ipaddress import IPv4Address
21 from robot.api import logger
22
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
27
28
29 class NatConfigFlags(IntEnum):
30     """Common NAT plugin APIs"""
31     NAT_IS_NONE = 0x00
32     NAT_IS_TWICE_NAT = 0x01
33     NAT_IS_SELF_TWICE_NAT = 0x02
34     NAT_IS_OUT2IN_ONLY = 0x04
35     NAT_IS_ADDR_ONLY = 0x08
36     NAT_IS_OUTSIDE = 0x10
37     NAT_IS_INSIDE = 0x20
38     NAT_IS_STATIC = 0x40
39     NAT_IS_EXT_HOST_VALID = 0x80
40
41
42 class NatAddrPortAllocAlg(IntEnum):
43     """NAT Address and port assignment algorithms."""
44     NAT_ALLOC_ALG_DEFAULT = 0
45     NAT_ALLOC_ALG_MAP_E = 1
46     NAT_ALLOC_ALG_PORT_RANGE = 2
47
48
49 class NATUtil:
50     """This class defines the methods to set NAT."""
51
52     def __init__(self):
53         pass
54
55     @staticmethod
56     def set_nat44_interface(node, interface, flag):
57         """Set inside and outside interfaces for NAT44.
58
59         :param node: DUT node.
60         :param interface: NAT44 interface.
61         :param flag: Interface NAT configuration flag name.
62         :type node: dict
63         :type interface: str
64         :type flag: str
65         """
66         cmd = u"nat44_interface_add_del_feature"
67
68         err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
69             f"on host {node[u'host']}"
70         args_in = dict(
71             sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
72             is_add=1,
73             flags=getattr(NatConfigFlags, flag).value
74         )
75
76         with PapiSocketExecutor(node) as papi_exec:
77             papi_exec.add(cmd, **args_in).get_reply(err_msg)
78
79     @staticmethod
80     def set_nat44_interfaces(node, int_in, int_out):
81         """Set inside and outside interfaces for NAT44.
82
83         :param node: DUT node.
84         :param int_in: Inside interface.
85         :param int_out: Outside interface.
86         :type node: dict
87         :type int_in: str
88         :type int_out: str
89         """
90         NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
91         NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
92
93     @staticmethod
94     def set_nat44_address_range(
95             node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
96             flag=u"NAT_IS_NONE"):
97         """Set NAT44 address range.
98
99         The return value is a callable (zero argument Python function)
100         which can be used to reset NAT state, so repeated trial measurements
101         hit the same slow path.
102
103         :param node: DUT node.
104         :param start_ip: IP range start.
105         :param end_ip: IP range end.
106         :param vrf_id: VRF index (Optional).
107         :param flag: NAT flag name.
108         :type node: dict
109         :type start_ip: str
110         :type end_ip: str
111         :type vrf_id: int
112         :type flag: str
113         :returns: Resetter of the NAT state.
114         :rtype: Callable[[], None]
115         """
116         cmd = u"nat44_add_del_address_range"
117         err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
118         args_in = dict(
119             is_add=True,
120             first_ip_address=IPv4Address(str(start_ip)).packed,
121             last_ip_address=IPv4Address(str(end_ip)).packed,
122             vrf_id=vrf_id,
123             flags=getattr(NatConfigFlags, flag).value
124         )
125
126         with PapiSocketExecutor(node) as papi_exec:
127             papi_exec.add(cmd, **args_in).get_reply(err_msg)
128
129         # A closure, accessing the variables above.
130         def resetter():
131             """Delete and re-add the NAT range setting."""
132             with PapiSocketExecutor(node) as papi_exec:
133                 args_in[u"is_add"] = False
134                 papi_exec.add(cmd, **args_in)
135                 args_in[u"is_add"] = True
136                 papi_exec.add(cmd, **args_in)
137                 papi_exec.get_replies(err_msg)
138
139         return resetter
140
141     @staticmethod
142     def show_nat_config(node):
143         """Show the NAT configuration.
144
145         :param node: DUT node.
146         :type node: dict
147         """
148         cmd = u"nat_show_config"
149         err_msg = f"Failed to get NAT configuration on host {node[u'host']}"
150
151         with PapiSocketExecutor(node) as papi_exec:
152             reply = papi_exec.add(cmd).get_reply(err_msg)
153
154         logger.debug(f"NAT Configuration:\n{pformat(reply)}")
155
156     @staticmethod
157     def show_nat44_summary(node):
158         """Show NAT44 summary on the specified topology node.
159
160         :param node: Topology node.
161         :type node: dict
162         :returns: NAT44 summary data.
163         :rtype: str
164         """
165         return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
166
167     @staticmethod
168     def show_nat_base_data(node):
169         """Show the NAT base data.
170
171         Used data sources:
172
173             nat_worker_dump
174             nat44_interface_addr_dump
175             nat44_address_dump
176             nat44_static_mapping_dump
177             nat44_interface_dump
178
179         :param node: DUT node.
180         :type node: dict
181         """
182         cmds = [
183             u"nat_worker_dump",
184             u"nat44_interface_addr_dump",
185             u"nat44_address_dump",
186             u"nat44_static_mapping_dump",
187             u"nat44_interface_dump",
188         ]
189         PapiSocketExecutor.dump_and_log(node, cmds)
190
191     @staticmethod
192     def show_nat_user_data(node):
193         """Show the NAT user data.
194
195         Used data sources:
196
197             nat44_user_dump
198             nat44_user_session_dump
199
200         :param node: DUT node.
201         :type node: dict
202         """
203         cmds = [
204             u"nat44_user_dump",
205             u"nat44_user_session_dump",
206         ]
207         PapiSocketExecutor.dump_and_log(node, cmds)
208
209     @staticmethod
210     def compute_max_translations_per_thread(sessions, threads):
211         """Compute value of max_translations_per_thread NAT44 parameter based on
212         total number of worker threads.
213
214         :param sessions: Required number of NAT44 sessions.
215         :param threads: Number of worker threads.
216         :type sessions: int
217         :type threads: int
218         :returns: Value of max_translations_per_thread NAT44 parameter.
219         :rtype: int
220         """
221         rest, mult = modf(log2(sessions/(10*threads)))
222         return 2 ** (int(mult) + (1 if rest else 0)) * 10
223
224     @staticmethod
225     def get_nat44_sessions_number(node, proto):
226         """Get number of established NAT44 sessions from actual NAT44 mapping
227         data.
228
229         :param node: DUT node.
230         :param proto: Required protocol - TCP/UDP/ICMP.
231         :type node: dict
232         :type proto: str
233         :returns: Number of established NAT44 sessions.
234         :rtype: int
235         :raises ValueError: If not supported protocol.
236         """
237         nat44_data = dict()
238         if proto in [u"UDP", u"TCP", u"ICMP"]:
239             for line in NATUtil.show_nat44_summary(node).splitlines():
240                 sum_k, sum_v = line.split(u":") if u":" in line \
241                     else (line, None)
242                 nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
243                     else sum_v
244         else:
245             raise ValueError(f"Unsupported protocol: {proto}!")
246         return nat44_data.get(f"total {proto.lower()} sessions", 0)
247
248     # DET44 PAPI calls
249     # DET44 means deterministic mode of NAT44
250     @staticmethod
251     def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
252         """Enable DET44 plugin.
253
254         :param node: DUT node.
255         :param inside_vrf: Inside VRF ID.
256         :param outside_vrf: Outside VRF ID.
257         :type node: dict
258         :type inside_vrf: str or int
259         :type outside_vrf: str or int
260         """
261         cmd = u"det44_plugin_enable_disable"
262         err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
263         args_in = dict(
264             enable=True,
265             inside_vrf=int(inside_vrf),
266             outside_vrf=int(outside_vrf)
267         )
268
269         with PapiSocketExecutor(node) as papi_exec:
270             papi_exec.add(cmd, **args_in).get_reply(err_msg)
271
272     @staticmethod
273     def set_det44_interface(node, if_key, is_inside):
274         """Enable DET44 feature on the interface.
275
276         :param node: DUT node.
277         :param if_key: Interface key from topology file of interface
278             to enable DET44 feature on.
279         :param is_inside: True if interface is inside, False if outside.
280         :type node: dict
281         :type if_key: str
282         :type is_inside: bool
283         """
284         cmd = u"det44_interface_add_del_feature"
285         err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
286             f"on the host {node[u'host']}!"
287         args_in = dict(
288             is_add=True,
289             is_inside=is_inside,
290             sw_if_index=Topology.get_interface_sw_index(node, if_key)
291         )
292
293         with PapiSocketExecutor(node) as papi_exec:
294             papi_exec.add(cmd, **args_in).get_reply(err_msg)
295
296     @staticmethod
297     def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
298         """Set DET44 mapping.
299
300         The return value is a callable (zero argument Python function)
301         which can be used to reset NAT state, so repeated trial measurements
302         hit the same slow path.
303
304         :param node: DUT node.
305         :param ip_in: Inside IP.
306         :param subnet_in: Inside IP subnet.
307         :param ip_out: Outside IP.
308         :param subnet_out: Outside IP subnet.
309         :type node: dict
310         :type ip_in: str
311         :type subnet_in: str or int
312         :type ip_out: str
313         :type subnet_out: str or int
314         """
315         cmd = u"det44_add_del_map"
316         err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
317         args_in = dict(
318             is_add=True,
319             in_addr=IPv4Address(str(ip_in)).packed,
320             in_plen=int(subnet_in),
321             out_addr=IPv4Address(str(ip_out)).packed,
322             out_plen=int(subnet_out)
323         )
324
325         with PapiSocketExecutor(node) as papi_exec:
326             papi_exec.add(cmd, **args_in).get_reply(err_msg)
327
328         # A closure, accessing the variables above.
329         def resetter():
330             """Delete and re-add the deterministic NAT mapping."""
331             with PapiSocketExecutor(node) as papi_exec:
332                 args_in[u"is_add"] = False
333                 papi_exec.add(cmd, **args_in)
334                 args_in[u"is_add"] = True
335                 papi_exec.add(cmd, **args_in)
336                 papi_exec.get_replies(err_msg)
337
338         return resetter
339
340     @staticmethod
341     def get_det44_mapping(node):
342         """Get DET44 mapping data.
343
344         :param node: DUT node.
345         :type node: dict
346         :returns: Dictionary of DET44 mapping data.
347         :rtype: dict
348         """
349         cmd = u"det44_map_dump"
350         err_msg = f"Failed to get DET44 mapping data on the host " \
351             f"{node[u'host']}!"
352         args_in = dict()
353         with PapiSocketExecutor(node) as papi_exec:
354             details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
355
356         return details
357
358     @staticmethod
359     def get_det44_sessions_number(node):
360         """Get number of established DET44 sessions from actual DET44 mapping
361         data.
362         :param node: DUT node.
363         :type node: dict
364         :returns: Number of established DET44 sessions.
365         :rtype: int
366         """
367         det44_data = NATUtil.get_det44_mapping(node)
368         return det44_data.get(u"ses_num", 0)
369
370     @staticmethod
371     def show_det44(node):
372         """Show DET44 data.
373
374         Used data sources:
375
376             det44_interface_dump
377             det44_map_dump
378             det44_session_dump
379
380         :param node: DUT node.
381         :type node: dict
382         """
383         cmds = [
384             u"det44_interface_dump",
385             u"det44_map_dump",
386             u"det44_session_dump",
387         ]
388         PapiSocketExecutor.dump_and_log(node, cmds)