feat(crc): bump messages for cycle after 2306
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.IPAddress import IPAddress
23 from resources.libraries.python.IPUtil import IPUtil
24 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
25
26
27 class SRv6Behavior(IntEnum):
28     """SRv6 LocalSID supported functions."""
29     # Endpoint function
30     END = 1
31     # Endpoint function with Layer-3 cross-connect
32     END_X = 2
33     # Endpoint with decapsulation and Layer-2 cross-connect
34     END_DX2 = 5
35     # Endpoint with decapsulation and IPv6 cross-connect
36     END_DX6 = 6
37     # Endpoint with decapsulation and IPv4 cross-connect
38     END_DX4 = 7
39     # Endpoint with decapsulation and IPv6 table lookup
40     END_DT6 = 8
41     # Endpoint with decapsulation and IPv4 table lookup
42     END_DT4 = 9
43     # Endpoint to SR-unaware appliance via static proxy
44     END_AS = 20
45     # Endpoint to SR-unaware appliance via dynamic proxy
46     END_AD = 21
47     # Endpoint to SR-unaware appliance via masquerading
48     END_AM = 22
49
50
51 class SRv6PolicySteeringTypes(IntEnum):
52     """SRv6 steering types."""
53     SR_STEER_L2 = 2
54     SR_STEER_IPV4 = 4
55     SR_STEER_IPV6 = 6
56
57
58 class SRv6:
59     """SRv6 class."""
60
61     @staticmethod
62     def create_srv6_sid_list(sids):
63         """Create SRv6 SID list object.
64
65         :param sids: SID IPv6 addresses.
66         :type sids: list
67         :returns: SRv6 SID list object.
68         :rtype: dict
69         """
70         sid_list = [IPv6Address(sid).packed for sid in sids]
71
72         return dict(
73             num_sids=len(sid_list),
74             weight=1,
75             sids=sid_list + (16 - len(sid_list)) * [IPv6Address(0).packed]
76         )
77
78     @staticmethod
79     def configure_sr_localsid(
80             node, local_sid, behavior, interface=None, next_hop=None,
81             fib_table=None, out_if=None, in_if=None, src_addr=None,
82             sid_list=None):
83         """Create SRv6 LocalSID and binds it to a particular behaviour on
84         the given node.
85
86         :param node: Given node to create localSID on.
87         :param local_sid: LocalSID IPv6 address.
88         :param behavior: SRv6 LocalSID function.
89         :param interface: Interface name (Optional, required for
90             L2/L3 xconnects).
91         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
92             xconnects).
93         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
94             L3 routing).
95         :param out_if: Interface name of local interface for sending traffic
96             towards the Service Function (Optional, required for SRv6 endpoint
97             to SR-unaware appliance).
98         :param in_if: Interface name of local interface receiving the traffic
99             coming back from the Service Function (Optional, required for SRv6
100             endpoint to SR-unaware appliance).
101         :param src_addr: Source address on the packets coming back on in_if
102             interface (Optional, required for SRv6 endpoint to SR-unaware
103             appliance via static proxy).
104         :param sid_list: SID list (Optional, required for SRv6 endpoint to
105             SR-unaware appliance via static proxy).
106         :type node: dict
107         :type local_sid: str
108         :type behavior: str
109         :type interface: str
110         :type next_hop: str
111         :type fib_table: str
112         :type out_if: str
113         :type in_if: str
114         :type src_addr: str
115         :type sid_list: list
116         :raises ValueError: If required parameter is missing.
117         """
118         beh = behavior.replace(u".", u"_").upper()
119         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
120         # so we need to use CLI command to configure it.
121         if beh in (getattr(SRv6Behavior, u"END_AD").name,
122                    getattr(SRv6Behavior, u"END_AS").name,
123                    getattr(SRv6Behavior, u"END_AM").name):
124             if beh == getattr(SRv6Behavior, u"END_AS").name:
125                 if next_hop is None or out_if is None or in_if is None or \
126                         src_addr is None or sid_list is None:
127                     raise ValueError(
128                         f"Required parameter(s) missing.\n"
129                         f"next_hop:{next_hop}\n "
130                         f"out_if:{out_if}\n"
131                         f"in_if:{in_if}\n"
132                         f"src_addr:{src_addr}\n"
133                         f"sid_list:{sid_list}"
134                     )
135                 sid_conf = f"next {u' next '.join(sid_list)}"
136                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
137                     f"src {src_addr} {sid_conf}"
138             else:
139                 if next_hop is None or out_if is None or in_if is None:
140                     raise ValueError(
141                         f"Required parameter(s) missing.\n"
142                         f"next_hop:{next_hop}\n"
143                         f"out_if:{out_if}\n"
144                         f"in_if:{in_if}"
145                     )
146                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
147
148             cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
149                 f"{params}"
150
151             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
152             return
153
154         cmd = u"sr_localsid_add_del"
155         args = dict(
156             is_del=False,
157             localsid=IPv6Address(local_sid).packed,
158             end_psp=False,
159             behavior=getattr(SRv6Behavior, beh).value,
160             sw_if_index=Constants.BITWISE_NON_ZERO,
161             vlan_index=0,
162             fib_table=0,
163             nh_addr=0
164         )
165         err_msg = f"Failed to add SR localSID {local_sid} " \
166             f"host {node[u'host']}"
167         if beh in (getattr(SRv6Behavior, u"END_X").name,
168                    getattr(SRv6Behavior, u"END_DX4").name,
169                    getattr(SRv6Behavior, u"END_DX6").name):
170             if interface is None or next_hop is None:
171                 raise ValueError(
172                     f"Required parameter(s) missing.\n"
173                     f"interface:{interface}\n"
174                     f"next_hop:{next_hop}"
175                 )
176             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
177                 node, interface
178             )
179             args[u"nh_addr"] = IPAddress.create_ip_address_object(
180                 ip_address(next_hop)
181             )
182         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
183             if interface is None:
184                 raise ValueError(
185                     f"Required parameter missing.\ninterface: {interface}"
186                 )
187             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
188                 node, interface
189             )
190         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
191                      getattr(SRv6Behavior, u"END_DT6").name):
192             if fib_table is None:
193                 raise ValueError(
194                     f"Required parameter missing.\n"
195                     f"fib_table: {fib_table}"
196                 )
197             args[u"fib_table"] = fib_table
198
199         with PapiSocketExecutor(node) as papi_exec:
200             papi_exec.add(cmd, **args).get_reply(err_msg)
201
202     @staticmethod
203     def show_sr_localsids(node):
204         """Show SRv6 LocalSIDs on the given node.
205
206         :param node: Given node to show localSIDs on.
207         :type node: dict
208         """
209         cmd = u"sr_localsids_dump"
210         PapiSocketExecutor.dump_and_log(node, (cmd,))
211
212     @staticmethod
213     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
214         """Create SRv6 policy on the given node.
215
216         :param node: Given node to create SRv6 policy on.
217         :param bsid: BindingSID - local SID IPv6 address.
218         :param sid_list: SID list.
219         :param mode: Encapsulation / insertion mode.
220         :type node: dict
221         :type bsid: str
222         :type sid_list: list
223         :type mode: str
224         """
225         # TODO: Convert to use sr_policy_add_v2.
226         # The conversion is not straightforward so it was not done when bumping.
227         cmd = u"sr_policy_add"
228         args = dict(
229             bsid_addr=IPv6Address(bsid).packed,
230             weight=1,
231             is_encap=bool(mode == u"encap"),
232             is_spray=False,
233             sids=SRv6.create_srv6_sid_list(sid_list)
234         )
235         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
236             f"on host {node[u'host']}"
237
238         with PapiSocketExecutor(node) as papi_exec:
239             papi_exec.add(cmd, **args).get_reply(err_msg)
240
241     @staticmethod
242     def show_sr_policies(node):
243         """Show SRv6 policies on the given node.
244
245         :param node: Given node to show SRv6 policies on.
246         :type node: dict
247         """
248         cmd = u"sr_policies_v2_dump"
249         PapiSocketExecutor.dump_and_log(node, (cmd,))
250
251     @staticmethod
252     def _get_sr_steer_policy_args(
253             node, mode, interface=None, ip_addr=None, prefix=None):
254         """Return values of sw_if_index, mask_width, prefix_addr and
255             traffic_type for sr_steering_add_del API.
256
257         :param node: Given node to create/delete steering policy on.
258         :param mode: Mode of operation - L2 or L3.
259         :param interface: Interface name (Optional, required in case of
260             L2 mode).
261         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
262             mode).
263         :param prefix: IP address prefix (Optional, required in case of L3
264             mode).
265         :type node: dict
266         :type mode: str
267         :type interface: str
268         :type ip_addr: str
269         :type prefix: int
270         :returns: Values for sw_if_index, prefix and traffic_type
271         :rtype: tuple
272         :raises ValueError: If unsupported mode used or required parameter
273             is missing.
274         """
275         if mode.lower() == u"l2":
276             if interface is None:
277                 raise ValueError(
278                     f"Required data missing.\n"
279                     f"interface: {interface}"
280                 )
281             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
282             prefix = 0
283             traffic_type = getattr(
284                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
285             ).value
286         elif mode.lower() == u"l3":
287             if ip_addr is None or prefix is None:
288                 raise ValueError(
289                     f"Required data missing.\n"
290                     f"IP address:{ip_addr}\n"
291                     f"mask:{prefix}"
292                 )
293             sw_if_index = Constants.BITWISE_NON_ZERO
294             ip_addr = ip_address(ip_addr)
295             prefix = IPUtil.create_prefix_object(ip_addr, int(prefix))
296             traffic_type = getattr(
297                 SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
298             ).value if ip_addr.version == 4 else getattr(
299                 SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
300             ).value
301         else:
302             raise ValueError(f"Unsupported mode: {mode}")
303
304         return sw_if_index, prefix, traffic_type
305
306     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
307     @staticmethod
308     def configure_sr_steer(
309             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
310         """Create SRv6 steering policy on the given node.
311
312         :param node: Given node to create steering policy on.
313         :param mode: Mode of operation - L2 or L3.
314         :param bsid: BindingSID - local SID IPv6 address.
315         :param interface: Interface name (Optional, required in case of
316             L2 mode).
317         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
318             mode).
319         :param prefix: IP address prefix (Optional, required in case of L3
320             mode).
321         :type node: dict
322         :type mode: str
323         :type bsid: str
324         :type interface: str
325         :type ip_addr: str
326         :type prefix: int
327         :raises ValueError: If unsupported mode used or required parameter
328             is missing.
329         """
330         sw_if_index, prefix, traffic_type = SRv6._get_sr_steer_policy_args(
331             node, mode, interface, ip_addr, prefix
332         )
333
334         cmd = u"sr_steering_add_del"
335         args = dict(
336             is_del=False,
337             bsid_addr=IPv6Address(str(bsid)).packed,
338             sr_policy_index=0,
339             table_id=0,
340             prefix=prefix,
341             sw_if_index=sw_if_index,
342             traffic_type=traffic_type
343         )
344         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
345             f"on host {node[u'host']}"
346
347         with PapiSocketExecutor(node) as papi_exec:
348             papi_exec.add(cmd, **args).get_reply(err_msg)
349
350     @staticmethod
351     def show_sr_steering_policies(node):
352         """Show SRv6 steering policies on the given node.
353
354         :param node: Given node to show SRv6 steering policies on.
355         :type node: dict
356         """
357         cmd = u"sr_steering_pol_dump"
358         PapiSocketExecutor.dump_and_log(node, (cmd,))
359
360     @staticmethod
361     def set_sr_encaps_source_address(node, ip6_addr):
362         """Set SRv6 encapsulation source address on the given node.
363
364         :param node: Given node to set SRv6 encapsulation source address on.
365         :param ip6_addr: Local SID IPv6 address.
366         :type node: dict
367         :type ip6_addr: str
368         """
369         cmd = u"sr_set_encap_source"
370         args = dict(
371             encaps_source=IPv6Address(ip6_addr).packed
372         )
373         err_msg = f"Failed to set SRv6 encapsulation source address " \
374             f"{ip6_addr} on host {node[u'host']}"
375
376         with PapiSocketExecutor(node) as papi_exec:
377             papi_exec.add(cmd, **args).get_reply(err_msg)