feat(jobspec): Unify soak jobspecs
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.IPAddress import IPAddress
23 from resources.libraries.python.IPUtil import IPUtil
24 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
25
26
27 class SRv6Behavior(IntEnum):
28     """SRv6 LocalSID supported functions."""
29     # Endpoint function
30     END = 1
31     # Endpoint function with Layer-3 cross-connect
32     END_X = 2
33     # Endpoint with decapsulation and Layer-2 cross-connect
34     END_DX2 = 5
35     # Endpoint with decapsulation and IPv6 cross-connect
36     END_DX6 = 6
37     # Endpoint with decapsulation and IPv4 cross-connect
38     END_DX4 = 7
39     # Endpoint with decapsulation and IPv6 table lookup
40     END_DT6 = 8
41     # Endpoint with decapsulation and IPv4 table lookup
42     END_DT4 = 9
43     # Endpoint to SR-unaware appliance via static proxy
44     END_AS = 20
45     # Endpoint to SR-unaware appliance via dynamic proxy
46     END_AD = 21
47     # Endpoint to SR-unaware appliance via masquerading
48     END_AM = 22
49
50
51 class SRv6PolicySteeringTypes(IntEnum):
52     """SRv6 steering types."""
53     SR_STEER_L2 = 2
54     SR_STEER_IPV4 = 4
55     SR_STEER_IPV6 = 6
56
57
58 class SRv6:
59     """SRv6 class."""
60
61     @staticmethod
62     def create_srv6_sid_list(sids):
63         """Create SRv6 SID list object.
64
65         :param sids: SID IPv6 addresses.
66         :type sids: list
67         :returns: SRv6 SID list object.
68         :rtype: dict
69         """
70         sid_list = [IPv6Address(sid).packed for sid in sids]
71
72         return dict(
73             num_sids=len(sid_list),
74             weight=1,
75             sids=sid_list + (16 - len(sid_list)) * [IPv6Address(0).packed]
76         )
77
78     @staticmethod
79     def configure_sr_localsid(
80             node, local_sid, behavior, interface=None, next_hop=None,
81             fib_table=None, out_if=None, in_if=None, src_addr=None,
82             sid_list=None):
83         """Create SRv6 LocalSID and binds it to a particular behaviour on
84         the given node.
85
86         :param node: Given node to create localSID on.
87         :param local_sid: LocalSID IPv6 address.
88         :param behavior: SRv6 LocalSID function.
89         :param interface: Interface name (Optional, required for
90             L2/L3 xconnects).
91         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
92             xconnects).
93         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
94             L3 routing).
95         :param out_if: Interface name of local interface for sending traffic
96             towards the Service Function (Optional, required for SRv6 endpoint
97             to SR-unaware appliance).
98         :param in_if: Interface name of local interface receiving the traffic
99             coming back from the Service Function (Optional, required for SRv6
100             endpoint to SR-unaware appliance).
101         :param src_addr: Source address on the packets coming back on in_if
102             interface (Optional, required for SRv6 endpoint to SR-unaware
103             appliance via static proxy).
104         :param sid_list: SID list (Optional, required for SRv6 endpoint to
105             SR-unaware appliance via static proxy).
106         :type node: dict
107         :type local_sid: str
108         :type behavior: str
109         :type interface: str
110         :type next_hop: str
111         :type fib_table: str
112         :type out_if: str
113         :type in_if: str
114         :type src_addr: str
115         :type sid_list: list
116         :raises ValueError: If required parameter is missing.
117         """
118         beh = behavior.replace(u".", u"_").upper()
119         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
120         # so we need to use CLI command to configure it.
121         if beh in (getattr(SRv6Behavior, u"END_AD").name,
122                    getattr(SRv6Behavior, u"END_AS").name,
123                    getattr(SRv6Behavior, u"END_AM").name):
124             if beh == getattr(SRv6Behavior, u"END_AS").name:
125                 if next_hop is None or out_if is None or in_if is None or \
126                         src_addr is None or sid_list is None:
127                     raise ValueError(
128                         f"Required parameter(s) missing.\n"
129                         f"next_hop:{next_hop}\n "
130                         f"out_if:{out_if}\n"
131                         f"in_if:{in_if}\n"
132                         f"src_addr:{src_addr}\n"
133                         f"sid_list:{sid_list}"
134                     )
135                 sid_conf = f"next {u' next '.join(sid_list)}"
136                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
137                     f"src {src_addr} {sid_conf}"
138             else:
139                 if next_hop is None or out_if is None or in_if is None:
140                     raise ValueError(
141                         f"Required parameter(s) missing.\n"
142                         f"next_hop:{next_hop}\n"
143                         f"out_if:{out_if}\n"
144                         f"in_if:{in_if}"
145                     )
146                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
147
148             cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
149                 f"{params}"
150
151             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
152             return
153
154         cmd = u"sr_localsid_add_del"
155         args = dict(
156             is_del=False,
157             localsid=IPv6Address(local_sid).packed,
158             end_psp=False,
159             behavior=getattr(SRv6Behavior, beh).value,
160             sw_if_index=Constants.BITWISE_NON_ZERO,
161             vlan_index=0,
162             fib_table=0,
163             nh_addr=0
164         )
165         err_msg = f"Failed to add SR localSID {local_sid} " \
166             f"host {node[u'host']}"
167         if beh in (getattr(SRv6Behavior, u"END_X").name,
168                    getattr(SRv6Behavior, u"END_DX4").name,
169                    getattr(SRv6Behavior, u"END_DX6").name):
170             if interface is None or next_hop is None:
171                 raise ValueError(
172                     f"Required parameter(s) missing.\n"
173                     f"interface:{interface}\n"
174                     f"next_hop:{next_hop}"
175                 )
176             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
177                 node, interface
178             )
179             args[u"nh_addr"] = IPAddress.create_ip_address_object(
180                 ip_address(next_hop)
181             )
182         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
183             if interface is None:
184                 raise ValueError(
185                     f"Required parameter missing.\ninterface: {interface}"
186                 )
187             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
188                 node, interface
189             )
190         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
191                      getattr(SRv6Behavior, u"END_DT6").name):
192             if fib_table is None:
193                 raise ValueError(
194                     f"Required parameter missing.\n"
195                     f"fib_table: {fib_table}"
196                 )
197             args[u"fib_table"] = fib_table
198
199         with PapiSocketExecutor(node) as papi_exec:
200             papi_exec.add(cmd, **args).get_reply(err_msg)
201
202     @staticmethod
203     def show_sr_localsids(node):
204         """Show SRv6 LocalSIDs on the given node.
205
206         :param node: Given node to show localSIDs on.
207         :type node: dict
208         """
209         cmd = u"sr_localsids_dump"
210         PapiSocketExecutor.dump_and_log(node, (cmd,))
211
212     @staticmethod
213     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
214         """Create SRv6 policy on the given node.
215
216         :param node: Given node to create SRv6 policy on.
217         :param bsid: BindingSID - local SID IPv6 address.
218         :param sid_list: SID list.
219         :param mode: Encapsulation / insertion mode.
220         :type node: dict
221         :type bsid: str
222         :type sid_list: list
223         :type mode: str
224         """
225         cmd = u"sr_policy_add_v2"
226         args = dict(
227             bsid_addr=IPv6Address(bsid).packed,
228             weight=1,
229             is_encap=bool(mode == u"encap"),
230             type=0,  # Neither SPRAY nor TEF are needed yet.
231             sids=SRv6.create_srv6_sid_list(sid_list),
232             # encap_src is optional, do not set yet.
233         )
234         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
235             f"on host {node[u'host']}"
236
237         with PapiSocketExecutor(node) as papi_exec:
238             papi_exec.add(cmd, **args).get_reply(err_msg)
239
240     @staticmethod
241     def show_sr_policies(node):
242         """Show SRv6 policies on the given node.
243
244         :param node: Given node to show SRv6 policies on.
245         :type node: dict
246         """
247         cmd = u"sr_policies_v2_dump"
248         PapiSocketExecutor.dump_and_log(node, (cmd,))
249
250     @staticmethod
251     def _get_sr_steer_policy_args(
252             node, mode, interface=None, ip_addr=None, prefix=None):
253         """Return values of sw_if_index, mask_width, prefix_addr and
254             traffic_type for sr_steering_add_del API.
255
256         :param node: Given node to create/delete steering policy on.
257         :param mode: Mode of operation - L2 or L3.
258         :param interface: Interface name (Optional, required in case of
259             L2 mode).
260         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
261             mode).
262         :param prefix: IP address prefix (Optional, required in case of L3
263             mode).
264         :type node: dict
265         :type mode: str
266         :type interface: str
267         :type ip_addr: str
268         :type prefix: int
269         :returns: Values for sw_if_index, prefix and traffic_type
270         :rtype: tuple
271         :raises ValueError: If unsupported mode used or required parameter
272             is missing.
273         """
274         if mode.lower() == u"l2":
275             if interface is None:
276                 raise ValueError(
277                     f"Required data missing.\n"
278                     f"interface: {interface}"
279                 )
280             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
281             prefix = 0
282             traffic_type = getattr(
283                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
284             ).value
285         elif mode.lower() == u"l3":
286             if ip_addr is None or prefix is None:
287                 raise ValueError(
288                     f"Required data missing.\n"
289                     f"IP address:{ip_addr}\n"
290                     f"mask:{prefix}"
291                 )
292             sw_if_index = Constants.BITWISE_NON_ZERO
293             ip_addr = ip_address(ip_addr)
294             prefix = IPUtil.create_prefix_object(ip_addr, int(prefix))
295             traffic_type = getattr(
296                 SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
297             ).value if ip_addr.version == 4 else getattr(
298                 SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
299             ).value
300         else:
301             raise ValueError(f"Unsupported mode: {mode}")
302
303         return sw_if_index, prefix, traffic_type
304
305     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
306     @staticmethod
307     def configure_sr_steer(
308             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
309         """Create SRv6 steering policy on the given node.
310
311         :param node: Given node to create steering policy on.
312         :param mode: Mode of operation - L2 or L3.
313         :param bsid: BindingSID - local SID IPv6 address.
314         :param interface: Interface name (Optional, required in case of
315             L2 mode).
316         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
317             mode).
318         :param prefix: IP address prefix (Optional, required in case of L3
319             mode).
320         :type node: dict
321         :type mode: str
322         :type bsid: str
323         :type interface: str
324         :type ip_addr: str
325         :type prefix: int
326         :raises ValueError: If unsupported mode used or required parameter
327             is missing.
328         """
329         sw_if_index, prefix, traffic_type = SRv6._get_sr_steer_policy_args(
330             node, mode, interface, ip_addr, prefix
331         )
332
333         cmd = u"sr_steering_add_del"
334         args = dict(
335             is_del=False,
336             bsid_addr=IPv6Address(str(bsid)).packed,
337             sr_policy_index=0,
338             table_id=0,
339             prefix=prefix,
340             sw_if_index=sw_if_index,
341             traffic_type=traffic_type
342         )
343         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
344             f"on host {node[u'host']}"
345
346         with PapiSocketExecutor(node) as papi_exec:
347             papi_exec.add(cmd, **args).get_reply(err_msg)
348
349     @staticmethod
350     def show_sr_steering_policies(node):
351         """Show SRv6 steering policies on the given node.
352
353         :param node: Given node to show SRv6 steering policies on.
354         :type node: dict
355         """
356         cmd = u"sr_steering_pol_dump"
357         PapiSocketExecutor.dump_and_log(node, (cmd,))
358
359     @staticmethod
360     def set_sr_encaps_source_address(node, ip6_addr):
361         """Set SRv6 encapsulation source address on the given node.
362
363         :param node: Given node to set SRv6 encapsulation source address on.
364         :param ip6_addr: Local SID IPv6 address.
365         :type node: dict
366         :type ip6_addr: str
367         """
368         cmd = u"sr_set_encap_source"
369         args = dict(
370             encaps_source=IPv6Address(ip6_addr).packed
371         )
372         err_msg = f"Failed to set SRv6 encapsulation source address " \
373             f"{ip6_addr} on host {node[u'host']}"
374
375         with PapiSocketExecutor(node) as papi_exec:
376             papi_exec.add(cmd, **args).get_reply(err_msg)