CSIT-1597 NAT44 API: dynamic config
[csit.git] / resources / libraries / python / NATUtil.py
1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """NAT utilities library."""
15
16 from math import log2, modf
17 from pprint import pformat
18 from enum import IntEnum
19
20 from ipaddress import IPv4Address
21 from robot.api import logger
22
23 from resources.libraries.python.Constants import Constants
24 from resources.libraries.python.InterfaceUtil import InterfaceUtil
25 from resources.libraries.python.topology import Topology
26 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
27
28
29 class NatConfigFlags(IntEnum):
30     """NAT plugin configuration flags"""
31     NAT_IS_NONE = 0x00
32     NAT_IS_TWICE_NAT = 0x01
33     NAT_IS_SELF_TWICE_NAT = 0x02
34     NAT_IS_OUT2IN_ONLY = 0x04
35     NAT_IS_ADDR_ONLY = 0x08
36     NAT_IS_OUTSIDE = 0x10
37     NAT_IS_INSIDE = 0x20
38     NAT_IS_STATIC = 0x40
39     NAT_IS_EXT_HOST_VALID = 0x80
40
41
42 class Nat44ConfigFlags(IntEnum):
43     """NAT44 configuration flags"""
44     NAT44_IS_ENDPOINT_INDEPENDENT = 0x00
45     NAT44_IS_ENDPOINT_DEPENDENT = 0x01
46     NAT44_IS_STATIC_MAPPING_ONLY = 0x02
47     NAT44_IS_CONNECTION_TRACKING = 0x04
48     NAT44_IS_OUT2IN_DPO = 0x08
49
50
51 class NatAddrPortAllocAlg(IntEnum):
52     """NAT Address and port assignment algorithms."""
53     NAT_ALLOC_ALG_DEFAULT = 0
54     NAT_ALLOC_ALG_MAP_E = 1
55     NAT_ALLOC_ALG_PORT_RANGE = 2
56
57
58 class NATUtil:
59     """This class defines the methods to set NAT."""
60
61     def __init__(self):
62         pass
63
64     @staticmethod
65     def enable_nat44_plugin(
66             node, inside_vrf=0, outside_vrf=0, users=0, user_memory=0,
67             sessions=0, session_memory=0, user_sessions=0, mode=u""):
68         """Enable NAT44 plugin.
69
70         :param node: DUT node.
71         :param inside_vrf: Inside VRF ID.
72         :param outside_vrf: Outside VRF ID.
73         :param users: Maximum number of users. Used only in endpoint-independent
74             mode.
75         :param user_memory: User memory size - overwrite auto calculated hash
76             allocation parameter if non-zero.
77         :param sessions: Maximum number of sessions.
78         :param session_memory: Session memory size - overwrite auto calculated
79             hash allocation parameter if non-zero.
80         :param user_sessions: Maximum number of sessions per user. Used only in
81             endpoint-independent mode.
82         :param mode: NAT44 mode. Valid values:
83             - endpoint-independent
84             - endpoint-dependent
85             - static-mapping-only
86             - connection-tracking
87             - out2in-dpo
88         :type node: dict
89         :type inside_vrf: str or int
90         :type outside_vrf: str or int
91         :type users: str or int
92         :type user_memory: str or int
93         :type sessions: str or int
94         :type session_memory: str or int
95         :type user_sessions: str or int
96         :type mode: str
97         """
98         cmd = u"nat44_plugin_enable_disable"
99         err_msg = f"Failed to enable NAT44 plugin on the host {node[u'host']}!"
100         args_in = dict(
101             enable=True,
102             inside_vrf=int(inside_vrf),
103             outside_vrf=int(outside_vrf),
104             users=int(users),
105             user_memory=int(user_memory),
106             sessions=int(sessions),
107             session_memory=int(session_memory),
108             user_sessions=int(user_sessions),
109             flags=getattr(
110                 Nat44ConfigFlags,
111                 f"NAT44_IS_{mode.replace(u'-', u'_').upper()}"
112             ).value
113         )
114
115         with PapiSocketExecutor(node) as papi_exec:
116             papi_exec.add(cmd, **args_in).get_reply(err_msg)
117
118     @staticmethod
119     def set_nat44_interface(node, interface, flag):
120         """Set inside and outside interfaces for NAT44.
121
122         :param node: DUT node.
123         :param interface: NAT44 interface.
124         :param flag: Interface NAT configuration flag name.
125         :type node: dict
126         :type interface: str
127         :type flag: str
128         """
129         cmd = u"nat44_interface_add_del_feature"
130
131         err_msg = f"Failed to set {flag} interface {interface} for NAT44 " \
132             f"on host {node[u'host']}"
133         args_in = dict(
134             sw_if_index=InterfaceUtil.get_sw_if_index(node, interface),
135             is_add=1,
136             flags=getattr(NatConfigFlags, flag).value
137         )
138
139         with PapiSocketExecutor(node) as papi_exec:
140             papi_exec.add(cmd, **args_in).get_reply(err_msg)
141
142     @staticmethod
143     def set_nat44_interfaces(node, int_in, int_out):
144         """Set inside and outside interfaces for NAT44.
145
146         :param node: DUT node.
147         :param int_in: Inside interface.
148         :param int_out: Outside interface.
149         :type node: dict
150         :type int_in: str
151         :type int_out: str
152         """
153         NATUtil.set_nat44_interface(node, int_in, u"NAT_IS_INSIDE")
154         NATUtil.set_nat44_interface(node, int_out, u"NAT_IS_OUTSIDE")
155
156     @staticmethod
157     def set_nat44_address_range(
158             node, start_ip, end_ip, vrf_id=Constants.BITWISE_NON_ZERO,
159             flag=u"NAT_IS_NONE"):
160         """Set NAT44 address range.
161
162         :param node: DUT node.
163         :param start_ip: IP range start.
164         :param end_ip: IP range end.
165         :param vrf_id: VRF index (Optional).
166         :param flag: NAT flag name.
167         :type node: dict
168         :type start_ip: str
169         :type end_ip: str
170         :type vrf_id: int
171         :type flag: str
172         """
173         cmd = u"nat44_add_del_address_range"
174         err_msg = f"Failed to set NAT44 address range on host {node[u'host']}"
175         args_in = dict(
176             is_add=True,
177             first_ip_address=IPv4Address(str(start_ip)).packed,
178             last_ip_address=IPv4Address(str(end_ip)).packed,
179             vrf_id=vrf_id,
180             flags=getattr(NatConfigFlags, flag).value
181         )
182
183         with PapiSocketExecutor(node) as papi_exec:
184             papi_exec.add(cmd, **args_in).get_reply(err_msg)
185
186     @staticmethod
187     def show_nat_config(node):
188         """Show the NAT configuration.
189
190         :param node: DUT node.
191         :type node: dict
192         """
193         cmd = u"nat_show_config"
194         err_msg = f"Failed to get NAT configuration on host {node[u'host']}"
195
196         with PapiSocketExecutor(node) as papi_exec:
197             reply = papi_exec.add(cmd).get_reply(err_msg)
198
199         logger.debug(f"NAT Configuration:\n{pformat(reply)}")
200
201     @staticmethod
202     def show_nat44_summary(node):
203         """Show NAT44 summary on the specified topology node.
204
205         :param node: Topology node.
206         :type node: dict
207         :returns: NAT44 summary data.
208         :rtype: str
209         """
210         return PapiSocketExecutor.run_cli_cmd(node, u"show nat44 summary")
211
212     @staticmethod
213     def show_nat_base_data(node):
214         """Show the NAT base data.
215
216         Used data sources:
217
218             nat_worker_dump
219             nat44_interface_addr_dump
220             nat44_address_dump
221             nat44_static_mapping_dump
222             nat44_interface_dump
223
224         :param node: DUT node.
225         :type node: dict
226         """
227         cmds = [
228             u"nat_worker_dump",
229             u"nat44_interface_addr_dump",
230             u"nat44_address_dump",
231             u"nat44_static_mapping_dump",
232             u"nat44_interface_dump",
233         ]
234         PapiSocketExecutor.dump_and_log(node, cmds)
235
236     @staticmethod
237     def show_nat_user_data(node):
238         """Show the NAT user data.
239
240         Used data sources:
241
242             nat44_user_dump
243             nat44_user_session_dump
244
245         :param node: DUT node.
246         :type node: dict
247         """
248         cmds = [
249             u"nat44_user_dump",
250             u"nat44_user_session_dump",
251         ]
252         PapiSocketExecutor.dump_and_log(node, cmds)
253
254     @staticmethod
255     def compute_max_translations_per_thread(sessions, threads):
256         """Compute value of max_translations_per_thread NAT44 parameter based on
257         total number of worker threads.
258
259         :param sessions: Required number of NAT44 sessions.
260         :param threads: Number of worker threads.
261         :type sessions: int
262         :type threads: int
263         :returns: Value of max_translations_per_thread NAT44 parameter.
264         :rtype: int
265         """
266         # vpp-device tests have not dedicated physical core so
267         # ${thr_count_int} == 0 but we need to use one thread
268         threads = 1 if not int(threads) else int(threads)
269         rest, mult = modf(log2(sessions/(10*threads)))
270         return 2 ** (int(mult) + (1 if rest else 0)) * 10
271
272     @staticmethod
273     def get_nat44_sessions_number(node, proto):
274         """Get number of established NAT44 sessions from actual NAT44 mapping
275         data.
276
277         :param node: DUT node.
278         :param proto: Required protocol - TCP/UDP/ICMP.
279         :type node: dict
280         :type proto: str
281         :returns: Number of established NAT44 sessions.
282         :rtype: int
283         :raises ValueError: If not supported protocol.
284         """
285         nat44_data = dict()
286         if proto in [u"UDP", u"TCP", u"ICMP"]:
287             for line in NATUtil.show_nat44_summary(node).splitlines():
288                 sum_k, sum_v = line.split(u":") if u":" in line \
289                     else (line, None)
290                 nat44_data[sum_k] = sum_v.strip() if isinstance(sum_v, str) \
291                     else sum_v
292         else:
293             raise ValueError(f"Unsupported protocol: {proto}!")
294         return nat44_data.get(f"total {proto.lower()} sessions", 0)
295
296     # DET44 PAPI calls
297     # DET44 means deterministic mode of NAT44
298     @staticmethod
299     def enable_det44_plugin(node, inside_vrf=0, outside_vrf=0):
300         """Enable DET44 plugin.
301
302         :param node: DUT node.
303         :param inside_vrf: Inside VRF ID.
304         :param outside_vrf: Outside VRF ID.
305         :type node: dict
306         :type inside_vrf: str or int
307         :type outside_vrf: str or int
308         """
309         cmd = u"det44_plugin_enable_disable"
310         err_msg = f"Failed to enable DET44 plugin on the host {node[u'host']}!"
311         args_in = dict(
312             enable=True,
313             inside_vrf=int(inside_vrf),
314             outside_vrf=int(outside_vrf)
315         )
316
317         with PapiSocketExecutor(node) as papi_exec:
318             papi_exec.add(cmd, **args_in).get_reply(err_msg)
319
320     @staticmethod
321     def set_det44_interface(node, if_key, is_inside):
322         """Enable DET44 feature on the interface.
323
324         :param node: DUT node.
325         :param if_key: Interface key from topology file of interface
326             to enable DET44 feature on.
327         :param is_inside: True if interface is inside, False if outside.
328         :type node: dict
329         :type if_key: str
330         :type is_inside: bool
331         """
332         cmd = u"det44_interface_add_del_feature"
333         err_msg = f"Failed to enable DET44 feature on the interface {if_key} " \
334             f"on the host {node[u'host']}!"
335         args_in = dict(
336             is_add=True,
337             is_inside=is_inside,
338             sw_if_index=Topology.get_interface_sw_index(node, if_key)
339         )
340
341         with PapiSocketExecutor(node) as papi_exec:
342             papi_exec.add(cmd, **args_in).get_reply(err_msg)
343
344     @staticmethod
345     def set_det44_mapping(node, ip_in, subnet_in, ip_out, subnet_out):
346         """Set DET44 mapping.
347
348         :param node: DUT node.
349         :param ip_in: Inside IP.
350         :param subnet_in: Inside IP subnet.
351         :param ip_out: Outside IP.
352         :param subnet_out: Outside IP subnet.
353         :type node: dict
354         :type ip_in: str
355         :type subnet_in: str or int
356         :type ip_out: str
357         :type subnet_out: str or int
358         """
359         cmd = u"det44_add_del_map"
360         err_msg = f"Failed to set DET44 mapping on the host {node[u'host']}!"
361         args_in = dict(
362             is_add=True,
363             in_addr=IPv4Address(str(ip_in)).packed,
364             in_plen=int(subnet_in),
365             out_addr=IPv4Address(str(ip_out)).packed,
366             out_plen=int(subnet_out)
367         )
368
369         with PapiSocketExecutor(node) as papi_exec:
370             papi_exec.add(cmd, **args_in).get_reply(err_msg)
371
372     @staticmethod
373     def get_det44_mapping(node):
374         """Get DET44 mapping data.
375
376         :param node: DUT node.
377         :type node: dict
378         :returns: Dictionary of DET44 mapping data.
379         :rtype: dict
380         """
381         cmd = u"det44_map_dump"
382         err_msg = f"Failed to get DET44 mapping data on the host " \
383             f"{node[u'host']}!"
384         args_in = dict()
385         with PapiSocketExecutor(node) as papi_exec:
386             details = papi_exec.add(cmd, **args_in).get_reply(err_msg)
387
388         return details
389
390     @staticmethod
391     def get_det44_sessions_number(node):
392         """Get number of established DET44 sessions from actual DET44 mapping
393         data.
394
395         :param node: DUT node.
396         :type node: dict
397         :returns: Number of established DET44 sessions.
398         :rtype: int
399         """
400         det44_data = NATUtil.get_det44_mapping(node)
401
402         return det44_data.get(u"ses_num", 0)
403
404     @staticmethod
405     def show_det44(node):
406         """Show DET44 data.
407
408         Used data sources:
409
410             det44_interface_dump
411             det44_map_dump
412             det44_session_dump
413
414         :param node: DUT node.
415         :type node: dict
416         """
417         cmds = [
418             u"det44_interface_dump",
419             u"det44_map_dump",
420             u"det44_session_dump",
421         ]
422         PapiSocketExecutor.dump_and_log(node, cmds)