8a5d8c1404f5f005b3578c98ad87812061995ece
[csit.git] / resources / libraries / python / NATUtil.py
1 # Copyright (c) 2021 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """NAT utilities library."""
15
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
19
20 from ipaddress import IPv4Address
21 from robot.api import logger
22
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
27
28
29 class NatConfigFlags(IntEnum):
30     """NAT plugin configuration flags"""
31     NAT_IS_NONE = 0x00
32     NAT_IS_TWICE_NAT = 0x01
33     NAT_IS_SELF_TWICE_NAT = 0x02
34     NAT_IS_OUT2IN_ONLY = 0x04
35     NAT_IS_ADDR_ONLY = 0x08
36     NAT_IS_OUTSIDE = 0x10
37     NAT_IS_INSIDE = 0x20
38     NAT_IS_STATIC = 0x40
39     NAT_IS_EXT_HOST_VALID = 0x80
40
41
42 class Nat44ConfigFlags(IntEnum):
43     """NAT44 configuration flags"""
44     NAT44_IS_ENDPOINT_INDEPENDENT = 0x00
45     NAT44_IS_ENDPOINT_DEPENDENT = 0x01
46     NAT44_IS_STATIC_MAPPING_ONLY = 0x02
47     NAT44_IS_CONNECTION_TRACKING = 0x04
48     NAT44_IS_OUT2IN_DPO = 0x08
49
50
51 class NatAddrPortAllocAlg(IntEnum):
52     """NAT Address and port assignment algorithms."""
53     NAT_ALLOC_ALG_DEFAULT = 0
54     NAT_ALLOC_ALG_MAP_E = 1
55     NAT_ALLOC_ALG_PORT_RANGE = 2
56
57
58 class NATUtil:
59     """This class defines the methods to set NAT."""
60
61     def __init__(self):
62         pass
63
64     @staticmethod
65     def enable_nat44_plugin(
66             node, inside_vrf=0, outside_vrf=0, users=0, user_memory=0,
67             sessions=0, session_memory=0, user_sessions=0, mode=u""):
68         """Enable NAT44 plugin.
69
70         :param node: DUT node.
71         :param inside_vrf: Inside VRF ID.
72         :param outside_vrf: Outside VRF ID.
73         :param users: Maximum number of users. Used only in endpoint-independent
74             mode.
75         :param user_memory: User memory size - overwrite auto calculated hash
76             allocation parameter if non-zero.
77         :param sessions: Maximum number of sessions.
78         :param session_memory: Session memory size - overwrite auto calculated
79             hash allocation parameter if non-zero.
80         :param user_sessions: Maximum number of sessions per user. Used only in
81             endpoint-independent mode.
82         :param mode: NAT44 mode. Valid values:
83             - endpoint-independent
84             - endpoint-dependent
85             - static-mapping-only
86             - connection-tracking
87             - out2in-dpo
88         :type node: dict
89         :type inside_vrf: str or int
90         :type outside_vrf: str or int
91         :type users: str or int
92         :type user_memory: str or int
93         :type sessions: str or int
94         :type session_memory: str or int
95         :type user_sessions: str or int
96         :type mode: str
97         """
98         cmd = u"nat44_plugin_enable_disable"
99         err_msg = f"Failed to enable NAT44 plugin on the host {node[u'host']}!"
100         args_in = dict(
101             enable=True,
102             inside_vrf=int(inside_vrf),
103             outside_vrf=int(outside_vrf),
104             users=int(users),
105             user_memory=int(user_memory),
106             sessions=int(sessions),
107             session_memory=int(session_memory),
108             user_sessions=int(user_sessions),
109             flags=getattr(
110                 Nat44ConfigFlags,
111                 f"NAT44_IS_{mode.replace(u'-', u'_').upper()}"
112             ).value
113         )
114
115         with PapiSocketExecutor(node) as papi_exec:
116             papi_exec.add(cmd, **args_in).get_reply(err_msg)
117
118     @staticmethod
119     def set_nat44_interface(node, interface, flag):
120         """Set inside and outside interfaces for NAT44.
121
122         :param node: DUT node.
123         :param interface: NAT44 interface.
124         :param flag: Interface NAT configuration flag name.
125         :type node: dict
126         :type interface: str
127         :type flag: str
128         """
129         cmd = u"nat44_interface_add_del_feature"
130
131         err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
132             f"on host {node[u'host']}"
133         args_in = dict(
134             sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
135             is_add=1,
136             flags=getattr(NatConfigFlags, flag).value
137         )
138
139         with PapiSocketExecutor(node) as papi_exec:
140             papi_exec.add(cmd, **args_in).get_reply(err_msg)
141
142     @staticmethod
143     def set_nat44_interfaces(node, int_in, int_out):
144         """Set inside and outside interfaces for NAT44.
145
146         :param node: DUT node.
147         :param int_in: Inside interface.
148         :param int_out: Outside interface.
149         :type node: dict
150         :type int_in: str
151         :type int_out: str
152         """
153         NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
154         NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
155
156     @staticmethod
157     def set_nat44_address_range(
158             node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
159             flag=u"NAT_IS_NONE"):
160         """Set NAT44 address range.
161
162         The return value is a callable (zero argument Python function)
163         which can be used to reset NAT state, so repeated trial measurements
164         hit the same slow path.
165
166         :param node: DUT node.
167         :param start_ip: IP range start.
168         :param end_ip: IP range end.
169         :param vrf_id: VRF index (Optional).
170         :param flag: NAT flag name.
171         :type node: dict
172         :type start_ip: str
173         :type end_ip: str
174         :type vrf_id: int
175         :type flag: str
176         :returns: Resetter of the NAT state.
177         :rtype: Callable[[], None]
178         """
179         cmd = u"nat44_add_del_address_range"
180         err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
181         args_in = dict(
182             is_add=True,
183             first_ip_address=IPv4Address(str(start_ip)).packed,
184             last_ip_address=IPv4Address(str(end_ip)).packed,
185             vrf_id=vrf_id,
186             flags=getattr(NatConfigFlags, flag).value
187         )
188
189         with PapiSocketExecutor(node) as papi_exec:
190             papi_exec.add(cmd, **args_in).get_reply(err_msg)
191
192         # A closure, accessing the variables above.
193         def resetter():
194             """Delete and re-add the NAT range setting."""
195             with PapiSocketExecutor(node) as papi_exec:
196                 args_in[u"is_add"] = False
197                 papi_exec.add(cmd, **args_in)
198                 args_in[u"is_add"] = True
199                 papi_exec.add(cmd, **args_in)
200                 papi_exec.get_replies(err_msg)
201
202         return resetter
203
204     @staticmethod
205     def show_nat44_config(node):
206         """Show the NAT44 plugin running configuration.
207
208         :param node: DUT node.
209         :type node: dict
210         """
211         cmd = u"nat44_show_running_config"
212         err_msg = f"Failed to get NAT44 configuration on host {node[u'host']}"
213
214         try:
215             with PapiSocketExecutor(node) as papi_exec:
216                 reply = papi_exec.add(cmd).get_reply(err_msg)
217         except AssertionError:
218             # Perhaps VPP is an older version
219             old_cmd = u"nat_show_config"
220             with PapiSocketExecutor(node) as papi_exec:
221                 reply = papi_exec.add(old_cmd).get_reply(err_msg)
222
223         logger.debug(f"NAT44 Configuration:\n{pformat(reply)}")
224
225     @staticmethod
226     def show_nat44_summary(node):
227         """Show NAT44 summary on the specified topology node.
228
229         :param node: Topology node.
230         :type node: dict
231         :returns: NAT44 summary data.
232         :rtype: str
233         """
234         return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
235
236     @staticmethod
237     def show_nat_base_data(node):
238         """Show the NAT base data.
239
240         Used data sources:
241
242             nat_worker_dump
243             nat44_interface_addr_dump
244             nat44_address_dump
245             nat44_static_mapping_dump
246             nat44_interface_dump
247
248         :param node: DUT node.
249         :type node: dict
250         """
251         cmds = [
252             u"nat_worker_dump",
253             u"nat44_interface_addr_dump",
254             u"nat44_address_dump",
255             u"nat44_static_mapping_dump",
256             u"nat44_interface_dump",
257         ]
258         PapiSocketExecutor.dump_and_log(node, cmds)
259
260     @staticmethod
261     def show_nat_user_data(node):
262         """Show the NAT user data.
263
264         Used data sources:
265
266             nat44_user_dump
267             nat44_user_session_dump
268
269         :param node: DUT node.
270         :type node: dict
271         """
272         cmds = [
273             u"nat44_user_dump",
274             u"nat44_user_session_dump",
275         ]
276         PapiSocketExecutor.dump_and_log(node, cmds)
277
278     @staticmethod
279     def compute_max_translations_per_thread(sessions, threads):
280         """Compute value of max_translations_per_thread NAT44 parameter based on
281         total number of worker threads.
282
283         :param sessions: Required number of NAT44 sessions.
284         :param threads: Number of worker threads.
285         :type sessions: int
286         :type threads: int
287         :returns: Value of max_translations_per_thread NAT44 parameter.
288         :rtype: int
289         """
290         # vpp-device tests have not dedicated physical core so
291         # ${dp_count_int} == 0 but we need to use one thread
292         threads = 1 if not int(threads) else int(threads)
293         rest, mult = modf(log2(sessions/(10*threads)))
294         return 2 ** (int(mult) + (1 if rest else 0)) * 10
295
296     @staticmethod
297     def get_nat44_sessions_number(node, proto):
298         """Get number of established NAT44 sessions from actual NAT44 mapping
299         data.
300
301         :param node: DUT node.
302         :param proto: Required protocol - TCP/UDP/ICMP.
303         :type node: dict
304         :type proto: str
305         :returns: Number of established NAT44 sessions.
306         :rtype: int
307         :raises ValueError: If not supported protocol.
308         """
309         nat44_data = dict()
310         if proto in [u"UDP", u"TCP", u"ICMP"]:
311             for line in NATUtil.show_nat44_summary(node).splitlines():
312                 sum_k, sum_v = line.split(u":") if u":" in line \
313                     else (line, None)
314                 nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
315                     else sum_v
316         else:
317             raise ValueError(f"Unsupported protocol: {proto}!")
318         return nat44_data.get(f"total {proto.lower()} sessions", 0)
319
320     # DET44 PAPI calls
321     # DET44 means deterministic mode of NAT44
322     @staticmethod
323     def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
324         """Enable DET44 plugin.
325
326         :param node: DUT node.
327         :param inside_vrf: Inside VRF ID.
328         :param outside_vrf: Outside VRF ID.
329         :type node: dict
330         :type inside_vrf: str or int
331         :type outside_vrf: str or int
332         """
333         cmd = u"det44_plugin_enable_disable"
334         err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
335         args_in = dict(
336             enable=True,
337             inside_vrf=int(inside_vrf),
338             outside_vrf=int(outside_vrf)
339         )
340
341         with PapiSocketExecutor(node) as papi_exec:
342             papi_exec.add(cmd, **args_in).get_reply(err_msg)
343
344     @staticmethod
345     def set_det44_interface(node, if_key, is_inside):
346         """Enable DET44 feature on the interface.
347
348         :param node: DUT node.
349         :param if_key: Interface key from topology file of interface
350             to enable DET44 feature on.
351         :param is_inside: True if interface is inside, False if outside.
352         :type node: dict
353         :type if_key: str
354         :type is_inside: bool
355         """
356         cmd = u"det44_interface_add_del_feature"
357         err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
358             f"on the host {node[u'host']}!"
359         args_in = dict(
360             is_add=True,
361             is_inside=is_inside,
362             sw_if_index=Topology.get_interface_sw_index(node, if_key)
363         )
364
365         with PapiSocketExecutor(node) as papi_exec:
366             papi_exec.add(cmd, **args_in).get_reply(err_msg)
367
368     @staticmethod
369     def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
370         """Set DET44 mapping.
371
372         The return value is a callable (zero argument Python function)
373         which can be used to reset NAT state, so repeated trial measurements
374         hit the same slow path.
375
376         :param node: DUT node.
377         :param ip_in: Inside IP.
378         :param subnet_in: Inside IP subnet.
379         :param ip_out: Outside IP.
380         :param subnet_out: Outside IP subnet.
381         :type node: dict
382         :type ip_in: str
383         :type subnet_in: str or int
384         :type ip_out: str
385         :type subnet_out: str or int
386         """
387         cmd = u"det44_add_del_map"
388         err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
389         args_in = dict(
390             is_add=True,
391             in_addr=IPv4Address(str(ip_in)).packed,
392             in_plen=int(subnet_in),
393             out_addr=IPv4Address(str(ip_out)).packed,
394             out_plen=int(subnet_out)
395         )
396
397         with PapiSocketExecutor(node) as papi_exec:
398             papi_exec.add(cmd, **args_in).get_reply(err_msg)
399
400         # A closure, accessing the variables above.
401         def resetter():
402             """Delete and re-add the deterministic NAT mapping."""
403             with PapiSocketExecutor(node) as papi_exec:
404                 args_in[u"is_add"] = False
405                 papi_exec.add(cmd, **args_in)
406                 args_in[u"is_add"] = True
407                 papi_exec.add(cmd, **args_in)
408                 papi_exec.get_replies(err_msg)
409
410         return resetter
411
412     @staticmethod
413     def get_det44_mapping(node):
414         """Get DET44 mapping data.
415
416         :param node: DUT node.
417         :type node: dict
418         :returns: Dictionary of DET44 mapping data.
419         :rtype: dict
420         """
421         cmd = u"det44_map_dump"
422         err_msg = f"Failed to get DET44 mapping data on the host " \
423             f"{node[u'host']}!"
424         args_in = dict()
425         with PapiSocketExecutor(node) as papi_exec:
426             details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
427
428         return details
429
430     @staticmethod
431     def get_det44_sessions_number(node):
432         """Get number of established DET44 sessions from actual DET44 mapping
433         data.
434         :param node: DUT node.
435         :type node: dict
436         :returns: Number of established DET44 sessions.
437         :rtype: int
438         """
439         det44_data = NATUtil.get_det44_mapping(node)
440         return det44_data.get(u"ses_num", 0)
441
442     @staticmethod
443     def show_det44(node):
444         """Show DET44 data.
445
446         Used data sources:
447
448             det44_interface_dump
449             det44_map_dump
450             det44_session_dump
451
452         :param node: DUT node.
453         :type node: dict
454         """
455         cmds = [
456             u"det44_interface_dump",
457             u"det44_map_dump",
458             u"det44_session_dump",
459         ]
460         PapiSocketExecutor.dump_and_log(node, cmds)