f0a2eaf7d498c9a14833f05f7dc799abc8015759
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
23
24
25 class SRv6Behavior(IntEnum):
26     """SRv6 LocalSID supported functions."""
27     # Endpoint function
28     END = 1
29     # Endpoint function with Layer-3 cross-connect
30     END_X = 2
31     # Endpoint with decapsulation and Layer-2 cross-connect
32     END_DX2 = 5
33     # Endpoint with decapsulation and IPv6 cross-connect
34     END_DX6 = 6
35     # Endpoint with decapsulation and IPv4 cross-connect
36     END_DX4 = 7
37     # Endpoint with decapsulation and IPv6 table lookup
38     END_DT6 = 8
39     # Endpoint with decapsulation and IPv4 table lookup
40     END_DT4 = 9
41     # Endpoint to SR-unaware appliance via static proxy
42     END_AS = 20
43     # Endpoint to SR-unaware appliance via dynamic proxy
44     END_AD = 21
45     # Endpoint to SR-unaware appliance via masquerading
46     END_AM = 22
47
48
49 class SRv6PolicySteeringTypes(IntEnum):
50     """SRv6 LocalSID supported functions."""
51     SR_STEER_L2 = 2
52     SR_STEER_IPV4 = 4
53     SR_STEER_IPV6 = 6
54
55
56 class SRv6:
57     """SRv6 class."""
58
59     @staticmethod
60     def create_srv6_sid_object(ip_addr):
61         """Create SRv6 SID object.
62
63         :param ip_addr: IPv6 address.
64         :type ip_addr: str
65         :returns: SRv6 SID object.
66         :rtype: dict
67         """
68         return dict(
69             addr=IPv6Address(ip_addr).packed
70         )
71
72     @staticmethod
73     def create_srv6_sid_list(sids):
74         """Create SRv6 SID list object.
75
76         :param sids: SID IPv6 addresses.
77         :type sids: list
78         :returns: SRv6 SID list object.
79         :rtype: list
80         """
81         sid_list = [0] * 16
82
83         for index, item in enumerate(sids):
84             sid_list[index] = SRv6.create_srv6_sid_object(item)
85
86         return dict(
87             num_sids=len(sids),
88             weight=1,
89             sids=sid_list
90         )
91
92     @staticmethod
93     def configure_sr_localsid(
94             node, local_sid, behavior, interface=None, next_hop=None,
95             fib_table=None, out_if=None, in_if=None, src_addr=None,
96             sid_list=None):
97         """Create SRv6 LocalSID and binds it to a particular behaviour on
98         the given node.
99
100         :param node: Given node to create localSID on.
101         :param local_sid: LocalSID IPv6 address.
102         :param behavior: SRv6 LocalSID function.
103         :param interface: Interface name (Optional, required for
104             L2/L3 xconnects).
105         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
106             xconnects).
107         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
108             L3 routing).
109         :param out_if: Interface name of local interface for sending traffic
110             towards the Service Function (Optional, required for SRv6 endpoint
111             to SR-unaware appliance).
112         :param in_if: Interface name of local interface receiving the traffic
113             coming back from the Service Function (Optional, required for SRv6
114             endpoint to SR-unaware appliance).
115         :param src_addr: Source address on the packets coming back on in_if
116             interface (Optional, required for SRv6 endpoint to SR-unaware
117             appliance via static proxy).
118         :param sid_list: SID list (Optional, required for SRv6 endpoint to
119             SR-unaware appliance via static proxy).
120         :type node: dict
121         :type local_sid: str
122         :type behavior: str
123         :type interface: str
124         :type next_hop: str
125         :type fib_table: str
126         :type out_if: str
127         :type in_if: str
128         :type src_addr: str
129         :type sid_list: list
130         :raises ValueError: If required parameter is missing.
131         """
132         beh = behavior.replace(u".", u"_").upper()
133         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
134         # so we need to use CLI command to configure it.
135         if beh in (getattr(SRv6Behavior, u"END_AD").name,
136                    getattr(SRv6Behavior, u"END_AS").name,
137                    getattr(SRv6Behavior, u"END_AM").name):
138             if beh == getattr(SRv6Behavior, u"END_AS").name:
139                 if next_hop is None or out_if is None or in_if is None or \
140                         src_addr is None or sid_list is None:
141                     raise ValueError(
142                         f"Required parameter(s) missing.\n"
143                         f"next_hop:{next_hop}\n "
144                         f"out_if:{out_if}\n"
145                         f"in_if:{in_if}\n"
146                         f"src_addr:{src_addr}\n"
147                         f"sid_list:{sid_list}"
148                     )
149                 sid_conf = f"next {u' next '.join(sid_list)}"
150                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
151                     f"src {src_addr} {sid_conf}"
152             else:
153                 if next_hop is None or out_if is None or in_if is None:
154                     raise ValueError(
155                         f"Required parameter(s) missing.\n"
156                         f"next_hop:{next_hop}\n"
157                         f"out_if:{out_if}\n"
158                         f"in_if:{in_if}"
159                     )
160                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
161
162             cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
163                 f"{params}"
164
165             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
166             return
167
168         cmd = u"sr_localsid_add_del"
169         args = dict(
170             is_del=0,
171             localsid=SRv6.create_srv6_sid_object(local_sid),
172             end_psp=0,
173             behavior=getattr(SRv6Behavior, beh).value,
174             sw_if_index=Constants.BITWISE_NON_ZERO,
175             vlan_index=0,
176             fib_table=0,
177             nh_addr6=0,
178             nh_addr4=0
179         )
180         err_msg = f"Failed to add SR localSID {local_sid} " \
181             f"host {node[u'host']}"
182         if beh in (getattr(SRv6Behavior, u"END_X").name,
183                    getattr(SRv6Behavior, u"END_DX4").name,
184                    getattr(SRv6Behavior, u"END_DX6").name):
185             if interface is None or next_hop is None:
186                 raise ValueError(
187                     f"Required parameter(s) missing.\n"
188                     f"interface:{interface}\n"
189                     f"next_hop:{next_hop}"
190                 )
191             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
192                 node, interface
193             )
194             next_hop = ip_address(next_hop)
195             if next_hop.version == 6:
196                 args[u"nh_addr6"] = next_hop.packed
197             else:
198                 args[u"nh_addr4"] = next_hop.packed
199         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
200             if interface is None:
201                 raise ValueError(
202                     f"Required parameter missing.\ninterface: {interface}"
203                 )
204             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
205                 node, interface
206             )
207         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
208                      getattr(SRv6Behavior, u"END_DT6").name):
209             if fib_table is None:
210                 raise ValueError(
211                     f"Required parameter missing.\n"
212                     f"fib_table: {fib_table}"
213                 )
214             args[u"fib_table"] = fib_table
215
216         with PapiSocketExecutor(node) as papi_exec:
217             papi_exec.add(cmd, **args).get_reply(err_msg)
218
219     @staticmethod
220     def show_sr_localsids(node):
221         """Show SRv6 LocalSIDs on the given node.
222
223         :param node: Given node to show localSIDs on.
224         :type node: dict
225         """
226         cmd = u"sr_localsids_dump"
227         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
228
229         with PapiSocketExecutor(node) as papi_exec:
230             papi_exec.add(cmd).get_details(err_msg)
231
232     @staticmethod
233     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
234         """Create SRv6 policy on the given node.
235
236         :param node: Given node to create SRv6 policy on.
237         :param bsid: BindingSID - local SID IPv6 address.
238         :param sid_list: SID list.
239         :param mode: Encapsulation / insertion mode.
240         :type node: dict
241         :type bsid: str
242         :type sid_list: list
243         :type mode: str
244         """
245         cmd = u"sr_policy_add"
246         args = dict(
247             bsid_addr=IPv6Address(bsid).packed,
248             weight=1,
249             is_encap=1 if mode == u"encap" else 0,
250             sids=SRv6.create_srv6_sid_list(sid_list)
251         )
252         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
253             f"on host {node[u'host']}"
254
255         with PapiSocketExecutor(node) as papi_exec:
256             papi_exec.add(cmd, **args).get_reply(err_msg)
257
258     @staticmethod
259     def show_sr_policies(node):
260         """Show SRv6 policies on the given node.
261
262         :param node: Given node to show SRv6 policies on.
263         :type node: dict
264         """
265         cmd = u"sr_policies_dump"
266         err_msg = f"Failed to get SR policies dump on host {node[u'host']}"
267
268         with PapiSocketExecutor(node) as papi_exec:
269             papi_exec.add(cmd).get_details(err_msg)
270
271     @staticmethod
272     def _get_sr_steer_policy_args(
273             node, mode, interface=None, ip_addr=None, prefix=None):
274         """Return values of sw_if_index, mask_width, prefix_addr and
275             traffic_type for sr_steering_add_del API.
276
277         :param node: Given node to create/delete steering policy on.
278         :param mode: Mode of operation - L2 or L3.
279         :param interface: Interface name (Optional, required in case of
280             L2 mode).
281         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
282             mode).
283         :param prefix: IP address prefix (Optional, required in case of L3
284             mode).
285         :type node: dict
286         :type mode: str
287         :type interface: str
288         :type ip_addr: str
289         :type prefix: int
290         :returns: Values for sw_if_index, mask_width, prefix_addr and
291             traffic_type
292         :rtype: tuple
293         :raises ValueError: If unsupported mode used or required parameter
294             is missing.
295         """
296         if mode.lower() == u"l2":
297             if interface is None:
298                 raise ValueError(
299                     f"Required data missing.\n"
300                     f"interface: {interface}"
301                 )
302             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
303             mask_width = 0
304             prefix_addr = 16 * b"\0"
305             traffic_type = getattr(
306                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
307             ).value
308         elif mode.lower() == u"l3":
309             if ip_addr is None or prefix is None:
310                 raise ValueError(
311                     f"Required data missing.\n"
312                     f"IP address:{ip_addr}\n"
313                     f"mask:{prefix}"
314                 )
315             sw_if_index = Constants.BITWISE_NON_ZERO
316             ip_addr = ip_address(ip_addr)
317             prefix_addr = ip_addr.packed
318             mask_width = int(prefix)
319             if ip_addr.version == 4:
320                 prefix_addr += 12 * b"\0"
321                 traffic_type = getattr(
322                     SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
323                 ).value
324             else:
325                 traffic_type = getattr(
326                     SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
327                 ).value
328         else:
329             raise ValueError(f"Unsupported mode: {mode}")
330
331         return sw_if_index, mask_width, prefix_addr, traffic_type
332
333     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
334     @staticmethod
335     def configure_sr_steer(
336             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
337         """Create SRv6 steering policy on the given node.
338
339         :param node: Given node to create steering policy on.
340         :param mode: Mode of operation - L2 or L3.
341         :param bsid: BindingSID - local SID IPv6 address.
342         :param interface: Interface name (Optional, required in case of
343             L2 mode).
344         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
345             mode).
346         :param prefix: IP address prefix (Optional, required in case of L3
347             mode).
348         :type node: dict
349         :type mode: str
350         :type bsid: str
351         :type interface: str
352         :type ip_addr: str
353         :type prefix: int
354         :raises ValueError: If unsupported mode used or required parameter
355             is missing.
356         """
357         sw_if_index, mask_width, prefix_addr, traffic_type = \
358             SRv6._get_sr_steer_policy_args(
359                 node, mode, interface, ip_addr, prefix
360             )
361
362         cmd = u"sr_steering_add_del"
363         args = dict(
364             is_del=0,
365             bsid_addr=IPv6Address(str(bsid)).packed,
366             sr_policy_index=0,
367             table_id=0,
368             prefix_addr=prefix_addr,
369             mask_width=mask_width,
370             sw_if_index=sw_if_index,
371             traffic_type=traffic_type
372         )
373         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
374             f"on host {node[u'host']}"
375
376         with PapiSocketExecutor(node) as papi_exec:
377             papi_exec.add(cmd, **args).get_reply(err_msg)
378
379     @staticmethod
380     def show_sr_steering_policies(node):
381         """Show SRv6 steering policies on the given node.
382
383         :param node: Given node to show SRv6 steering policies on.
384         :type node: dict
385         """
386         cmd = u"sr_steering_pol_dump"
387         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
388
389         with PapiSocketExecutor(node) as papi_exec:
390             papi_exec.add(cmd).get_details(err_msg)
391
392     @staticmethod
393     def set_sr_encaps_source_address(node, ip6_addr):
394         """Set SRv6 encapsulation source address on the given node.
395
396         :param node: Given node to set SRv6 encapsulation source address on.
397         :param ip6_addr: Local SID IPv6 address.
398         :type node: dict
399         :type ip6_addr: str
400         """
401         cmd = u"sr_set_encap_source"
402         args = dict(
403             encaps_source=IPv6Address(ip6_addr).packed
404         )
405         err_msg = f"Failed to set SRv6 encapsulation source address " \
406             f"{ip6_addr} on host {node[u'host']}"
407
408         with PapiSocketExecutor(node) as papi_exec:
409             papi_exec.add(cmd, **args).get_reply(err_msg)