-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Segment Routing over IPv6 dataplane utilities library."""
+"""Segment Routing over IPv6 data plane utilities library."""
-from enum import Enum
+from enum import IntEnum
+from ipaddress import ip_address, IPv6Address
-from resources.libraries.python.VatExecutor import VatTerminal
-from resources.libraries.python.VatJsonUtil import VatJsonUtil
-from resources.libraries.python.topology import Topology
+from resources.libraries.python.Constants import Constants
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-class SRv6Behaviour(Enum):
- """Defines SRv6 endpoint functions implemented in VPP."""
+class SRv6Behavior(IntEnum):
+ """SRv6 LocalSID supported functions."""
# Endpoint function
- END = 'end'
+ END = 1
# Endpoint function with Layer-3 cross-connect
- END_X = 'end.x'
+ END_X = 2
# Endpoint with decapsulation and Layer-2 cross-connect
- END_DX2 = 'end.dx2'
- # Endpoint with decapsulation and IPv4 cross-connect
- END_DX4 = 'end.dx4'
- # Endpoint with decapsulation and IPv4 table lookup
- END_DT4 = 'end.dt4'
+ END_DX2 = 5
# Endpoint with decapsulation and IPv6 cross-connect
- END_DX6 = 'end.dx6'
+ END_DX6 = 6
+ # Endpoint with decapsulation and IPv4 cross-connect
+ END_DX4 = 7
# Endpoint with decapsulation and IPv6 table lookup
- END_DT6 = 'end.dt6'
+ END_DT6 = 8
+ # Endpoint with decapsulation and IPv4 table lookup
+ END_DT4 = 9
+ # Endpoint to SR-unaware appliance via static proxy
+ END_AS = 20
+ # Endpoint to SR-unaware appliance via dynamic proxy
+ END_AD = 21
+ # Endpoint to SR-unaware appliance via masquerading
+ END_AM = 22
+
+
+class SRv6PolicySteeringTypes(IntEnum):
+ """SRv6 LocalSID supported functions."""
+ SR_STEER_L2 = 2
+ SR_STEER_IPV4 = 4
+ SR_STEER_IPV6 = 6
class SRv6(object):
pass
@staticmethod
- def configure_sr_localsid(node, local_sid, behavior, interface=None,
- next_hop=None, fib_table=None):
+ def create_srv6_sid_object(ip_addr):
+ """Create SRv6 SID object.
+
+ :param ip_addr: IPv6 address.
+ :type ip_addr: str
+ :returns: SRv6 SID object.
+ :rtype: dict
+ """
+ return dict(addr=IPv6Address(unicode(ip_addr)).packed)
+
+ @staticmethod
+ def create_srv6_sid_list(sids):
+ """Create SRv6 SID list object.
+
+ :param sids: SID IPv6 addresses.
+ :type sids: list
+ :returns: SRv6 SID list object.
+ :rtype: list
+ """
+ sid_list = list(0 for _ in xrange(16))
+ for i in xrange(len(sids)):
+ sid_list[i] = SRv6.create_srv6_sid_object(sids[i])
+ return dict(num_sids=len(sids), weight=1, sids=sid_list)
+
+ @staticmethod
+ def configure_sr_localsid(
+ node, local_sid, behavior, interface=None, next_hop=None,
+ fib_table=None, out_if=None, in_if=None, src_addr=None,
+ sid_list=None):
"""Create SRv6 LocalSID and binds it to a particular behaviour on
the given node.
xconnects).
:param fib_table: FIB table for IPv4/IPv6 lookup (Optional, required for
L3 routing).
+ :param out_if: Interface name of local interface for sending traffic
+ towards the Service Function (Optional, required for SRv6 endpoint
+ to SR-unaware appliance).
+ :param in_if: Interface name of local interface receiving the traffic
+ coming back from the Service Function (Optional, required for SRv6
+ endpoint to SR-unaware appliance).
+ :param src_addr: Source address on the packets coming back on in_if
+ interface (Optional, required for SRv6 endpoint to SR-unaware
+ appliance via static proxy).
+ :param sid_list: SID list (Optional, required for SRv6 endpoint to
+ SR-unaware appliance via static proxy).
:type node: dict
:type local_sid: str
:type behavior: str
:type interface: str
- :type next_hop: int
+ :type next_hop: str
:type fib_table: str
- :raises ValueError: If unsupported SRv6 LocalSID function used or
- required parameter is missing.
+ :type out_if: str
+ :type in_if: str
+ :type src_addr: str
+ :type sid_list: list
+ :raises ValueError: If required parameter is missing.
"""
- if behavior == SRv6Behaviour.END:
- params = ''
- elif behavior in [SRv6Behaviour.END_X, SRv6Behaviour.END_DX4,
- SRv6Behaviour.END_DX6]:
+ beh = behavior.replace('.', '_').upper()
+ # There is no SRv6Behaviour enum defined for functions from SRv6 plugins
+ # so we need to use CLI command to configure it.
+ if beh in (getattr(SRv6Behavior, 'END_AD').name,
+ getattr(SRv6Behavior, 'END_AS').name,
+ getattr(SRv6Behavior, 'END_AM').name):
+ if beh == getattr(SRv6Behavior, 'END_AS').name:
+ if next_hop is None or out_if is None or in_if is None or \
+ src_addr is None or sid_list is None:
+ raise ValueError(
+ 'Required parameter(s) missing.\n'
+ 'next_hop:{nh}\n'
+ 'out_if:{oif}\n'
+ 'in_if:{iif}\n'
+ 'src_addr:{saddr}\n'
+ 'sid_list:{sids}'.format(
+ nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
+ sids=sid_list))
+ sid_conf = 'next ' + ' next '.join(sid_list)
+ params = 'nh {nh} oif {oif} iif {iif} src {saddr} {sids}'.\
+ format(nh=next_hop, oif=out_if, iif=in_if, saddr=src_addr,
+ sids=sid_conf)
+ else:
+ if next_hop is None or out_if is None or in_if is None:
+ raise ValueError(
+ 'Required parameter(s) missing.\nnext_hop:{0}\n'
+ 'out_if:{1}\nin_if:{2}'.format(next_hop, out_if, in_if))
+ params = 'nh {0} oif {1} iif {2}'.format(
+ next_hop, out_if, in_if)
+
+ cli_cmd = 'sr localsid address {l_sid} behavior {beh} {params}'.\
+ format(l_sid=local_sid, beh=behavior, params=params)
+
+ PapiSocketExecutor.run_cli_cmd(node, cli_cmd)
+ return
+
+ cmd = 'sr_localsid_add_del'
+ args = dict(
+ is_del=0,
+ localsid=SRv6.create_srv6_sid_object(local_sid),
+ end_psp=0,
+ behavior=getattr(SRv6Behavior, beh).value,
+ sw_if_index=Constants.BITWISE_NON_ZERO,
+ vlan_index=0,
+ fib_table=0,
+ nh_addr6=0,
+ nh_addr4=0
+ )
+ err_msg = 'Failed to add SR localSID {lsid} on host {host}'.format(
+ lsid=local_sid, host=node['host'])
+
+ if beh in (getattr(SRv6Behavior, 'END_X').name,
+ getattr(SRv6Behavior, 'END_DX4').name,
+ getattr(SRv6Behavior, 'END_DX6').name):
if interface is None or next_hop is None:
- raise ValueError('Required data missing.\ninterface:{0}\n'
- 'next_hop:{1}'.format(interface, next_hop))
- interface_name = Topology.get_interface_name(node, interface)
- params = '{0} {1}'.format(interface_name, next_hop)
- elif behavior == SRv6Behaviour.END_DX2:
+ raise ValueError('Required parameter(s) missing.\n'
+ 'interface:{ifc}\n'
+ 'next_hop:{nh}'.
+ format(ifc=interface, nh=next_hop))
+ args['sw_if_index'] = InterfaceUtil.get_interface_index(
+ node, interface)
+ next_hop = ip_address(unicode(next_hop))
+ if next_hop.version == 6:
+ args['nh_addr6'] = next_hop.packed
+ else:
+ args['nh_addr4'] = next_hop.packed
+ elif beh == getattr(SRv6Behavior, 'END_DX2').name:
if interface is None:
- raise ValueError('Required data missing.\ninterface:{0}\n'.
- format(interface))
- params = '{0}'.format(interface)
- elif behavior in [SRv6Behaviour.END_DT4, SRv6Behaviour.END_DT6]:
+ raise ValueError('Required parameter missing.\ninterface:{ifc}'.
+ format(ifc=interface))
+ args['sw_if_index'] = InterfaceUtil.get_interface_index(
+ node, interface)
+ elif beh in (getattr(SRv6Behavior, 'END_DT4').name,
+ getattr(SRv6Behavior, 'END_DT6').name):
if fib_table is None:
- raise ValueError('Required data missing.\nfib_table:{0}\n'.
- format(fib_table))
- params = '{0}'.format(fib_table)
- else:
- raise ValueError('Unsupported SRv6 LocalSID function: {0}'.
- format(behavior))
+ raise ValueError('Required parameter missing.\n'
+ 'fib_table: {fib}'.format(fib=fib_table))
+ args['fib_table'] = fib_table
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_localsid_add.vat', local_sid=local_sid,
- behavior=behavior, params=params)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Create SRv6 LocalSID {0} failed on node {1}'.format(
- local_sid, node['host']))
-
- @staticmethod
- def delete_sr_localsid(node, local_sid):
- """Delete SRv6 LocalSID on the given node.
-
- :param node: Given node to delete localSID on.
- :param local_sid: LocalSID IPv6 address.
- :type node: dict
- :type local_sid: str
- """
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_localsid_del.vat', local_sid=local_sid)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Delete SRv6 LocalSID {0} failed on node {1}'.format(
- local_sid, node['host']))
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def show_sr_localsids(node):
:param node: Given node to show localSIDs on.
:type node: dict
"""
- with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_localsids_show.vat')
+ cmd = 'sr_localsids_dump'
+ err_msg = 'Failed to get SR localSID dump on host {host}'.format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd).get_details(err_msg)
@staticmethod
def configure_sr_policy(node, bsid, sid_list, mode='encap'):
:type sid_list: list
:type mode: str
"""
- sid_conf = 'next ' + ' next '.join(sid_list)
-
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_policy_add.vat', bsid=bsid,
- sid_conf=sid_conf, mode=mode)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Create SRv6 policy for BindingSID {0} failed on node '
- '{1}'.format(bsid, node['host']))
-
- @staticmethod
- def delete_sr_policy(node, bsid):
- """Delete SRv6 policy on the given node.
-
- :param node: Given node to delete SRv6 policy on.
- :param bsid: BindingSID IPv6 address.
- :type node: dict
- :type bsid: str
- """
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_policy_del.vat', bsid=bsid)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Delete SRv6 policy for BindingSID {0} failed on node '
- '{1}'.format(bsid, node['host']))
+ cmd = 'sr_policy_add'
+ args = dict(
+ bsid_addr=IPv6Address(unicode(bsid)).packed,
+ weight=1,
+ is_encap=1 if mode == 'encap' else 0,
+ sids=SRv6.create_srv6_sid_list(sid_list)
+ )
+ err_msg = 'Failed to add SR policy for BindingSID {bsid} ' \
+ 'on host {host}'.format(bsid=bsid, host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def show_sr_policies(node):
:param node: Given node to show SRv6 policies on.
:type node: dict
"""
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_policies_show.vat')
+ cmd = 'sr_policies_dump'
+ err_msg = 'Failed to get SR policies dump on host {host}'.format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd).get_details(err_msg)
@staticmethod
- def configure_sr_steer(node, mode, bsid, interface=None, ip_addr=None,
- mask=None):
- """Create SRv6 steering policy on the given node.
+ def _get_sr_steer_policy_args(
+ node, mode, interface=None, ip_addr=None, prefix=None):
+ """Return values of sw_if_index, mask_width, prefix_addr and
+ traffic_type for sr_steering_add_del API.
- :param node: Given node to create steering policy on.
+ :param node: Given node to create/delete steering policy on.
:param mode: Mode of operation - L2 or L3.
- :param bsid: BindingSID - local SID IPv6 address.
:param interface: Interface name (Optional, required in case of
L2 mode).
:param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
mode).
- :param mask: IP address mask (Optional, required in case of L3 mode).
+ :param prefix: IP address prefix (Optional, required in case of L3
+ mode).
:type node: dict
:type mode: str
- :type bsid: str
:type interface: str
- :type ip_addr: int
- :type mask: int
+ :type ip_addr: str
+ :type prefix: int
+ :returns: Values for sw_if_index, mask_width, prefix_addr and
+ traffic_type
+ :rtype: tuple
:raises ValueError: If unsupported mode used or required parameter
is missing.
"""
- if mode == 'l2':
+ if mode.lower() == 'l2':
if interface is None:
- raise ValueError('Required data missing.\ninterface:{0}\n'.
- format(interface))
- interface_name = Topology.get_interface_name(node, interface)
- params = 'l2 {0}'.format(interface_name)
- elif mode == 'l3':
- if ip_addr is None or mask is None:
+ raise ValueError('Required data missing.\ninterface:{ifc}'.
+ format(ifc=interface))
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
+ mask_width = 0
+ prefix_addr = 16 * b'\x00'
+ traffic_type = getattr(SRv6PolicySteeringTypes, 'SR_STEER_L2').value
+ elif mode.lower() == 'l3':
+ if ip_addr is None or prefix is None:
raise ValueError('Required data missing.\nIP address:{0}\n'
- 'mask:{1}'.format(ip_addr, mask))
- params = '{0}/{1}'.format(ip_addr, mask)
+ 'mask:{1}'.format(ip_addr, prefix))
+ sw_if_index = Constants.BITWISE_NON_ZERO
+ ip_addr = ip_address(unicode(ip_addr))
+ prefix_addr = ip_addr.packed
+ mask_width = int(prefix)
+ if ip_addr.version == 4:
+ prefix_addr += 12 * b'\x00'
+ traffic_type = getattr(
+ SRv6PolicySteeringTypes, 'SR_STEER_IPV4').value
+ else:
+ traffic_type = getattr(
+ SRv6PolicySteeringTypes, 'SR_STEER_IPV6').value
else:
raise ValueError('Unsupported mode: {0}'.format(mode))
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_steer_add_del.vat', params=params, bsid=bsid)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Create SRv6 steering policy for BindingSID {0} failed on '
- 'node {1}'.format(bsid, node['host']))
+ return sw_if_index, mask_width, prefix_addr, traffic_type
+ # TODO: Bring L1 names, arguments and defaults closer to PAPI ones.
@staticmethod
- def delete_sr_steer(node, mode, bsid, interface=None, ip_addr=None,
- mask=None):
- """Delete SRv6 steering policy on the given node.
+ def configure_sr_steer(
+ node, mode, bsid, interface=None, ip_addr=None, prefix=None):
+ """Create SRv6 steering policy on the given node.
- :param node: Given node to delete steering policy on.
+ :param node: Given node to create steering policy on.
:param mode: Mode of operation - L2 or L3.
:param bsid: BindingSID - local SID IPv6 address.
:param interface: Interface name (Optional, required in case of
L2 mode).
:param ip_addr: IPv4/IPv6 address (Optional, required in case of L3
mode).
- :param mask: IP address mask (Optional, required in case of L3 mode).
+ :param prefix: IP address prefix (Optional, required in case of L3
+ mode).
:type node: dict
:type mode: str
:type bsid: str
:type interface: str
- :type ip_addr: int
- :type mask: int
+ :type ip_addr: str
+ :type prefix: int
:raises ValueError: If unsupported mode used or required parameter
is missing.
"""
- params = 'del'
- if mode == 'l2':
- if interface is None:
- raise ValueError('Required data missing.\ninterface:{0}\n'.
- format(interface))
- interface_name = Topology.get_interface_name(node, interface)
- params += 'l2 {0}'.format(interface_name)
- elif mode == 'l3':
- if ip_addr is None or mask is None:
- raise ValueError('Required data missing.\nIP address:{0}\n'
- 'mask:{1}'.format(ip_addr, mask))
- params += '{0}/{1}'.format(ip_addr, mask)
- else:
- raise ValueError('Unsupported mode: {0}'.format(mode))
-
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_steer_add_del.vat', params=params, bsid=bsid)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Delete SRv6 policy for bsid {0} failed on node {1}'.format(
- bsid, node['host']))
+ sw_if_index, mask_width, prefix_addr, traffic_type = \
+ SRv6._get_sr_steer_policy_args(
+ node, mode, interface, ip_addr, prefix)
+
+ cmd = 'sr_steering_add_del'
+ args = dict(
+ is_del=0,
+ bsid_addr=IPv6Address(unicode(bsid)).packed,
+ sr_policy_index=0,
+ table_id=0,
+ prefix_addr=prefix_addr,
+ mask_width=mask_width,
+ sw_if_index=sw_if_index,
+ traffic_type=traffic_type
+ )
+ err_msg = 'Failed to add SRv6 steering policy for BindingSID {bsid} ' \
+ 'on host {host}'.format(bsid=bsid, host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def show_sr_steering_policies(node):
:param node: Given node to show SRv6 steering policies on.
:type node: dict
"""
- with VatTerminal(node, json_param=False) as vat:
- vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_steer_policies_show.vat')
+ cmd = 'sr_steering_pol_dump'
+ err_msg = 'Failed to get SR localSID dump on host {host}'.format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd).get_details(err_msg)
@staticmethod
def set_sr_encaps_source_address(node, ip6_addr):
:type node: dict
:type ip6_addr: str
"""
- with VatTerminal(node) as vat:
- resp = vat.vat_terminal_exec_cmd_from_template(
- 'srv6/sr_set_encaps_source.vat', ip6_addr=ip6_addr)
-
- VatJsonUtil.verify_vat_retval(
- resp[0],
- err_msg='Set SRv6 encapsulation source address {0} failed on node'
- ' {1}'.format(ip6_addr, node['host']))
+ cmd = 'sr_set_encap_source'
+ args = dict(encaps_source=IPv6Address(unicode(ip6_addr)).packed)
+ err_msg = 'Failed to set SRv6 encapsulation source address {addr} ' \
+ 'on host {host}'.format(addr=ip6_addr, host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)