1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Segment Routing over IPv6 data plane utilities library."""
16 from enum import IntEnum
18 from ipaddress import ip_address, IPv6Address
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.InterfaceUtil import InterfaceUtil
22 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
# Integer codes for SRv6 LocalSID behaviours. configure_sr_localsid looks the
# members up by name (END_X, END_DX2, END_DX4, END_DX6, END_DT4, END_DT6,
# END_AS, END_AD, END_AM) and sends the member value in the
# sr_localsid_add_del request.
# NOTE(review): the member assignments themselves are not visible in this
# listing, only the comments describing them — confirm values against the API.
25 class SRv6Behavior(IntEnum):
26 """SRv6 LocalSID supported functions."""
29 # Endpoint function with Layer-3 cross-connect
31 # Endpoint with decapsulation and Layer-2 cross-connect
33 # Endpoint with decapsulation and IPv6 cross-connect
35 # Endpoint with decapsulation and IPv4 cross-connect
37 # Endpoint with decapsulation and IPv6 table lookup
39 # Endpoint with decapsulation and IPv4 table lookup
41 # Endpoint to SR-unaware appliance via static proxy
43 # Endpoint to SR-unaware appliance via dynamic proxy
45 # Endpoint to SR-unaware appliance via masquerading
# Traffic-type codes for SRv6 steering policies. _get_sr_steer_policy_args
# looks up the members SR_STEER_L2, SR_STEER_IPV4 and SR_STEER_IPV6 by name.
# NOTE(review): the member assignments are not visible in this listing.
49 class SRv6PolicySteeringTypes(IntEnum):
50 """SRv6 steering policy traffic types."""
# Builds the SID object used by create_srv6_sid_list and by the localsid
# argument of configure_sr_localsid.
60 def create_srv6_sid_object(ip_addr):
61 """Create SRv6 SID object.
63 :param ip_addr: IPv6 address.
65 :returns: SRv6 SID object.
# The address is stored in packed binary form under the "addr" key word;
# IPv6Address rejects anything that is not a valid IPv6 address.
# NOTE(review): the call that receives this keyword is not visible here.
69 addr=IPv6Address(ip_addr).packed
# Builds the SID list passed as the "sids" argument by configure_sr_policy.
73 def create_srv6_sid_list(sids):
74 """Create SRv6 SID list object.
76 :param sids: SID IPv6 addresses.
78 :returns: SRv6 SID list object.
# Convert every address into a SID object and store it at the same index in
# sid_list.
# NOTE(review): the initialisation of sid_list and the return statement are
# not visible in this listing; sid_list must already hold at least len(sids)
# slots for the indexed assignment to succeed — confirm.
83 for index, item in enumerate(sids):
84 sid_list[index] = SRv6.create_srv6_sid_object(item)
# Configures one SRv6 LocalSID. Proxy behaviours (END_AS, END_AD, END_AM) are
# configured with a CLI command; the other behaviours handled below use the
# sr_localsid_add_del PAPI request.
93 def configure_sr_localsid(
94 node, local_sid, behavior, interface=None, next_hop=None,
95 fib_table=None, out_if=None, in_if=None, src_addr=None,
97 """Create SRv6 LocalSID and bind it to a particular behaviour on
100 :param node: Given node to create localSID on.
101 :param local_sid: LocalSID IPv6 address.
102 :param behavior: SRv6 LocalSID function.
103 :param interface: Interface name (Optional, required for
105 :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
107 :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
109 :param out_if: Interface name of local interface for sending traffic
110 towards the Service Function (Optional, required for SRv6 endpoint
111 to SR-unaware appliance).
112 :param in_if: Interface name of local interface receiving the traffic
113 coming back from the Service Function (Optional, required for SRv6
114 endpoint to SR-unaware appliance).
115 :param src_addr: Source address on the packets coming back on in_if
116 interface (Optional, required for SRv6 endpoint to SR-unaware
117 appliance via static proxy).
118 :param sid_list: SID list (Optional, required for SRv6 endpoint to
119 SR-unaware appliance via static proxy).
130 :raises ValueError: If required parameter is missing.
# Normalise the behaviour name to the SRv6Behavior member spelling,
# e.g. "end.dx4" -> "END_DX4".
132 beh = behavior.replace(u".", u"_").upper()
133 # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
134 # so we need to use CLI command to configure it.
135 if beh in (getattr(SRv6Behavior, u"END_AD").name,
136 getattr(SRv6Behavior, u"END_AS").name,
137 getattr(SRv6Behavior, u"END_AM").name):
# Static proxy (END_AS) needs src_addr and sid_list on top of next_hop,
# out_if and in_if.
138 if beh == getattr(SRv6Behavior, u"END_AS").name:
139 if next_hop is None or out_if is None or in_if is None or \
140 src_addr is None or sid_list is None:
# NOTE(review): the "\n " after next_hop leaves a stray leading space on the
# following line of the message, unlike the other lines — likely unintended.
142 f"Required parameter(s) missing.\n"
143 f"next_hop:{next_hop}\n "
146 f"src_addr:{src_addr}\n"
147 f"sid_list:{sid_list}"
# The SIDs are rendered for the CLI as "next <sid> next <sid> ...".
149 sid_conf = f"next {u' next '.join(sid_list)}"
150 params = f"nh {next_hop} oif {out_if} iif {in_if} " \
151 f"src {src_addr} {sid_conf}"
# Presumably the non-END_AS branch (the "else" line is not visible here):
# only next_hop, out_if and in_if are required.
153 if next_hop is None or out_if is None or in_if is None:
155 f"Required parameter(s) missing.\n"
156 f"next_hop:{next_hop}\n"
160 params = f"nh {next_hop} oif {out_if} iif {in_if}"
# The CLI command carries the caller-supplied behaviour string, not the
# normalised "beh" value.
162 cli_cmd = f"sr localsid address {local_sid} behavior {behavior} " \
165 PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
# Request used for the behaviours handled below.
# NOTE(review): the lines between the CLI call and this point are not visible;
# presumably the proxy branch returns before reaching here — confirm.
168 cmd = u"sr_localsid_add_del"
171 localsid=SRv6.create_srv6_sid_object(local_sid),
173 behavior=getattr(SRv6Behavior, beh).value,
# Placeholder interface index; replaced below for the behaviours that take an
# interface (END_X, END_DX4, END_DX6, END_DX2).
174 sw_if_index=Constants.BITWISE_NON_ZERO,
180 err_msg = f"Failed to add SR localSID {local_sid} " \
181 f"host {node[u'host']}"
# Cross-connect behaviours need both an interface and a next hop.
182 if beh in (getattr(SRv6Behavior, u"END_X").name,
183 getattr(SRv6Behavior, u"END_DX4").name,
184 getattr(SRv6Behavior, u"END_DX6").name):
185 if interface is None or next_hop is None:
187 f"Required parameter(s) missing.\n"
188 f"interface:{interface}\n"
189 f"next_hop:{next_hop}"
191 args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
# Parse the next hop and fill the next-hop field matching its address family.
194 next_hop = ip_address(next_hop)
195 if next_hop.version == 6:
196 args[u"nh_addr6"] = next_hop.packed
198 args[u"nh_addr4"] = next_hop.packed
# END_DX2 needs only the interface.
199 elif beh == getattr(SRv6Behavior, u"END_DX2").name:
200 if interface is None:
202 f"Required parameter missing.\ninterface: {interface}"
204 args[u"sw_if_index"] = InterfaceUtil.get_interface_index(
# Table-lookup behaviours need the FIB table.
207 elif beh in (getattr(SRv6Behavior, u"END_DT4").name,
208 getattr(SRv6Behavior, u"END_DT6").name):
209 if fib_table is None:
211 f"Required parameter missing.\n"
212 f"fib_table: {fib_table}"
214 args[u"fib_table"] = fib_table
216 with PapiSocketExecutor(node) as papi_exec:
217 papi_exec.add(cmd, **args).get_reply(err_msg)
def show_sr_localsids(node):
    """Dump the SRv6 LocalSIDs of the given node.

    Sends the ``sr_localsids_dump`` request through PapiSocketExecutor and
    asks for its details; nothing is returned to the caller.

    :param node: Node to dump the localSIDs from; its ``host`` entry is
        used in the error message.
    """
    # Build the message first so a node without a "host" entry fails before
    # any connection is opened.
    failure_text = f"Failed to get SR localSID dump on host {node[u'host']}"

    with PapiSocketExecutor(node) as executor:
        request = executor.add(u"sr_localsids_dump")
        request.get_details(failure_text)
# Adds an SRv6 policy with the sr_policy_add PAPI request.
233 def configure_sr_policy(node, bsid, sid_list, mode=u"encap"):
234 """Create SRv6 policy on the given node.
236 :param node: Given node to create SRv6 policy on.
237 :param bsid: BindingSID - local SID IPv6 address.
238 :param sid_list: SID list.
239 :param mode: Encapsulation / insertion mode.
245 cmd = u"sr_policy_add"
# The binding SID is sent in packed binary form.
# NOTE(review): one request argument line is not visible in this listing.
247 bsid_addr=IPv6Address(bsid).packed,
# Any mode other than the exact string "encap" is sent as is_encap=0.
249 is_encap=1 if mode == u"encap" else 0,
250 sids=SRv6.create_srv6_sid_list(sid_list)
252 err_msg = f"Failed to add SR policy for BindingSID {bsid} " \
253 f"on host {node[u'host']}"
255 with PapiSocketExecutor(node) as papi_exec:
256 papi_exec.add(cmd, **args).get_reply(err_msg)
def show_sr_policies(node):
    """Dump the SRv6 policies of the given node.

    Sends the ``sr_policies_dump`` request through PapiSocketExecutor and
    asks for its details; nothing is returned to the caller.

    :param node: Node to dump the SRv6 policies from; its ``host`` entry is
        used in the error message.
    """
    dump_request = u"sr_policies_dump"
    # Evaluated before the executor is created, as in the original ordering.
    on_failure = f"Failed to get SR policies dump on host {node[u'host']}"

    with PapiSocketExecutor(node) as executor:
        pending = executor.add(dump_request)
        pending.get_details(on_failure)
# Helper for configure_sr_steer: derives the mode-specific fields of the
# sr_steering_add_del request.
272 def _get_sr_steer_policy_args(
273 node, mode, interface=None, ip_addr=None, prefix=None):
274 """Return values of sw_if_index, mask_width, prefix_addr and
275 traffic_type for sr_steering_add_del API.
277 :param node: Given node to create/delete steering policy on.
278 :param mode: Mode of operation - L2 or L3.
279 :param interface: Interface name (Optional, required in case of
281 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
283 :param prefix: IP address prefix (Optional, required in case of L3
290 :returns: Values for sw_if_index, mask_width, prefix_addr and
293 :raises ValueError: If unsupported mode used or required parameter
# The mode is compared case-insensitively.
296 if mode.lower() == u"l2":
297 if interface is None:
299 f"Required data missing.\n"
300 f"interface: {interface}"
302 sw_if_index = InterfaceUtil.get_interface_index(node, interface)
# L2 steering carries no IP prefix: the address field is 16 zero bytes.
# NOTE(review): the mask_width assignment for this branch is not visible in
# this listing.
304 prefix_addr = 16 * b"\0"
305 traffic_type = getattr(
306 SRv6PolicySteeringTypes, u"SR_STEER_L2"
308 elif mode.lower() == u"l3":
309 if ip_addr is None or prefix is None:
311 f"Required data missing.\n"
312 f"IP address:{ip_addr}\n"
# L3 steering is not tied to an interface; a placeholder index is used.
315 sw_if_index = Constants.BITWISE_NON_ZERO
316 ip_addr = ip_address(ip_addr)
317 prefix_addr = ip_addr.packed
318 mask_width = int(prefix)
# A packed IPv4 address is 4 bytes; pad it with 12 zero bytes so prefix_addr
# is 16 bytes long for both address families.
319 if ip_addr.version == 4:
320 prefix_addr += 12 * b"\0"
321 traffic_type = getattr(
322 SRv6PolicySteeringTypes, u"SR_STEER_IPV4"
325 traffic_type = getattr(
326 SRv6PolicySteeringTypes, u"SR_STEER_IPV6"
# Any mode other than l2 / l3 is rejected.
329 raise ValueError(f"Unsupported mode: {mode}")
331 return sw_if_index, mask_width, prefix_addr, traffic_type
333 # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
# Adds an SRv6 steering policy with the sr_steering_add_del PAPI request.
335 def configure_sr_steer(
336 node, mode, bsid, interface=None, ip_addr=None, prefix=None):
337 """Create SRv6 steering policy on the given node.
339 :param node: Given node to create steering policy on.
340 :param mode: Mode of operation - L2 or L3.
341 :param bsid: BindingSID - local SID IPv6 address.
342 :param interface: Interface name (Optional, required in case of
344 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
346 :param prefix: IP address prefix (Optional, required in case of L3
354 :raises ValueError: If unsupported mode used or required parameter
# Resolve the mode-specific fields first; the helper raises ValueError for an
# unsupported mode before any request is sent.
357 sw_if_index, mask_width, prefix_addr, traffic_type = \
358 SRv6._get_sr_steer_policy_args(
359 node, mode, interface, ip_addr, prefix
362 cmd = u"sr_steering_add_del"
# bsid is passed through str() before being parsed as an IPv6 address.
# NOTE(review): several request argument lines are not visible in this listing.
365 bsid_addr=IPv6Address(str(bsid)).packed,
368 prefix_addr=prefix_addr,
369 mask_width=mask_width,
370 sw_if_index=sw_if_index,
371 traffic_type=traffic_type
373 err_msg = f"Failed to add SRv6 steering policy for BindingSID {bsid} " \
374 f"on host {node[u'host']}"
376 with PapiSocketExecutor(node) as papi_exec:
377 papi_exec.add(cmd, **args).get_reply(err_msg)
def show_sr_steering_policies(node):
    """Show SRv6 steering policies on the given node.

    Sends the ``sr_steering_pol_dump`` request through PapiSocketExecutor
    and asks for its details; nothing is returned to the caller.

    :param node: Given node to show SRv6 steering policies on; its ``host``
        entry is used in the error message.
    """
    cmd = u"sr_steering_pol_dump"
    # Name the steering-policy dump in the message so a failure here is not
    # mistaken for a failure of the localSID dump.
    err_msg = (
        f"Failed to get SR steering policies dump on host {node[u'host']}"
    )

    with PapiSocketExecutor(node) as papi_exec:
        papi_exec.add(cmd).get_details(err_msg)
def set_sr_encaps_source_address(node, ip6_addr):
    """Configure the SRv6 encapsulation source address of the given node.

    Sends the ``sr_set_encap_source`` request with the address in packed
    binary form and waits for the reply.

    :param node: Node to set the SRv6 encapsulation source address on; its
        ``host`` entry is used in the error message.
    :param ip6_addr: IPv6 address to use as the encapsulation source.
    """
    # Parse the address first: an invalid IPv6 address fails here, before
    # the error message is built or any connection is opened.
    request_fields = {u"encaps_source": IPv6Address(ip6_addr).packed}
    failure_text = (
        f"Failed to set SRv6 encapsulation source address "
        f"{ip6_addr} on host {node[u'host']}"
    )

    with PapiSocketExecutor(node) as executor:
        pending = executor.add(u"sr_set_encap_source", **request_fields)
        pending.get_reply(failure_text)