fix(vlan): do not apply strip offload
[csit.git] / resources / libraries / python / SRv6.py
1 # Copyright (c) 2021 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Segment Routing over IPv6 data plane utilities library."""
15
16 from enum import IntEnum
17
18 from ipaddress import ip_address, IPv6Address
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.IPAddress import IPAddress
23 from resources.libraries.python.IPUtil import IPUtil
24 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
25
26
27 class SRv6Behavior(IntEnum):
28     """SRv6 LocalSID supported functions."""
29     # Endpoint function
30     END = 1
31     # Endpoint function with Layer-3 cross-connect
32     END_X = 2
33     # Endpoint with decapsulation and Layer-2 cross-connect
34     END_DX2 = 5
35     # Endpoint with decapsulation and IPv6 cross-connect
36     END_DX6 = 6
37     # Endpoint with decapsulation and IPv4 cross-connect
38     END_DX4 = 7
39     # Endpoint with decapsulation and IPv6 table lookup
40     END_DT6 = 8
41     # Endpoint with decapsulation and IPv4 table lookup
42     END_DT4 = 9
43     # Endpoint to SR-unaware appliance via static proxy
44     END_AS = 20
45     # Endpoint to SR-unaware appliance via dynamic proxy
46     END_AD = 21
47     # Endpoint to SR-unaware appliance via masquerading
48     END_AM = 22
49
50
51 class SRv6PolicySteeringTypes(IntEnum):
52     """SRv6 steering types."""
53     SR_STEER_L2 = 2
54     SR_STEER_IPV4 = 4
55     SR_STEER_IPV6 = 6
56
57
58 class SRv6:
59     """SRv6 class."""
60
61     @staticmethod
62     def create_srv6_sid_list(sids):
63         """Create SRv6 SID list object.
64
65         :param sids: SID IPv6 addresses.
66         :type sids: list
67         :returns: SRv6 SID list object.
68         :rtype: dict
69         """
70         sid_list = [IPv6Address(sid).packed for sid in sids]
71
72         return dict(
73             num_sids=len(sid_list),
74             weight=1,
75             sids=sid_list + (16 - len(sid_list)) * [IPv6Address(0).packed]
76         )
77
78     @staticmethod
79     def configure_sr_localsid(
80             node, local_sid, behavior, interface=None, next_hop=None,
81             fib_table=None, out_if=None, in_if=None, src_addr=None,
82             sid_list=None):
83         """Create SRv6 LocalSID and binds it to a particular behaviour on
84         the given node.
85
86         :param node: Given node to create localSID on.
87         :param local_sid: LocalSID IPv6 address.
88         :param behavior: SRv6 LocalSID function.
89         :param interface: Interface name (Optional, required for
90             L2/L3 xconnects).
91         :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
92             xconnects).
93         :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
94             L3 routing).
95         :param out_if: Interface name of local interface for sending traffic
96             towards the Service Function (Optional, required for SRv6 endpoint
97             to SR-unaware appliance).
98         :param in_if: Interface name of local interface receiving the traffic
99             coming back from the Service Function (Optional, required for SRv6
100             endpoint to SR-unaware appliance).
101         :param src_addr: Source address on the packets coming back on in_if
102             interface (Optional, required for SRv6 endpoint to SR-unaware
103             appliance via static proxy).
104         :param sid_list: SID list (Optional, required for SRv6 endpoint to
105             SR-unaware appliance via static proxy).
106         :type node: dict
107         :type local_sid: str
108         :type behavior: str
109         :type interface: str
110         :type next_hop: str
111         :type fib_table: str
112         :type out_if: str
113         :type in_if: str
114         :type src_addr: str
115         :type sid_list: list
116         :raises ValueError: If required parameter is missing.
117         """
118         beh = behavior.replace(u".", u"_").upper()
119         # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
120         # so we need to use CLI command to configure it.
121         if beh in (getattr(SRv6Behavior, u"END_AD").name,
122                    getattr(SRv6Behavior, u"END_AS").name,
123                    getattr(SRv6Behavior, u"END_AM").name):
124             if beh == getattr(SRv6Behavior, u"END_AS").name:
125                 if next_hop is None or out_if is None or in_if is None or \
126                         src_addr is None or sid_list is None:
127                     raise ValueError(
128                         f"Required parameter(s) missing.\n"
129                         f"next_hop:{next_hop}\n "
130                         f"out_if:{out_if}\n"
131                         f"in_if:{in_if}\n"
132                         f"src_addr:{src_addr}\n"
133                         f"sid_list:{sid_list}"
134                     )
135                 sid_conf = f"next {u' next '.join(sid_list)}"
136                 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
137                     f"src {src_addr} {sid_conf}"
138             else:
139                 if next_hop is None or out_if is None or in_if is None:
140                     raise ValueError(
141                         f"Required parameter(s) missing.\n"
142                         f"next_hop:{next_hop}\n"
143                         f"out_if:{out_if}\n"
144                         f"in_if:{in_if}"
145                     )
146                 params = f"nh {next_hop} oif {out_if} iif {in_if}"
147
148             cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
149                 f"{params}"
150
151             PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
152             return
153
154         cmd = u"sr_localsid_add_del"
155         args = dict(
156             is_del=False,
157             localsid=IPv6Address(local_sid).packed,
158             end_psp=False,
159             behavior=getattr(SRv6Behavior, beh).value,
160             sw_if_index=Constants.BITWISE_NON_ZERO,
161             vlan_index=0,
162             fib_table=0,
163             nh_addr=0
164         )
165         err_msg = f"Failed to add SR localSID {local_sid} " \
166             f"host {node[u'host']}"
167         if beh in (getattr(SRv6Behavior, u"END_X").name,
168                    getattr(SRv6Behavior, u"END_DX4").name,
169                    getattr(SRv6Behavior, u"END_DX6").name):
170             if interface is None or next_hop is None:
171                 raise ValueError(
172                     f"Required parameter(s) missing.\n"
173                     f"interface:{interface}\n"
174                     f"next_hop:{next_hop}"
175                 )
176             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
177                 node, interface
178             )
179             args[u"nh_addr"] = IPAddress.create_ip_address_object(
180                 ip_address(next_hop)
181             )
182         elif beh == getattr(SRv6Behavior, u"END_DX2").name:
183             if interface is None:
184                 raise ValueError(
185                     f"Required parameter missing.\ninterface: {interface}"
186                 )
187             args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
188                 node, interface
189             )
190         elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
191                      getattr(SRv6Behavior, u"END_DT6").name):
192             if fib_table is None:
193                 raise ValueError(
194                     f"Required parameter missing.\n"
195                     f"fib_table: {fib_table}"
196                 )
197             args[u"fib_table"] = fib_table
198
199         with PapiSocketExecutor(node) as papi_exec:
200             papi_exec.add(cmd, **args).get_reply(err_msg)
201
202     @staticmethod
203     def show_sr_localsids(node):
204         """Show SRv6 LocalSIDs on the given node.
205
206         :param node: Given node to show localSIDs on.
207         :type node: dict
208         """
209         cmd = u"sr_localsids_dump"
210         PapiSocketExecutor.dump_and_log(node, (cmd,))
211
212     @staticmethod
213     def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
214         """Create SRv6 policy on the given node.
215
216         :param node: Given node to create SRv6 policy on.
217         :param bsid: BindingSID - local SID IPv6 address.
218         :param sid_list: SID list.
219         :param mode: Encapsulation / insertion mode.
220         :type node: dict
221         :type bsid: str
222         :type sid_list: list
223         :type mode: str
224         """
225         cmd = u"sr_policy_add"
226         args = dict(
227             bsid_addr=IPv6Address(bsid).packed,
228             weight=1,
229             is_encap=bool(mode == u"encap"),
230             is_spray=False,
231             sids=SRv6.create_srv6_sid_list(sid_list)
232         )
233         err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
234             f"on host {node[u'host']}"
235
236         with PapiSocketExecutor(node) as papi_exec:
237             papi_exec.add(cmd, **args).get_reply(err_msg)
238
239     @staticmethod
240     def show_sr_policies(node):
241         """Show SRv6 policies on the given node.
242
243         :param node: Given node to show SRv6 policies on.
244         :type node: dict
245         """
246         cmd = u"sr_policies_dump"
247         PapiSocketExecutor.dump_and_log(node, (cmd,))
248
249     @staticmethod
250     def _get_sr_steer_policy_args(
251             node, mode, interface=None, ip_addr=None, prefix=None):
252         """Return values of sw_if_index, mask_width, prefix_addr and
253             traffic_type for sr_steering_add_del API.
254
255         :param node: Given node to create/delete steering policy on.
256         :param mode: Mode of operation - L2 or L3.
257         :param interface: Interface name (Optional, required in case of
258             L2 mode).
259         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
260             mode).
261         :param prefix: IP address prefix (Optional, required in case of L3
262             mode).
263         :type node: dict
264         :type mode: str
265         :type interface: str
266         :type ip_addr: str
267         :type prefix: int
268         :returns: Values for sw_if_index, prefix and traffic_type
269         :rtype: tuple
270         :raises ValueError: If unsupported mode used or required parameter
271             is missing.
272         """
273         if mode.lower() == u"l2":
274             if interface is None:
275                 raise ValueError(
276                     f"Required data missing.\n"
277                     f"interface: {interface}"
278                 )
279             sw_if_index = InterfaceUtil.get_interface_index(node, interface)
280             prefix = 0
281             traffic_type = getattr(
282                 SRv6PolicySteeringTypes, u"SR_STEER_L2"
283             ).value
284         elif mode.lower() == u"l3":
285             if ip_addr is None or prefix is None:
286                 raise ValueError(
287                     f"Required data missing.\n"
288                     f"IP address:{ip_addr}\n"
289                     f"mask:{prefix}"
290                 )
291             sw_if_index = Constants.BITWISE_NON_ZERO
292             ip_addr = ip_address(ip_addr)
293             prefix = IPUtil.create_prefix_object(ip_addr, int(prefix))
294             traffic_type = getattr(
295                 SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
296             ).value if ip_addr.version == 4 else getattr(
297                 SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
298             ).value
299         else:
300             raise ValueError(f"Unsupported mode: {mode}")
301
302         return sw_if_index, prefix, traffic_type
303
304     # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
305     @staticmethod
306     def configure_sr_steer(
307             node, mode, bsid, interface=None, ip_addr=None, prefix=None):
308         """Create SRv6 steering policy on the given node.
309
310         :param node: Given node to create steering policy on.
311         :param mode: Mode of operation - L2 or L3.
312         :param bsid: BindingSID - local SID IPv6 address.
313         :param interface: Interface name (Optional, required in case of
314             L2 mode).
315         :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
316             mode).
317         :param prefix: IP address prefix (Optional, required in case of L3
318             mode).
319         :type node: dict
320         :type mode: str
321         :type bsid: str
322         :type interface: str
323         :type ip_addr: str
324         :type prefix: int
325         :raises ValueError: If unsupported mode used or required parameter
326             is missing.
327         """
328         sw_if_index, prefix, traffic_type = SRv6._get_sr_steer_policy_args(
329             node, mode, interface, ip_addr, prefix
330         )
331
332         cmd = u"sr_steering_add_del"
333         args = dict(
334             is_del=False,
335             bsid_addr=IPv6Address(str(bsid)).packed,
336             sr_policy_index=0,
337             table_id=0,
338             prefix=prefix,
339             sw_if_index=sw_if_index,
340             traffic_type=traffic_type
341         )
342         err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
343             f"on host {node[u'host']}"
344
345         with PapiSocketExecutor(node) as papi_exec:
346             papi_exec.add(cmd, **args).get_reply(err_msg)
347
348     @staticmethod
349     def show_sr_steering_policies(node):
350         """Show SRv6 steering policies on the given node.
351
352         :param node: Given node to show SRv6 steering policies on.
353         :type node: dict
354         """
355         cmd = u"sr_steering_pol_dump"
356         PapiSocketExecutor.dump_and_log(node, (cmd,))
357
358     @staticmethod
359     def set_sr_encaps_source_address(node, ip6_addr):
360         """Set SRv6 encapsulation source address on the given node.
361
362         :param node: Given node to set SRv6 encapsulation source address on.
363         :param ip6_addr: Local SID IPv6 address.
364         :type node: dict
365         :type ip6_addr: str
366         """
367         cmd = u"sr_set_encap_source"
368         args = dict(
369             encaps_source=IPv6Address(ip6_addr).packed
370         )
371         err_msg = f"Failed to set SRv6 encapsulation source address " \
372             f"{ip6_addr} on host {node[u'host']}"
373
374         with PapiSocketExecutor(node) as papi_exec:
375             papi_exec.add(cmd, **args).get_reply(err_msg)