1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Segment Routing over IPv6 data plane utilities library."""
16 from enum import IntEnum
17 from ipaddress import ip_address, IPv6Address
19 from resources.libraries.python.Constants import Constants
20 from resources.libraries.python.InterfaceUtil import InterfaceUtil
21 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
24 class SRv6Behavior(IntEnum):
25 """SRv6 LocalSID supported functions."""
28 # Endpoint function with Layer-3 cross-connect
30 # Endpoint with decapsulation and Layer-2 cross-connect
32 # Endpoint with decapsulation and IPv6 cross-connect
34 # Endpoint with decapsulation and IPv4 cross-connect
36 # Endpoint with decapsulation and IPv6 table lookup
38 # Endpoint with decapsulation and IPv4 table lookup
40 # Endpoint to SR-unaware appliance via static proxy
42 # Endpoint to SR-unaware appliance via dynamic proxy
44 # Endpoint to SR-unaware appliance via masquerading
48 class SRv6PolicySteeringTypes(IntEnum):
49 """SRv6 LocalSID supported functions."""
62 def create_srv6_sid_object(ip_addr):
63 """Create SRv6 SID object.
65 :param ip_addr: IPv6 address.
67 :returns: SRv6 SID object.
70 return dict(addr=IPv6Address(unicode(ip_addr)).packed)
73 def create_srv6_sid_list(sids):
74 """Create SRv6 SID list object.
76 :param sids: SID IPv6 addresses.
78 :returns: SRv6 SID list object.
81 sid_list = list(0 for _ in xrange(16))
82 for i in xrange(len(sids)):
83 sid_list[i] = SRv6.create_srv6_sid_object(sids[i])
84 return dict(num_sids=len(sids), weight=1, sids=sid_list)
87 def configure_sr_localsid(
88 node, local_sid, behavior, interface=None, next_hop=None,
89 fib_table=None, out_if=None, in_if=None, src_addr=None,
91 """Create SRv6 LocalSID and binds it to a particular behaviour on
94 :param node: Given node to create localSID on.
95 :param local_sid: LocalSID IPv6 address.
96 :param behavior: SRv6 LocalSID function.
97 :param interface: Interface name (Optional, required for
99 :param next_hop: Next hop IPv4/IPv6 address (Optional, required for L3
101 :param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
103 :param out_if: Interface name of local interface for sending traffic
104 towards the Service Function (Optional, required for SRv6 endpoint
105 to SR-unaware appliance).
106 :param in_if: Interface name of local interface receiving the traffic
107 coming back from the Service Function (Optional, required for SRv6
108 endpoint to SR-unaware appliance).
109 :param src_addr: Source address on the packets coming back on in_if
110 interface (Optional, required for SRv6 endpoint to SR-unaware
111 appliance via static proxy).
112 :param sid_list: SID list (Optional, required for SRv6 endpoint to
113 SR-unaware appliance via static proxy).
124 :raises ValueError: If required parameter is missing.
126 beh = behavior.replace('.', '_').upper()
127 # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
128 # so we need to use CLI command to configure it.
129 if beh in (getattr(SRv6Behavior, 'END_AD').name,
130 getattr(SRv6Behavior, 'END_AS').name,
131 getattr(SRv6Behavior, 'END_AM').name):
132 if beh == getattr(SRv6Behavior, 'END_AS').name:
133 if next_hop is None or out_if is None or in_if is None or \
134 src_addr is None or sid_list is None:
136 'Required parameter(s) missing.\n'
141 'sid_list:{sids}'.format(
142 nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
144 sid_conf = 'next ' + ' next '.join(sid_list)
145 params = 'nh {nh} oif {oif} iif {iif} src {saddr} {sids}'.\
146 format(nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
149 if next_hop is None or out_if is None or in_if is None:
151 'Required parameter(s) missing.\nnext_hop:{0}\n'
152 'out_if:{1}\nin_if:{2}'.format(next_hop, out_if, in_if))
153 params = 'nh {0} oif {1} iif {2}'.format(
154 next_hop, out_if, in_if)
156 cli_cmd = 'sr localsid address {l_sid} behavior {beh} {params}'.\
157 format(l_sid=local_sid, beh=behavior, params=params)
159 PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
162 cmd = 'sr_localsid_add_del'
165 localsid=SRv6.create_srv6_sid_object(local_sid),
167 behavior=getattr(SRv6Behavior, beh).value,
168 sw_if_index=Constants.BITWISE_NON_ZERO,
174 err_msg = 'Failed to add SR localSID {lsid} on host {host}'.format(
175 lsid=local_sid, host=node['host'])
177 if beh in (getattr(SRv6Behavior, 'END_X').name,
178 getattr(SRv6Behavior, 'END_DX4').name,
179 getattr(SRv6Behavior, 'END_DX6').name):
180 if interface is None or next_hop is None:
181 raise ValueError('Required parameter(s) missing.\n'
184 format(ifc=interface, nh=next_hop))
185 args['sw_if_index'] = InterfaceUtil.get_interface_index(
187 next_hop = ip_address(unicode(next_hop))
188 if next_hop.version == 6:
189 args['nh_addr6'] = next_hop.packed
191 args['nh_addr4'] = next_hop.packed
192 elif beh == getattr(SRv6Behavior, 'END_DX2').name:
193 if interface is None:
194 raise ValueError('Required parameter missing.\ninterface:{ifc}'.
195 format(ifc=interface))
196 args['sw_if_index'] = InterfaceUtil.get_interface_index(
198 elif beh in (getattr(SRv6Behavior, 'END_DT4').name,
199 getattr(SRv6Behavior, 'END_DT6').name):
200 if fib_table is None:
201 raise ValueError('Required parameter missing.\n'
202 'fib_table: {fib}'.format(fib=fib_table))
203 args['fib_table'] = fib_table
205 with PapiSocketExecutor(node) as papi_exec:
206 papi_exec.add(cmd, **args).get_reply(err_msg)
209 def show_sr_localsids(node):
210 """Show SRv6 LocalSIDs on the given node.
212 :param node: Given node to show localSIDs on.
215 cmd = 'sr_localsids_dump'
216 err_msg = 'Failed to get SR localSID dump on host {host}'.format(
219 with PapiSocketExecutor(node) as papi_exec:
220 papi_exec.add(cmd).get_details(err_msg)
223 def configure_sr_policy(node, bsid, sid_list, mode='encap'):
224 """Create SRv6 policy on the given node.
226 :param node: Given node to create SRv6 policy on.
227 :param bsid: BindingSID - local SID IPv6 address.
228 :param sid_list: SID list.
229 :param mode: Encapsulation / insertion mode.
235 cmd = 'sr_policy_add'
237 bsid_addr=IPv6Address(unicode(bsid)).packed,
239 is_encap=1 if mode == 'encap' else 0,
240 sids=SRv6.create_srv6_sid_list(sid_list)
242 err_msg = 'Failed to add SR policy for BindingSID {bsid} ' \
243 'on host {host}'.format(bsid=bsid, host=node['host'])
245 with PapiSocketExecutor(node) as papi_exec:
246 papi_exec.add(cmd, **args).get_reply(err_msg)
249 def show_sr_policies(node):
250 """Show SRv6 policies on the given node.
252 :param node: Given node to show SRv6 policies on.
255 cmd = 'sr_policies_dump'
256 err_msg = 'Failed to get SR policies dump on host {host}'.format(
259 with PapiSocketExecutor(node) as papi_exec:
260 papi_exec.add(cmd).get_details(err_msg)
263 def _get_sr_steer_policy_args(
264 node, mode, interface=None, ip_addr=None, prefix=None):
265 """Return values of sw_if_index, mask_width, prefix_addr and
266 traffic_type for sr_steering_add_del API.
268 :param node: Given node to create/delete steering policy on.
269 :param mode: Mode of operation - L2 or L3.
270 :param interface: Interface name (Optional, required in case of
272 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
274 :param prefix: IP address prefix (Optional, required in case of L3
281 :returns: Values for sw_if_index, mask_width, prefix_addr and
284 :raises ValueError: If unsupported mode used or required parameter
287 if mode.lower() == 'l2':
288 if interface is None:
289 raise ValueError('Required data missing.\ninterface:{ifc}'.
290 format(ifc=interface))
291 sw_if_index = InterfaceUtil.get_interface_index(node, interface)
293 prefix_addr = 16 * b'\x00'
294 traffic_type = getattr(SRv6PolicySteeringTypes, 'SR_STEER_L2').value
295 elif mode.lower() == 'l3':
296 if ip_addr is None or prefix is None:
297 raise ValueError('Required data missing.\nIP address:{0}\n'
298 'mask:{1}'.format(ip_addr, prefix))
299 sw_if_index = Constants.BITWISE_NON_ZERO
300 ip_addr = ip_address(unicode(ip_addr))
301 prefix_addr = ip_addr.packed
302 mask_width = int(prefix)
303 if ip_addr.version == 4:
304 prefix_addr += 12 * b'\x00'
305 traffic_type = getattr(
306 SRv6PolicySteeringTypes, 'SR_STEER_IPV4').value
308 traffic_type = getattr(
309 SRv6PolicySteeringTypes, 'SR_STEER_IPV6').value
311 raise ValueError('Unsupported mode: {0}'.format(mode))
313 return sw_if_index, mask_width, prefix_addr, traffic_type
315 # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
317 def configure_sr_steer(
318 node, mode, bsid, interface=None, ip_addr=None, prefix=None):
319 """Create SRv6 steering policy on the given node.
321 :param node: Given node to create steering policy on.
322 :param mode: Mode of operation - L2 or L3.
323 :param bsid: BindingSID - local SID IPv6 address.
324 :param interface: Interface name (Optional, required in case of
326 :param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
328 :param prefix: IP address prefix (Optional, required in case of L3
336 :raises ValueError: If unsupported mode used or required parameter
339 sw_if_index, mask_width, prefix_addr, traffic_type = \
340 SRv6._get_sr_steer_policy_args(
341 node, mode, interface, ip_addr, prefix)
343 cmd = 'sr_steering_add_del'
346 bsid_addr=IPv6Address(unicode(bsid)).packed,
349 prefix_addr=prefix_addr,
350 mask_width=mask_width,
351 sw_if_index=sw_if_index,
352 traffic_type=traffic_type
354 err_msg = 'Failed to add SRv6 steering policy for BindingSID {bsid} ' \
355 'on host {host}'.format(bsid=bsid, host=node['host'])
357 with PapiSocketExecutor(node) as papi_exec:
358 papi_exec.add(cmd, **args).get_reply(err_msg)
361 def show_sr_steering_policies(node):
362 """Show SRv6 steering policies on the given node.
364 :param node: Given node to show SRv6 steering policies on.
367 cmd = 'sr_steering_pol_dump'
368 err_msg = 'Failed to get SR localSID dump on host {host}'.format(
371 with PapiSocketExecutor(node) as papi_exec:
372 papi_exec.add(cmd).get_details(err_msg)
375 def set_sr_encaps_source_address(node, ip6_addr):
376 """Set SRv6 encapsulation source address on the given node.
378 :param node: Given node to set SRv6 encapsulation source address on.
379 :param ip6_addr: Local SID IPv6 address.
383 cmd = 'sr_set_encap_source'
384 args = dict(encaps_source=IPv6Address(unicode(ip6_addr)).packed)
385 err_msg = 'Failed to set SRv6 encapsulation source address {addr} ' \
386 'on host {host}'.format(addr=ip6_addr, host=node['host'])
388 with PapiSocketExecutor(node) as papi_exec:
389 papi_exec.add(cmd, **args).get_reply(err_msg)