CSIT-1597 API cleanup: srv6
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.IPAddress import IPAddress
23 from resources.libraries.python.IPUtil import IPUtil
24 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
25
26
27 class SRv6Behavior(IntEnum):
28     """SRv6 LocalSID supported functions."""
29     # Endpoint function
30     END = 1
31     # Endpoint function with Layer-3 cross-connect
32     END_X = 2
33     # Endpoint with decapsulation and Layer-2 cross-connect
34     END_DX2 = 5
35     # Endpoint with decapsulation and IPv6 cross-connect
36     END_DX6 = 6
37     # Endpoint with decapsulation and IPv4 cross-connect
38     END_DX4 = 7
39     # Endpoint with decapsulation and IPv6 table lookup
40     END_DT6 = 8
41     # Endpoint with decapsulation and IPv4 table lookup
42     END_DT4 = 9
43     # Endpoint to SR-unaware appliance via static proxy
44     END_AS = 20
45     # Endpoint to SR-unaware appliance via dynamic proxy
46     END_AD = 21
47     # Endpoint to SR-unaware appliance via masquerading
48     END_AM = 22
49
50
51 class SRv6PolicySteeringTypes(IntEnum):
52     """SRv6 steering types."""
53     SR_STEER_L2 = 2
54     SR_STEER_IPV4 = 4
55     SR_STEER_IPV6 = 6
56
57
58 class SRv6:
59     """SRv6 class."""
60
61     @staticmethod
62     def create_srv6_sid_list(sids):
63         """Create SRv6 SID list object.
64
65         :param sids: SID IPv6 addresses.
66         :type sids: list
67         :returns: SRv6 SID list object.
68         :rtype: dict
69         """
70         sid_list = [IPv6Address(sid).packed for sid in sids]
71
72         return dict(
73             num_sids=len(sid_list),
74             weight=1,
75             sids=sid_list + (16 - len(sid_list)) * [IPv6Address(0).packed]
76         )
77
78     @staticmethod
79     def configure_sr_localsid(
80             node, local_sid, behavior, interface=None, next_hop=None,
81             fib_table=None, out_if=None, in_if=None, src_addr=None,
82             sid_list=None):
83         """Create SRv6 LocalSID and binds it to a particular behaviour on
84         the given node.
85
86         :param node: Given node to create localSID on.
87         :param local_sid: LocalSID IPv6 address.
88         :param behavior: SRv6 LocalSID function.
89         :param interface: Interface name (Optional, required for
90             L2/L3 xconnects).
91         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
92             xconnects).
93         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
94             L3 routing).
95         :param out_if: Interface name of local interface for sending traffic
96             towards the Service Function (Optional, required for SRv6 endpoint
97             to SR-unaware appliance).
98         :param in_if: Interface name of local interface receiving the traffic
99             coming back from the Service Function (Optional, required for SRv6
100             endpoint to SR-unaware appliance).
101         :param src_addr: Source address on the packets coming back on in_if
102             interface (Optional, required for SRv6 endpoint to SR-unaware
103             appliance via static proxy).
104         :param sid_list: SID list (Optional, required for SRv6 endpoint to
105             SR-unaware appliance via static proxy).
106         :type node: dict
107         :type local_sid: str
108         :type behavior: str
109         :type interface: str
110         :type next_hop: str
111         :type fib_table: str
112         :type out_if: str
113         :type in_if: str
114         :type src_addr: str
115         :type sid_list: list
116         :raises ValueError: If required parameter is missing.
117         """
118         beh = behavior.replace(u".", u"_").upper()
119         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
120         # so we need to use CLI command to configure it.
121         if beh in (getattr(SRv6Behavior, u"END_AD").name,
122                    getattr(SRv6Behavior, u"END_AS").name,
123                    getattr(SRv6Behavior, u"END_AM").name):
124             if beh == getattr(SRv6Behavior, u"END_AS").name:
125                 if next_hop is None or out_if is None or in_if is None or \
126                         src_addr is None or sid_list is None:
127                     raise ValueError(
128                         f"Required parameter(s) missing.\n"
129                         f"next_hop:{next_hop}\n "
130                         f"out_if:{out_if}\n"
131                         f"in_if:{in_if}\n"
132                         f"src_addr:{src_addr}\n"
133                         f"sid_list:{sid_list}"
134                     )
135                 sid_conf = f"next {u' next '.join(sid_list)}"
136                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
137                     f"src {src_addr} {sid_conf}"
138             else:
139                 if next_hop is None or out_if is None or in_if is None:
140                     raise ValueError(
141                         f"Required parameter(s) missing.\n"
142                         f"next_hop:{next_hop}\n"
143                         f"out_if:{out_if}\n"
144                         f"in_if:{in_if}"
145                     )
146                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
147
148             cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
149                 f"{params}"
150
151             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
152             return
153
154         cmd = u"sr_localsid_add_del"
155         args = dict(
156             is_del=False,
157             localsid=IPv6Address(local_sid).packed,
158             end_psp=False,
159             behavior=getattr(SRv6Behavior, beh).value,
160             sw_if_index=Constants.BITWISE_NON_ZERO,
161             vlan_index=0,
162             fib_table=0,
163             nh_addr=0
164         )
165         err_msg = f"Failed to add SR localSID {local_sid} " \
166             f"host {node[u'host']}"
167         if beh in (getattr(SRv6Behavior, u"END_X").name,
168                    getattr(SRv6Behavior, u"END_DX4").name,
169                    getattr(SRv6Behavior, u"END_DX6").name):
170             if interface is None or next_hop is None:
171                 raise ValueError(
172                     f"Required parameter(s) missing.\n"
173                     f"interface:{interface}\n"
174                     f"next_hop:{next_hop}"
175                 )
176             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
177                 node, interface
178             )
179             args[u"nh_addr"] = IPAddress.create_ip_address_object(
180                 ip_address(next_hop)
181             )
182         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
183             if interface is None:
184                 raise ValueError(
185                     f"Required parameter missing.\ninterface: {interface}"
186                 )
187             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
188                 node, interface
189             )
190         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
191                      getattr(SRv6Behavior, u"END_DT6").name):
192             if fib_table is None:
193                 raise ValueError(
194                     f"Required parameter missing.\n"
195                     f"fib_table: {fib_table}"
196                 )
197             args[u"fib_table"] = fib_table
198
199         with PapiSocketExecutor(node) as papi_exec:
200             papi_exec.add(cmd, **args).get_reply(err_msg)
201
202     @staticmethod
203     def show_sr_localsids(node):
204         """Show SRv6 LocalSIDs on the given node.
205
206         :param node: Given node to show localSIDs on.
207         :type node: dict
208         """
209         cmd = u"sr_localsids_dump"
210         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
211
212         with PapiSocketExecutor(node) as papi_exec:
213             papi_exec.add(cmd).get_details(err_msg)
214
215     @staticmethod
216     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
217         """Create SRv6 policy on the given node.
218
219         :param node: Given node to create SRv6 policy on.
220         :param bsid: BindingSID - local SID IPv6 address.
221         :param sid_list: SID list.
222         :param mode: Encapsulation / insertion mode.
223         :type node: dict
224         :type bsid: str
225         :type sid_list: list
226         :type mode: str
227         """
228         cmd = u"sr_policy_add"
229         args = dict(
230             bsid_addr=IPv6Address(bsid).packed,
231             weight=1,
232             is_encap=bool(mode == u"encap"),
233             is_spray=False,
234             sids=SRv6.create_srv6_sid_list(sid_list)
235         )
236         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
237             f"on host {node[u'host']}"
238
239         with PapiSocketExecutor(node) as papi_exec:
240             papi_exec.add(cmd, **args).get_reply(err_msg)
241
242     @staticmethod
243     def show_sr_policies(node):
244         """Show SRv6 policies on the given node.
245
246         :param node: Given node to show SRv6 policies on.
247         :type node: dict
248         """
249         cmd = u"sr_policies_dump"
250         err_msg = f"Failed to get SR policies dump on host {node[u'host']}"
251
252         with PapiSocketExecutor(node) as papi_exec:
253             papi_exec.add(cmd).get_details(err_msg)
254
255     @staticmethod
256     def _get_sr_steer_policy_args(
257             node, mode, interface=None, ip_addr=None, prefix=None):
258         """Return values of sw_if_index, mask_width, prefix_addr and
259             traffic_type for sr_steering_add_del API.
260
261         :param node: Given node to create/delete steering policy on.
262         :param mode: Mode of operation - L2 or L3.
263         :param interface: Interface name (Optional, required in case of
264             L2 mode).
265         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
266             mode).
267         :param prefix: IP address prefix (Optional, required in case of L3
268             mode).
269         :type node: dict
270         :type mode: str
271         :type interface: str
272         :type ip_addr: str
273         :type prefix: int
274         :returns: Values for sw_if_index, prefix and traffic_type
275         :rtype: tuple
276         :raises ValueError: If unsupported mode used or required parameter
277             is missing.
278         """
279         if mode.lower() == u"l2":
280             if interface is None:
281                 raise ValueError(
282                     f"Required data missing.\n"
283                     f"interface: {interface}"
284                 )
285             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
286             prefix = 0
287             traffic_type = getattr(
288                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
289             ).value
290         elif mode.lower() == u"l3":
291             if ip_addr is None or prefix is None:
292                 raise ValueError(
293                     f"Required data missing.\n"
294                     f"IP address:{ip_addr}\n"
295                     f"mask:{prefix}"
296                 )
297             sw_if_index = Constants.BITWISE_NON_ZERO
298             ip_addr = ip_address(ip_addr)
299             prefix = IPUtil.create_prefix_object(ip_addr, int(prefix))
300             traffic_type = getattr(
301                     SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
302                 ).value if ip_addr.version == 4 else getattr(
303                     SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
304                 ).value
305         else:
306             raise ValueError(f"Unsupported mode: {mode}")
307
308         return sw_if_index, prefix, traffic_type
309
310     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
311     @staticmethod
312     def configure_sr_steer(
313             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
314         """Create SRv6 steering policy on the given node.
315
316         :param node: Given node to create steering policy on.
317         :param mode: Mode of operation - L2 or L3.
318         :param bsid: BindingSID - local SID IPv6 address.
319         :param interface: Interface name (Optional, required in case of
320             L2 mode).
321         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
322             mode).
323         :param prefix: IP address prefix (Optional, required in case of L3
324             mode).
325         :type node: dict
326         :type mode: str
327         :type bsid: str
328         :type interface: str
329         :type ip_addr: str
330         :type prefix: int
331         :raises ValueError: If unsupported mode used or required parameter
332             is missing.
333         """
334         sw_if_index, prefix, traffic_type = SRv6._get_sr_steer_policy_args(
335                 node, mode, interface, ip_addr, prefix
336             )
337
338         cmd = u"sr_steering_add_del"
339         args = dict(
340             is_del=False,
341             bsid_addr=IPv6Address(str(bsid)).packed,
342             sr_policy_index=0,
343             table_id=0,
344             prefix=prefix,
345             sw_if_index=sw_if_index,
346             traffic_type=traffic_type
347         )
348         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
349             f"on host {node[u'host']}"
350
351         with PapiSocketExecutor(node) as papi_exec:
352             papi_exec.add(cmd, **args).get_reply(err_msg)
353
354     @staticmethod
355     def show_sr_steering_policies(node):
356         """Show SRv6 steering policies on the given node.
357
358         :param node: Given node to show SRv6 steering policies on.
359         :type node: dict
360         """
361         cmd = u"sr_steering_pol_dump"
362         err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
363
364         with PapiSocketExecutor(node) as papi_exec:
365             papi_exec.add(cmd).get_details(err_msg)
366
367     @staticmethod
368     def set_sr_encaps_source_address(node, ip6_addr):
369         """Set SRv6 encapsulation source address on the given node.
370
371         :param node: Given node to set SRv6 encapsulation source address on.
372         :param ip6_addr: Local SID IPv6 address.
373         :type node: dict
374         :type ip6_addr: str
375         """
376         cmd = u"sr_set_encap_source"
377         args = dict(
378             encaps_source=IPv6Address(ip6_addr).packed
379         )
380         err_msg = f"Failed to set SRv6 encapsulation source address " \
381             f"{ip6_addr} on host {node[u'host']}"
382
383         with PapiSocketExecutor(node) as papi_exec:
384             papi_exec.add(cmd, **args).get_reply(err_msg)