Python3: resources and libraries
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
23
24
25 class SRv6Behavior(IntEnum):
26     """SRv6 LocalSID supported functions."""
27     # Endpoint function
28     END = 1
29     # Endpoint function with Layer-3 cross-connect
30     END_X = 2
31     # Endpoint with decapsulation and Layer-2 cross-connect
32     END_DX2 = 5
33     # Endpoint with decapsulation and IPv6 cross-connect
34     END_DX6 = 6
35     # Endpoint with decapsulation and IPv4 cross-connect
36     END_DX4 = 7
37     # Endpoint with decapsulation and IPv6 table lookup
38     END_DT6 = 8
39     # Endpoint with decapsulation and IPv4 table lookup
40     END_DT4 = 9
41     # Endpoint to SR-unaware appliance via static proxy
42     END_AS = 20
43     # Endpoint to SR-unaware appliance via dynamic proxy
44     END_AD = 21
45     # Endpoint to SR-unaware appliance via masquerading
46     END_AM = 22
47
48
49 class SRv6PolicySteeringTypes(IntEnum):
50     """SRv6 LocalSID supported functions."""
51     SR_STEER_L2 = 2
52     SR_STEER_IPV4 = 4
53     SR_STEER_IPV6 = 6
54
55
56 class SRv6:
57     """SRv6 class."""
58
59     @staticmethod
60     def create_srv6_sid_object(ip_addr):
61         """Create SRv6 SID object.
62
63         :param ip_addr: IPv6 address.
64         :type ip_addr: str
65         :returns: SRv6 SID object.
66         :rtype: dict
67         """
68         return dict(
69             addr=IPv6Address(ip_addr).packed
70         )
71
72     @staticmethod
73     def create_srv6_sid_list(sids):
74         """Create SRv6 SID list object.
75
76         :param sids: SID IPv6 addresses.
77         :type sids: list
78         :returns: SRv6 SID list object.
79         :rtype: list
80         """
81         sid_list = [0] * 16
82
83         for index, item in enumerate(sids):
84             sid_list[index] = SRv6.create_srv6_sid_object(item)
85
86         return dict(
87             num_sids=len(sids),
88             weight=1,
89             sids=sid_list
90         )
91
92     @staticmethod
93     def configure_sr_localsid(
94             node, local_sid, behavior, interface=None, next_hop=None,
95             fib_table=None, out_if=None, in_if=None, src_addr=None,
96             sid_list=None):
97         """Create SRv6 LocalSID and binds it to a particular behaviour on
98         the given node.
99
100         :param node: Given node to create localSID on.
101         :param local_sid: LocalSID IPv6 address.
102         :param behavior: SRv6 LocalSID function.
103         :param interface: Interface name (Optional, required for
104             L2/L3 xconnects).
105         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
106             xconnects).
107         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
108             L3 routing).
109         :param out_if: Interface name of local interface for sending traffic
110             towards the Service Function (Optional, required for SRv6 endpoint
111             to SR-unaware appliance).
112         :param in_if: Interface name of local interface receiving the traffic
113             coming back from the Service Function (Optional, required for SRv6
114             endpoint to SR-unaware appliance).
115         :param src_addr: Source address on the packets coming back on in_if
116             interface (Optional, required for SRv6 endpoint to SR-unaware
117             appliance via static proxy).
118         :param sid_list: SID list (Optional, required for SRv6 endpoint to
119             SR-unaware appliance via static proxy).
120         :type node: dict
121         :type local_sid: str
122         :type behavior: str
123         :type interface: str
124         :type next_hop: str
125         :type fib_table: str
126         :type out_if: str
127         :type in_if: str
128         :type src_addr: str
129         :type sid_list: list
130         :raises ValueError: If required parameter is missing.
131         """
132         beh = behavior.replace(u".", u"_").upper()
133         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
134         # so we need to use CLI command to configure it.
135         if beh in (getattr(SRv6Behavior, u"END_AD").name,
136                    getattr(SRv6Behavior, u"END_AS").name,
137                    getattr(SRv6Behavior, u"END_AM").name):
138             if beh == getattr(SRv6Behavior, u"END_AS").name:
139                 if next_hop is None or out_if is None or in_if is None or \
140                         src_addr is None or sid_list is None:
141                     raise ValueError(
142                         f"Required parameter(s) missing.\n"
143                         f"next_hop:{next_hop}\n "
144                         f"out_if:{out_if}\n"
145                         f"in_if:{in_if}\n"
146                         f"src_addr:{src_addr}\n"
147                         f"sid_list:{sid_list}"
148                     )
149                 sid_conf = f"next {u' next '.join(sid_list)}"
150                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
151                     f"src {src_addr} {sid_conf}"
152             else:
153                 if next_hop is None or out_if is None or in_if is None:
154                     raise ValueError(
155                         f"Required parameter(s) missing.\n"
156                         f"next_hop:{next_hop}\n"
157                         f"out_if:{out_if}\n"
158                         f"in_if:{in_if}"
159                     )
160                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
161
162             cli_cmd = f"sr localsid address {local_sid} behavior {behavior}" \
163                 f"{params}"
164
165             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
166
167         cmd = u"sr_localsid_add_del"
168         args = dict(
169             is_del=0,
170             localsid=SRv6.create_srv6_sid_object(local_sid),
171             end_psp=0,
172             behavior=getattr(SRv6Behavior, beh).value,
173             sw_if_index=Constants.BITWISE_NON_ZERO,
174             vlan_index=0,
175             fib_table=0,
176             nh_addr6=0,
177             nh_addr4=0
178         )
179         err_msg = f"Failed to add SR localSID {local_sid} " \
180             f"host {node[u'host']}"
181         if beh in (getattr(SRv6Behavior, u"END_X").name,
182                    getattr(SRv6Behavior, u"END_DX4").name,
183                    getattr(SRv6Behavior, u"END_DX6").name):
184             if interface is None or next_hop is None:
185                 raise ValueError(
186                     f"Required parameter(s) missing.\n"
187                     f"interface:{interface}\n"
188                     f"next_hop:{next_hop}"
189                 )
190             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
191                 node, interface
192             )
193             next_hop = ip_address(next_hop)
194             if next_hop.version == 6:
195                 args[u"nh_addr6"] = next_hop.packed
196             else:
197                 args[u"nh_addr4"] = next_hop.packed
198         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
199             if interface is None:
200                 raise ValueError(
201                     f"Required parameter missing.\ninterface: {interface}"
202                 )
203             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
204                 node, interface
205             )
206         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
207                      getattr(SRv6Behavior, u"END_DT6").name):
208             if fib_table is None:
209                 raise ValueError(
210                     f"Required parameter missing.\n"
211                     f"fib_table: {fib_table}"
212                 )
213             args[u"fib_table"] = fib_table
214
215         with PapiSocketExecutor(node) as papi_exec:
216             papi_exec.add(cmd, **args).get_reply(err_msg)
217
218     @staticmethod
219     def show_sr_localsids(node):
220         """Show SRv6 LocalSIDs on the given node.
221
222         :param node: Given node to show localSIDs on.
223         :type node: dict
224         """
225         cmd = u"sr_localsids_dump"
226         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
227
228         with PapiSocketExecutor(node) as papi_exec:
229             papi_exec.add(cmd).get_details(err_msg)
230
231     @staticmethod
232     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
233         """Create SRv6 policy on the given node.
234
235         :param node: Given node to create SRv6 policy on.
236         :param bsid: BindingSID - local SID IPv6 address.
237         :param sid_list: SID list.
238         :param mode: Encapsulation / insertion mode.
239         :type node: dict
240         :type bsid: str
241         :type sid_list: list
242         :type mode: str
243         """
244         cmd = u"sr_policy_add"
245         args = dict(
246             bsid_addr=IPv6Address(bsid).packed,
247             weight=1,
248             is_encap=1 if mode == u"encap" else 0,
249             sids=SRv6.create_srv6_sid_list(sid_list)
250         )
251         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
252             f"on host {node[u'host']}"
253
254         with PapiSocketExecutor(node) as papi_exec:
255             papi_exec.add(cmd, **args).get_reply(err_msg)
256
257     @staticmethod
258     def show_sr_policies(node):
259         """Show SRv6 policies on the given node.
260
261         :param node: Given node to show SRv6 policies on.
262         :type node: dict
263         """
264         cmd = u"sr_policies_dump"
265         err_msg = f"Failed to get SR policies dump on host {node[u'host']}"
266
267         with PapiSocketExecutor(node) as papi_exec:
268             papi_exec.add(cmd).get_details(err_msg)
269
270     @staticmethod
271     def _get_sr_steer_policy_args(
272             node, mode, interface=None, ip_addr=None, prefix=None):
273         """Return values of sw_if_index, mask_width, prefix_addr and
274             traffic_type for sr_steering_add_del API.
275
276         :param node: Given node to create/delete steering policy on.
277         :param mode: Mode of operation - L2 or L3.
278         :param interface: Interface name (Optional, required in case of
279             L2 mode).
280         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
281             mode).
282         :param prefix: IP address prefix (Optional, required in case of L3
283             mode).
284         :type node: dict
285         :type mode: str
286         :type interface: str
287         :type ip_addr: str
288         :type prefix: int
289         :returns: Values for sw_if_index, mask_width, prefix_addr and
290             traffic_type
291         :rtype: tuple
292         :raises ValueError: If unsupported mode used or required parameter
293             is missing.
294         """
295         if mode.lower() == u"l2":
296             if interface is None:
297                 raise ValueError(
298                     f"Required data missing.\n"
299                     f"interface: {interface}"
300                 )
301             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
302             mask_width = 0
303             prefix_addr = 16 * b"\0"
304             traffic_type = getattr(
305                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
306             ).value
307         elif mode.lower() == u"l3":
308             if ip_addr is None or prefix is None:
309                 raise ValueError(
310                     f"Required data missing.\n"
311                     f"IP address:{ip_addr}\n"
312                     f"mask:{prefix}"
313                 )
314             sw_if_index = Constants.BITWISE_NON_ZERO
315             ip_addr = ip_address(ip_addr)
316             prefix_addr = ip_addr.packed
317             mask_width = int(prefix)
318             if ip_addr.version == 4:
319                 prefix_addr += 12 * b"\0"
320                 traffic_type = getattr(
321                     SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
322                 ).value
323             else:
324                 traffic_type = getattr(
325                     SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
326                 ).value
327         else:
328             raise ValueError(f"Unsupported mode: {mode}")
329
330         return sw_if_index, mask_width, prefix_addr, traffic_type
331
332     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
333     @staticmethod
334     def configure_sr_steer(
335             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
336         """Create SRv6 steering policy on the given node.
337
338         :param node: Given node to create steering policy on.
339         :param mode: Mode of operation - L2 or L3.
340         :param bsid: BindingSID - local SID IPv6 address.
341         :param interface: Interface name (Optional, required in case of
342             L2 mode).
343         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
344             mode).
345         :param prefix: IP address prefix (Optional, required in case of L3
346             mode).
347         :type node: dict
348         :type mode: str
349         :type bsid: str
350         :type interface: str
351         :type ip_addr: str
352         :type prefix: int
353         :raises ValueError: If unsupported mode used or required parameter
354             is missing.
355         """
356         sw_if_index, mask_width, prefix_addr, traffic_type = \
357             SRv6._get_sr_steer_policy_args(
358                 node, mode, interface, ip_addr, prefix
359             )
360
361         cmd = u"sr_steering_add_del"
362         args = dict(
363             is_del=0,
364             bsid_addr=IPv6Address(str(bsid)).packed,
365             sr_policy_index=0,
366             table_id=0,
367             prefix_addr=prefix_addr,
368             mask_width=mask_width,
369             sw_if_index=sw_if_index,
370             traffic_type=traffic_type
371         )
372         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
373             f"on host {node[u'host']}"
374
375         with PapiSocketExecutor(node) as papi_exec:
376             papi_exec.add(cmd, **args).get_reply(err_msg)
377
378     @staticmethod
379     def show_sr_steering_policies(node):
380         """Show SRv6 steering policies on the given node.
381
382         :param node: Given node to show SRv6 steering policies on.
383         :type node: dict
384         """
385         cmd = u"sr_steering_pol_dump"
386         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
387
388         with PapiSocketExecutor(node) as papi_exec:
389             papi_exec.add(cmd).get_details(err_msg)
390
391     @staticmethod
392     def set_sr_encaps_source_address(node, ip6_addr):
393         """Set SRv6 encapsulation source address on the given node.
394
395         :param node: Given node to set SRv6 encapsulation source address on.
396         :param ip6_addr: Local SID IPv6 address.
397         :type node: dict
398         :type ip6_addr: str
399         """
400         cmd = u"sr_set_encap_source"
401         args = dict(
402             encaps_source=IPv6Address(ip6_addr).packed
403         )
404         err_msg = f"Failed to set SRv6 encapsulation source address " \
405             f"{ip6_addr} on host {node[u'host']}"
406
407         with PapiSocketExecutor(node) as papi_exec:
408             papi_exec.add(cmd, **args).get_reply(err_msg)