+++ /dev/null
-# Copyright (c) 2019 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""GBP utilities library."""
-
-from enum import IntEnum
-from ipaddress import ip_address
-
-from resources.libraries.python.IPUtil import IPUtil
-from resources.libraries.python.L2Util import L2Util
-from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-from resources.libraries.python.topology import Topology
-
-
-class GBPEndpointFlags(IntEnum):
- """GBP Endpoint Flags."""
- GBP_API_ENDPOINT_FLAG_NONE = 0
- GBP_API_ENDPOINT_FLAG_BOUNCE = 1
- GBP_API_ENDPOINT_FLAG_REMOTE = 2
- GBP_API_ENDPOINT_FLAG_LEARNT = 4
- GBP_API_ENDPOINT_FLAG_EXTERNAL = 8
-
-
-class GBPBridgeDomainFlags(IntEnum):
- """GBP Bridge Domain Flags."""
- GBP_BD_API_FLAG_NONE = 0
- GBP_BD_API_FLAG_DO_NOT_LEARN = 1
- GBP_BD_API_FLAG_UU_FWD_DROP = 2
- GBP_BD_API_FLAG_MCAST_DROP = 4
- GBP_BD_API_FLAG_UCAST_ARP = 8
-
-
-class GBPSubnetType(IntEnum):
- """GBP Subnet Type."""
- GBP_API_SUBNET_TRANSPORT = 1
- GBP_API_SUBNET_STITCHED_INTERNAL = 2
- GBP_API_SUBNET_STITCHED_EXTERNAL = 3
- GBP_API_SUBNET_L3_OUT = 4
- GBP_API_SUBNET_ANON_L3_OUT = 5
-
-
-class GBPExtItfFlags(IntEnum):
- """GBP External Interface Flags."""
- GBP_API_EXT_ITF_F_NONE = 0
- GBP_API_EXT_ITF_F_ANON = 1
-
-
-class GBPRuleAction(IntEnum):
- """GBP Rule Action."""
- GBP_API_RULE_PERMIT = 1
- GBP_API_RULE_DENY = 2
- GBP_API_RULE_REDIRECT = 3
-
-
-class GBP(object):
- """GBP utilities."""
-
- @staticmethod
- def gbp_route_domain_add(
- node, rd_id=1, ip4_table_id=1, ip6_table_id=0,
- ip4_uu_sw_if_index=0xffffffff, ip6_uu_sw_if_index=0xffffffff):
- """Add GBP route domain.
-
- :param node: Node to add GBP route domain on.
- :param rd_id: GBP route domain ID.
- :param ip4_table_id: IPv4 table.
- :param ip6_table_id: IPv6 table.
- :param ip4_uu_sw_if_index: IPv4 unicast interface index.
- :param ip6_uu_sw_if_index: IPv6 unicast interface index.
- :type node: dict
- :type rd_id: int
- :type ip4_table_id: int
- :type ip6_table_id: int
- :type ip4_uu_sw_if_index: int
- :type ip6_uu_sw_if_index: int
- """
- cmd = 'gbp_route_domain_add'
- err_msg = 'Failed to add GBP route domain on {node}!'\
- .format(node=node['host'])
-
- args_in = dict(
- rd = dict (
- rd_id = rd_id,
- ip4_table_id = ip4_table_id,
- ip6_table_id = ip6_table_id,
- ip4_uu_sw_if_index = ip4_uu_sw_if_index,
- ip6_uu_sw_if_index = ip6_uu_sw_if_index
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_bridge_domain_add(
- node, bvi_sw_if_index, bd_id=1, rd_id=1,
- uu_fwd_sw_if_index=0xffffffff, bm_flood_sw_if_index=0xffffffff):
- """Add GBP bridge domain.
-
- :param node: Node to add GBP bridge domain on.
- :param bvi_sw_if_index: SW index of BVI/loopback interface.
- :param bd_id: GBP bridge domain ID.
- :param rd_id: GBP route domain ID.
- :param uu_fwd_sw_if_index: Unicast forward interface index.
- :param bm_flood_sw_if_index: Bcast/Mcast flood interface index.
- :type node: dict
- :type bvi_sw_if_index: int
- :type bd_id: int
- :type rd_id: int
- :type uu_fwd_sw_if_index: int
- :type bm_flood_sw_if_index: int
- """
- cmd = 'gbp_bridge_domain_add'
- err_msg = 'Failed to add GBP bridge domain on {node}!'\
- .format(node=node['host'])
-
- args_in = dict(
- bd = dict(
- flags = getattr(GBPBridgeDomainFlags,
- 'GBP_BD_API_FLAG_NONE').value,
- bvi_sw_if_index = bvi_sw_if_index,
- uu_fwd_sw_if_index = uu_fwd_sw_if_index,
- bm_flood_sw_if_index = bm_flood_sw_if_index,
- bd_id = bd_id,
- rd_id = rd_id
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_endpoint_group_add(
- node, sclass, bd_id=1, rd_id=1, vnid=1,
- uplink_sw_if_index=0xffffffff, remote_ep_timeout=0xffffffff):
- """Add GBP endpoint group.
-
- :param node: Node to add GBP endpoint group on.
- :param sclass: Source CLASS.
- :param bd_id: GBP bridge domain ID.
- :param rd_id: GBP route domain ID.
- :param uplink_sw_if_index: Uplink interface index.
- :param remote_ep_timeout: Remote endpoint interface index.
- :param vnid: VNID.
- :type node: dict
- :type sclass: int
- :type bd_id: int
- :type rd_id: int
- :type vnid: int
- :type uplink_sw_if_index: int
- :type remote_ep_timeout: int
- """
- cmd = 'gbp_endpoint_group_add'
- err_msg = 'Failed to add GBP endpoint group on {node}!'\
- .format(node=node['host'])
-
- args_in = dict(
- epg = dict (
- uplink_sw_if_index = uplink_sw_if_index,
- bd_id = bd_id,
- rd_id = rd_id,
- vnid = vnid,
- sclass = sclass,
- retention = dict (
- remote_ep_timeout = remote_ep_timeout
- )
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_endpoint_add(node, sw_if_index, ip_addr, mac_addr, sclass):
- """Add GBP endpoint.
-
- :param node: Node to add GBP endpoint on.
- :param sw_if_index: SW index of interface.
- :param ip_addr: GBP route domain ID.
- :param mac_addr: MAC address.
- :param sclass: Source CLASS.
- :type node: dict
- :type sw_if_index: int
- :type ip_addr: str
- :type mac_addr: str
- :type sclass: int
- """
- cmd = 'gbp_endpoint_add'
- err_msg = 'Failed to add GBP endpoint on {node}!'\
- .format(node=node['host'])
-
- ips = list()
- ips.append(IPUtil.create_ip_address_object(
- ip_address(unicode(ip_addr))))
- tun_src = IPUtil.create_ip_address_object(
- ip_address(unicode('0.0.0.0')))
- tun_dst = IPUtil.create_ip_address_object(
- ip_address(unicode('0.0.0.0')))
-
- args_in = dict(
- endpoint = dict(
- sw_if_index = sw_if_index,
- ips = ips,
- n_ips = len(ips),
- mac = L2Util.mac_to_bin(mac_addr),
- sclass = sclass,
- flags = getattr(GBPEndpointFlags,
- 'GBP_API_ENDPOINT_FLAG_EXTERNAL').value,
- tun = dict(
- src = tun_src,
- dst = tun_dst
- )
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_ext_itf_add_del(node, sw_if_index, bd_id=1, rd_id=1):
- """Add external interface to GBP.
-
- :param node: Node to add external GBP interface on.
- :param sw_if_index: SW index of interface.
- :param bd_id: GBP bridge domain ID.
- :param rd_id: GBP route domain ID.
- :type node: dict
- :type sw_if_index: int
- :type bd_id: int
- :type rd_id: int
- """
- cmd = 'gbp_ext_itf_add_del'
- err_msg = 'Failed to add external GBP interface on {node}!'\
- .format(node=node['host'])
-
- args_in = dict(
- is_add = 1,
- ext_itf = dict(
- sw_if_index = sw_if_index,
- bd_id = bd_id,
- rd_id = rd_id,
- flags = getattr(GBPExtItfFlags,
- 'GBP_API_EXT_ITF_F_NONE').value
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_subnet_add_del(
- node, address, subnet_length, sclass, rd_id=1,
- sw_if_index=0xffffffff):
- """Add external interface to GBP.
-
- :param node: Node to add GBP subnet on.
- :param address: IPv4 adddress.
- :param subnet_length: IPv4 address subnet.
- :param sclass: Source CLASS.
- :param rd_id: GBP route domain ID.
- :param sw_if_index: Interface index.
- :type node: dict
- :type address: int
- :type subnet_length: int
- :type sclass: int
- :type rd_id: int
- :type sw_if_index: int
- """
- cmd = 'gbp_subnet_add_del'
- err_msg = 'Failed to add GBP subnet on {node}!'\
- .format(node=node['host'])
-
- args_in = dict(
- is_add = 1,
- subnet = dict(
- type = getattr(GBPSubnetType,
- 'GBP_API_SUBNET_L3_OUT').value,
- sw_if_index = sw_if_index,
- sclass = sclass,
- prefix = dict(
- address = IPUtil.create_ip_address_object(
- ip_address(unicode(address))),
- len = int(subnet_length)
- ),
- rd_id = rd_id
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)
-
- @staticmethod
- def gbp_contract_add_del(node, sclass, dclass, acl_index=0):
- """Add GBP contract.
-
- :param node: Node to add GBP contract on.
- :param sclass: Source CLASS.
- :param dclass: Destination CLASS.
- :param acl_index: Index of ACL rule.
- :type node: dict
- :type sclass: int
- :type dclass: int
- :type acl_index: int
- """
- cmd = 'gbp_contract_add_del'
- err_msg = 'Failed to add GBP contract on {node}!'\
- .format(node=node['host'])
-
- rule_permit = dict(
- action = getattr(GBPRuleAction,
- 'GBP_API_RULE_PERMIT').value,
- nh_set = dict(
- hash_mode = list(),
- n_nhs = 8,
- nhs = [dict()]*8,
- )
- )
- rules = [rule_permit, rule_permit]
-
- args_in = dict(
- is_add = 1,
- contract = dict(
- acl_index = acl_index,
- sclass = sclass,
- dclass = dclass,
- n_rules = len(rules),
- rules = rules,
- n_ether_types = 16,
- allowed_ethertypes = [0x800, 0x86dd] + [0]*14
- )
- )
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args_in).get_reply(err_msg)