1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Segment Routing over IPv6 data plane utilities library."""
16 from enum import IntEnum
18 from ipaddress import ip_address, IPv6Address
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
25 class SRv6Behavior(IntEnum):
26 """SRv6 LocalSID supported functions."""
29 # Endpoint function with Layer-3 cross-connect
31 # Endpoint with decapsulation and Layer-2 cross-connect
33 # Endpoint with decapsulation and IPv6 cross-connect
35 # Endpoint with decapsulation and IPv4 cross-connect
37 # Endpoint with decapsulation and IPv6 table lookup
39 # Endpoint with decapsulation and IPv4 table lookup
41 # Endpoint to SR-unaware appliance via static proxy
43 # Endpoint to SR-unaware appliance via dynamic proxy
45 # Endpoint to SR-unaware appliance via masquerading
49 class SRv6PolicySteeringTypes(IntEnum):
50 """SRv6 LocalSID supported functions."""
60 def create_srv6_sid_object(ip_addr):
61 """Create SRv6 SID object.
63 :param ip_addr: IPv6 address.
65 :returns: SRv6 SID object.
69 addr=IPv6Address(ip_addr).packed
73 def create_srv6_sid_list(sids):
74 """Create SRv6 SID list object.
76 :param sids: SID IPv6 addresses.
78 :returns: SRv6 SID list object.
83 for index, item in enumerate(sids):
84 sid_list[index] = SRv6.create_srv6_sid_object(item)
93 def configure_sr_localsid(
94 node, local_sid, behavior, interface=None, next_hop=None,
95 fib_table=None, out_if=None, in_if=None, src_addr=None,
97 """Create SRv6 LocalSID and binds it to a particular behaviour on
100 :param node: Given node to create localSID on.
101 :param local_sid: LocalSID IPv6 address.
102 :param behavior: SRv6 LocalSID function.
103 :param interface: Interface name (Optional, required for
105 :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
107 :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
109 :param out_if: Interface name of local interface for sending traffic
110 towards the Service Function (Optional, required for SRv6 endpoint
111 to SR-unaware appliance).
112 :param in_if: Interface name of local interface receiving the traffic
113 coming back from the Service Function (Optional, required for SRv6
114 endpoint to SR-unaware appliance).
115 :param src_addr: Source address on the packets coming back on in_if
116 interface (Optional, required for SRv6 endpoint to SR-unaware
117 appliance via static proxy).
118 :param sid_list: SID list (Optional, required for SRv6 endpoint to
119 SR-unaware appliance via static proxy).
130 :raises ValueError: If required parameter is missing.
132 beh = behavior.replace(u".", u"_").upper()
133 # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
134 # so we need to use CLI command to configure it.
135 if beh in (getattr(SRv6Behavior, u"END_AD").name,
136 getattr(SRv6Behavior, u"END_AS").name,
137 getattr(SRv6Behavior, u"END_AM").name):
138 if beh == getattr(SRv6Behavior, u"END_AS").name:
139 if next_hop is None or out_if is None or in_if is None or \
140 src_addr is None or sid_list is None:
142 f"Required parameter(s) missing.\n"
143 f"next_hop:{next_hop}\n "
146 f"src_addr:{src_addr}\n"
147 f"sid_list:{sid_list}"
149 sid_conf = f"next {u' next '.join(sid_list)}"
150 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
151 f"src {src_addr} {sid_conf}"
153 if next_hop is None or out_if is None or in_if is None:
155 f"Required parameter(s) missing.\n"
156 f"next_hop:{next_hop}\n"
160 params = f"nh {next_hop} oif {out_if} iif {in_if}"
162 cli_cmd = f"sr localsid address {local_sid} behavior {behavior}" \
165 PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
167 cmd = u"sr_localsid_add_del"
170 localsid=SRv6.create_srv6_sid_object(local_sid),
172 behavior=getattr(SRv6Behavior, beh).value,
173 sw_if_index=Constants.BITWISE_NON_ZERO,
179 err_msg = f"Failed to add SR localSID {local_sid} " \
180 f"host {node[u'host']}"
181 if beh in (getattr(SRv6Behavior, u"END_X").name,
182 getattr(SRv6Behavior, u"END_DX4").name,
183 getattr(SRv6Behavior, u"END_DX6").name):
184 if interface is None or next_hop is None:
186 f"Required parameter(s) missing.\n"
187 f"interface:{interface}\n"
188 f"next_hop:{next_hop}"
190 args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
193 next_hop = ip_address(next_hop)
194 if next_hop.version == 6:
195 args[u"nh_addr6"] = next_hop.packed
197 args[u"nh_addr4"] = next_hop.packed
198 elif beh == getattr(SRv6Behavior, u"END_DX2").name:
199 if interface is None:
201 f"Required parameter missing.\ninterface: {interface}"
203 args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
206 elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
207 getattr(SRv6Behavior, u"END_DT6").name):
208 if fib_table is None:
210 f"Required parameter missing.\n"
211 f"fib_table: {fib_table}"
213 args[u"fib_table"] = fib_table
215 with PapiSocketExecutor(node) as papi_exec:
216 papi_exec.add(cmd, **args).get_reply(err_msg)
219 def show_sr_localsids(node):
220 """Show SRv6 LocalSIDs on the given node.
222 :param node: Given node to show localSIDs on.
225 cmd = u"sr_localsids_dump"
226 err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
228 with PapiSocketExecutor(node) as papi_exec:
229 papi_exec.add(cmd).get_details(err_msg)
232 def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
233 """Create SRv6 policy on the given node.
235 :param node: Given node to create SRv6 policy on.
236 :param bsid: BindingSID - local SID IPv6 address.
237 :param sid_list: SID list.
238 :param mode: Encapsulation / insertion mode.
244 cmd = u"sr_policy_add"
246 bsid_addr=IPv6Address(bsid).packed,
248 is_encap=1 if mode == u"encap" else 0,
249 sids=SRv6.create_srv6_sid_list(sid_list)
251 err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
252 f"on host {node[u'host']}"
254 with PapiSocketExecutor(node) as papi_exec:
255 papi_exec.add(cmd, **args).get_reply(err_msg)
258 def show_sr_policies(node):
259 """Show SRv6 policies on the given node.
261 :param node: Given node to show SRv6 policies on.
264 cmd = u"sr_policies_dump"
265 err_msg = f"Failed to get SR policies dump on host {node[u'host']}"
267 with PapiSocketExecutor(node) as papi_exec:
268 papi_exec.add(cmd).get_details(err_msg)
271 def _get_sr_steer_policy_args(
272 node, mode, interface=None, ip_addr=None, prefix=None):
273 """Return values of sw_if_index, mask_width, prefix_addr and
274 traffic_type for sr_steering_add_del API.
276 :param node: Given node to create/delete steering policy on.
277 :param mode: Mode of operation - L2 or L3.
278 :param interface: Interface name (Optional, required in case of
280 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
282 :param prefix: IP address prefix (Optional, required in case of L3
289 :returns: Values for sw_if_index, mask_width, prefix_addr and
292 :raises ValueError: If unsupported mode used or required parameter
295 if mode.lower() == u"l2":
296 if interface is None:
298 f"Required data missing.\n"
299 f"interface: {interface}"
301 sw_if_index = InterfaceUtil.get_interface_index(node, interface)
303 prefix_addr = 16 * b"\0"
304 traffic_type = getattr(
305 SRv6PolicySteeringTypes, u"SR_STEER_L2"
307 elif mode.lower() == u"l3":
308 if ip_addr is None or prefix is None:
310 f"Required data missing.\n"
311 f"IP address:{ip_addr}\n"
314 sw_if_index = Constants.BITWISE_NON_ZERO
315 ip_addr = ip_address(ip_addr)
316 prefix_addr = ip_addr.packed
317 mask_width = int(prefix)
318 if ip_addr.version == 4:
319 prefix_addr += 12 * b"\0"
320 traffic_type = getattr(
321 SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
324 traffic_type = getattr(
325 SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
328 raise ValueError(f"Unsupported mode: {mode}")
330 return sw_if_index, mask_width, prefix_addr, traffic_type
332 # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
334 def configure_sr_steer(
335 node, mode, bsid, interface=None, ip_addr=None, prefix=None):
336 """Create SRv6 steering policy on the given node.
338 :param node: Given node to create steering policy on.
339 :param mode: Mode of operation - L2 or L3.
340 :param bsid: BindingSID - local SID IPv6 address.
341 :param interface: Interface name (Optional, required in case of
343 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
345 :param prefix: IP address prefix (Optional, required in case of L3
353 :raises ValueError: If unsupported mode used or required parameter
356 sw_if_index, mask_width, prefix_addr, traffic_type = \
357 SRv6._get_sr_steer_policy_args(
358 node, mode, interface, ip_addr, prefix
361 cmd = u"sr_steering_add_del"
364 bsid_addr=IPv6Address(str(bsid)).packed,
367 prefix_addr=prefix_addr,
368 mask_width=mask_width,
369 sw_if_index=sw_if_index,
370 traffic_type=traffic_type
372 err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
373 f"on host {node[u'host']}"
375 with PapiSocketExecutor(node) as papi_exec:
376 papi_exec.add(cmd, **args).get_reply(err_msg)
379 def show_sr_steering_policies(node):
380 """Show SRv6 steering policies on the given node.
382 :param node: Given node to show SRv6 steering policies on.
385 cmd = u"sr_steering_pol_dump"
386 err_msg = f"Failed to get SR localSID dump on host {node[u'host']}"
388 with PapiSocketExecutor(node) as papi_exec:
389 papi_exec.add(cmd).get_details(err_msg)
392 def set_sr_encaps_source_address(node, ip6_addr):
393 """Set SRv6 encapsulation source address on the given node.
395 :param node: Given node to set SRv6 encapsulation source address on.
396 :param ip6_addr: Local SID IPv6 address.
400 cmd = u"sr_set_encap_source"
402 encaps_source=IPv6Address(ip6_addr).packed
404 err_msg = f"Failed to set SRv6 encapsulation source address " \
405 f"{ip6_addr} on host {node[u'host']}"
407 with PapiSocketExecutor(node) as papi_exec:
408 papi_exec.add(cmd, **args).get_reply(err_msg)