1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Segment Routing over IPv6 data plane utilities library."""
16 from enum import IntEnum
17 from ipaddress import ip_address, IPv6Address
19 from resources.libraries.python.Constants import Constants
20 from resources.libraries.python.InterfaceUtil import InterfaceUtil
21 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
24 class SRv6Behavior(IntEnum):
25 """SRv6 LocalSID supported functions."""
28 # Endpoint function with Layer-3 cross-connect
30 # Endpoint with decapsulation and Layer-2 cross-connect
32 # Endpoint with decapsulation and IPv6 cross-connect
34 # Endpoint with decapsulation and IPv4 cross-connect
36 # Endpoint with decapsulation and IPv6 table lookup
38 # Endpoint with decapsulation and IPv4 table lookup
40 # Endpoint to SR-unaware appliance via static proxy
42 # Endpoint to SR-unaware appliance via dynamic proxy
44 # Endpoint to SR-unaware appliance via masquerading
48 class SRv6PolicySteeringTypes(IntEnum):
49 """SRv6 LocalSID supported functions."""
62 def create_srv6_sid_object(ip_addr):
63 """Create SRv6 SID object.
65 :param ip_addr: IPv6 address.
67 :returns: SRv6 SID object.
70 return dict(addr=IPv6Address(unicode(ip_addr)).packed)
73 def create_srv6_sid_list(sids):
74 """Create SRv6 SID list object.
76 :param sids: SID IPv6 addresses.
78 :returns: SRv6 SID list object.
81 sid_list = list(0 for _ in xrange(16))
82 for i in xrange(len(sids)):
83 sid_list[i] = SRv6.create_srv6_sid_object(sids[i])
84 return dict(num_sids=len(sids), weight=1, sids=sid_list)
87 def configure_sr_localsid(
88 node, local_sid, behavior, interface=None, next_hop=None,
89 fib_table=None, out_if=None, in_if=None, src_addr=None,
91 """Create SRv6 LocalSID and binds it to a particular behaviour on
94 :param node: Given node to create localSID on.
95 :param local_sid: LocalSID IPv6 address.
96 :param behavior: SRv6 LocalSID function.
97 :param interface: Interface name (Optional, required for
99 :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
101 :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
103 :param out_if: Interface name of local interface for sending traffic
104 towards the Service Function (Optional, required for SRv6 endpoint
105 to SR-unaware appliance).
106 :param in_if: Interface name of local interface receiving the traffic
107 coming back from the Service Function (Optional, required for SRv6
108 endpoint to SR-unaware appliance).
109 :param src_addr: Source address on the packets coming back on in_if
110 interface (Optional, required for SRv6 endpoint to SR-unaware
111 appliance via static proxy).
112 :param sid_list: SID list (Optional, required for SRv6 endpoint to
113 SR-unaware appliance via static proxy).
124 :raises ValueError: If required parameter is missing.
126 beh = behavior.replace('.', '_').upper()
127 # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
128 # so we need to use CLI command to configure it.
129 if beh in (getattr(SRv6Behavior, 'END_AD').name,
130 getattr(SRv6Behavior, 'END_AS').name,
131 getattr(SRv6Behavior, 'END_AM').name):
132 if beh == getattr(SRv6Behavior, 'END_AS').name:
133 if next_hop is None or out_if is None or in_if is None or \
134 src_addr is None or sid_list is None:
136 'Required parameter(s) missing.\n'
141 'sid_list:{sids}'.format(
142 nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
144 sid_conf = 'next ' + ' next '.join(sid_list)
145 params = 'nh {nh} oif {oif} iif {iif} src {saddr} {sids}'.\
146 format(nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
149 if next_hop is None or out_if is None or in_if is None:
151 'Required parameter(s) missing.\nnext_hop:{0}\n'
152 'out_if:{1}\nin_if:{2}'.format(next_hop, out_if, in_if))
153 params = 'nh {0} oif {1} iif {2}'.format(
154 next_hop, out_if, in_if)
156 cli_cmd = 'sr localsid address {l_sid} behavior {beh} {params}'.\
157 format(l_sid=local_sid, beh=behavior, params=params)
159 PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
162 cmd = 'sr_localsid_add_del'
165 localsid=SRv6.create_srv6_sid_object(local_sid),
167 behavior=getattr(SRv6Behavior, beh).value,
168 sw_if_index=Constants.BITWISE_NON_ZERO,
174 err_msg = 'Failed to add SR localSID {lsid} on host {host}'.format(
175 lsid=local_sid, host=node['host'])
177 if beh in (getattr(SRv6Behavior, 'END_X').name,
178 getattr(SRv6Behavior, 'END_DX4').name,
179 getattr(SRv6Behavior, 'END_DX6').name):
180 if interface is None or next_hop is None:
181 raise ValueError('Required parameter(s) missing.\n'
184 format(ifc=interface, nh=next_hop))
185 args['sw_if_index'] = InterfaceUtil.get_interface_index(
187 next_hop = ip_address(unicode(next_hop))
188 if next_hop.version == 6:
189 args['nh_addr6'] = next_hop.packed
191 args['nh_addr4'] = next_hop.packed
192 elif beh == getattr(SRv6Behavior, 'END_DX2').name:
193 if interface is None:
194 raise ValueError('Required parameter missing.\ninterface:{ifc}'.
195 format(ifc=interface))
196 args['sw_if_index'] = InterfaceUtil.get_interface_index(
198 elif beh in (getattr(SRv6Behavior, 'END_DT4').name,
199 getattr(SRv6Behavior, 'END_DT6').name):
200 if fib_table is None:
201 raise ValueError('Required parameter missing.\n'
202 'fib_table: {fib}'.format(fib=fib_table))
203 args['fib_table'] = fib_table
205 with PapiSocketExecutor(node) as papi_exec:
206 papi_exec.add(cmd, **args).get_reply(err_msg)
209 def delete_sr_localsid(node, local_sid):
210 """Delete SRv6 LocalSID on the given node.
212 :param node: Given node to delete localSID on.
213 :param local_sid: LocalSID IPv6 address.
217 cmd = 'sr_localsid_add_del'
220 localsid=SRv6.create_srv6_sid_object(local_sid),
221 sw_if_index=Constants.BITWISE_NON_ZERO,
223 err_msg = 'Failed to delete SR localSID {lsid} on host {host}'.format(
224 lsid=local_sid, host=node['host'])
226 with PapiSocketExecutor(node) as papi_exec:
227 papi_exec.add(cmd, **args).get_reply(err_msg)
230 def show_sr_localsids(node):
231 """Show SRv6 LocalSIDs on the given node.
233 :param node: Given node to show localSIDs on.
236 cmd = 'sr_localsids_dump'
237 err_msg = 'Failed to get SR localSID dump on host {host}'.format(
240 with PapiSocketExecutor(node) as papi_exec:
241 papi_exec.add(cmd).get_details(err_msg)
244 def configure_sr_policy(node, bsid, sid_list, mode='encap'):
245 """Create SRv6 policy on the given node.
247 :param node: Given node to create SRv6 policy on.
248 :param bsid: BindingSID - local SID IPv6 address.
249 :param sid_list: SID list.
250 :param mode: Encapsulation / insertion mode.
256 cmd = 'sr_policy_add'
258 bsid_addr=IPv6Address(unicode(bsid)).packed,
260 is_encap=1 if mode == 'encap' else 0,
261 sids=SRv6.create_srv6_sid_list(sid_list)
263 err_msg = 'Failed to add SR policy for BindingSID {bsid} ' \
264 'on host {host}'.format(bsid=bsid, host=node['host'])
266 with PapiSocketExecutor(node) as papi_exec:
267 papi_exec.add(cmd, **args).get_reply(err_msg)
270 def delete_sr_policy(node, bsid):
271 """Delete SRv6 policy on the given node.
273 :param node: Given node to delete SRv6 policy on.
274 :param bsid: BindingSID IPv6 address.
278 cmd = 'sr_policy_del'
279 args = dict(bsid_addr=IPv6Address(unicode(bsid)).packed)
280 err_msg = 'Failed to delete SR policy for BindingSID {bsid} ' \
281 'on host {host}'.format(bsid=bsid, host=node['host'])
283 with PapiSocketExecutor(node) as papi_exec:
284 papi_exec.add(cmd, **args).get_reply(err_msg)
287 def show_sr_policies(node):
288 """Show SRv6 policies on the given node.
290 :param node: Given node to show SRv6 policies on.
293 cmd = 'sr_policies_dump'
294 err_msg = 'Failed to get SR policies dump on host {host}'.format(
297 with PapiSocketExecutor(node) as papi_exec:
298 papi_exec.add(cmd).get_details(err_msg)
301 def _get_sr_steer_policy_args(
302 node, mode, interface=None, ip_addr=None, prefix=None):
303 """Return values of sw_if_index, mask_width, prefix_addr and
304 traffic_type for sr_steering_add_del API.
306 :param node: Given node to create/delete steering policy on.
307 :param mode: Mode of operation - L2 or L3.
308 :param interface: Interface name (Optional, required in case of
310 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
312 :param prefix: IP address prefix (Optional, required in case of L3
319 :returns: Values for sw_if_index, mask_width, prefix_addr and
322 :raises ValueError: If unsupported mode used or required parameter
325 if mode.lower() == 'l2':
326 if interface is None:
327 raise ValueError('Required data missing.\ninterface:{ifc}'.
328 format(ifc=interface))
329 sw_if_index = InterfaceUtil.get_interface_index(node, interface)
331 prefix_addr = 16 * b'\x00'
332 traffic_type = getattr(SRv6PolicySteeringTypes, 'SR_STEER_L2').value
333 elif mode.lower() == 'l3':
334 if ip_addr is None or prefix is None:
335 raise ValueError('Required data missing.\nIP address:{0}\n'
336 'mask:{1}'.format(ip_addr, prefix))
337 sw_if_index = Constants.BITWISE_NON_ZERO
338 ip_addr = ip_address(unicode(ip_addr))
339 prefix_addr = ip_addr.packed
340 mask_width = int(prefix)
341 if ip_addr.version == 4:
342 prefix_addr += 12 * b'\x00'
343 traffic_type = getattr(
344 SRv6PolicySteeringTypes, 'SR_STEER_IPV4').value
346 traffic_type = getattr(
347 SRv6PolicySteeringTypes, 'SR_STEER_IPV6').value
349 raise ValueError('Unsupported mode: {0}'.format(mode))
351 return sw_if_index, mask_width, prefix_addr, traffic_type
354 def configure_sr_steer(
355 node, mode, bsid, interface=None, ip_addr=None, prefix=None):
356 """Create SRv6 steering policy on the given node.
358 :param node: Given node to create steering policy on.
359 :param mode: Mode of operation - L2 or L3.
360 :param bsid: BindingSID - local SID IPv6 address.
361 :param interface: Interface name (Optional, required in case of
363 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
365 :param prefix: IP address prefix (Optional, required in case of L3
373 :raises ValueError: If unsupported mode used or required parameter
376 sw_if_index, mask_width, prefix_addr, traffic_type = \
377 SRv6._get_sr_steer_policy_args(
378 node, mode, interface, ip_addr, prefix)
380 cmd = 'sr_steering_add_del'
383 bsid_addr=IPv6Address(unicode(bsid)).packed,
386 prefix_addr=prefix_addr,
387 mask_width=mask_width,
388 sw_if_index=sw_if_index,
389 traffic_type=traffic_type
391 err_msg = 'Failed to add SRv6 steering policy for BindingSID {bsid} ' \
392 'on host {host}'.format(bsid=bsid, host=node['host'])
394 with PapiSocketExecutor(node) as papi_exec:
395 papi_exec.add(cmd, **args).get_reply(err_msg)
399 node, mode, bsid, interface=None, ip_addr=None, mask=None):
400 """Delete SRv6 steering policy on the given node.
402 :param node: Given node to delete steering policy on.
403 :param mode: Mode of operation - L2 or L3.
404 :param bsid: BindingSID - local SID IPv6 address.
405 :param interface: Interface name (Optional, required in case of
407 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
409 :param mask: IP address mask (Optional, required in case of L3 mode).
416 :raises ValueError: If unsupported mode used or required parameter
419 sw_if_index, mask_width, prefix_addr, traffic_type = \
420 SRv6._get_sr_steer_policy_args(node, mode, interface, ip_addr, mask)
422 cmd = 'sr_steering_add_del'
425 bsid_addr=IPv6Address(unicode(bsid)).packed,
428 prefix_addr=prefix_addr,
429 mask_width=mask_width,
430 sw_if_index=sw_if_index,
431 traffic_type=traffic_type
433 err_msg = 'Failed to delete SRv6 steering policy for BindingSID ' \
434 '{bsid} on host {host}'.format(bsid=bsid, host=node['host'])
436 with PapiSocketExecutor(node) as papi_exec:
437 papi_exec.add(cmd, **args).get_reply(err_msg)
440 def show_sr_steering_policies(node):
441 """Show SRv6 steering policies on the given node.
443 :param node: Given node to show SRv6 steering policies on.
446 cmd = 'sr_steering_pol_dump'
447 err_msg = 'Failed to get SR localSID dump on host {host}'.format(
450 with PapiSocketExecutor(node) as papi_exec:
451 papi_exec.add(cmd).get_details(err_msg)
454 def set_sr_encaps_source_address(node, ip6_addr):
455 """Set SRv6 encapsulation source address on the given node.
457 :param node: Given node to set SRv6 encapsulation source address on.
458 :param ip6_addr: Local SID IPv6 address.
462 cmd = 'sr_set_encap_source'
463 args = dict(encaps_source=IPv6Address(unicode(ip6_addr)).packed)
464 err_msg = 'Failed to set SRv6 encapsulation source address {addr} ' \
465 'on host {host}'.format(addr=ip6_addr, host=node['host'])
467 with PapiSocketExecutor(node) as papi_exec:
468 papi_exec.add(cmd, **args).get_reply(err_msg)