from collections import deque
from six import moves
-
+from vpp_papi import VPP, mac_pton
from hook import Hook
-
-# Sphinx creates auto-generated documentation by importing the python source
-# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
-# the vpp_papi_provider.py file to be importable without having to build
-# the whole vpp api if the user only wishes to generate the test documentation.
-do_import = True
-try:
- no_vpp_papi = os.getenv("NO_VPP_PAPI")
- if no_vpp_papi == "1":
- do_import = False
-except:
- pass
-
-if do_import:
- from vpp_papi import VPP
- from vpp_l2 import L2_PORT_TYPE
+from vpp_l2 import L2_PORT_TYPE
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
self.test_class = test_class
self._expect_api_retval = self._zero
self._expect_stack = []
- jsonfiles = []
- install_dir = os.getenv('VPP_TEST_INSTALL_PATH')
- for root, dirnames, filenames in os.walk(install_dir):
- for filename in fnmatch.filter(filenames, '*.api.json'):
- jsonfiles.append(os.path.join(root, filename))
+ install_dir = os.getenv('VPP_INSTALL_PATH')
+
+ # Vapi requires 'VPP_API_DIR', not set when run from Makefile.
+ if 'VPP_API_DIR' not in os.environ:
+ os.environ['VPP_API_DIR'] = os.getenv('VPP_INSTALL_PATH')
- self.vpp = VPP(jsonfiles, logger=test_class.logger,
+ self.vpp = VPP(logger=test_class.logger,
read_timeout=read_timeout)
self._events = deque()
"""
self.hook.before_cli(cli)
cli += '\n'
- r = self.papi.cli_inband(length=len(cli), cmd=cli)
+ r = self.papi.cli_inband(cmd=cli)
self.hook.after_cli(cli)
if hasattr(r, 'reply'):
- return r.reply.decode().rstrip('\x00')
+ return r.reply
def ppcli(self, cli):
""" Helper method to print CLI command in case of info logging level.
"""
return cli + "\n" + str(self.cli(cli))
- def _convert_mac(self, mac):
- return mac.replace(':', '').decode('hex')
-
def show_version(self):
""" """
return self.api(self.papi.show_version, {})
'reverse': reverse,
'is_ipv6': is_ip6})
- def ip6_nd_proxy(self, address, sw_if_index, is_del=0):
+ def ip6_nd_proxy(self, ip, sw_if_index, is_del=0):
return self.api(self.papi.ip6nd_proxy_add_del,
- {'address': address,
+ {'ip': ip,
'sw_if_index': sw_if_index,
'is_del': is_del})
pref_lifetime=0xffffffff):
return self.api(self.papi.sw_interface_ip6nd_ra_prefix,
{'sw_if_index': sw_if_index,
- 'address': address,
- 'address_length': address_length,
+ 'prefix': {
+ 'address': address,
+ 'address_length': address_length,
+ },
'use_default': use_default,
'no_advertise': no_advertise,
'off_link': off_link,
return self.api(self.papi.bd_ip_mac_dump,
{'bd_id': bd_id})
- def want_ip4_arp_events(self, enable_disable=1, address=0):
+ def want_ip4_arp_events(self, enable_disable=1, ip="0.0.0.0"):
return self.api(self.papi.want_ip4_arp_events,
{'enable_disable': enable_disable,
- 'address': address,
+ 'ip': ip,
'pid': os.getpid(), })
- def want_ip6_nd_events(self, enable_disable=1, address=0):
+ def want_ip6_nd_events(self, enable_disable=1, ip="::"):
return self.api(self.papi.want_ip6_nd_events,
{'enable_disable': enable_disable,
- 'address': address,
+ 'ip': ip,
'pid': os.getpid(), })
def want_ip6_ra_events(self, enable_disable=1):
interface. (Default value = 0)
"""
return self.api(self.papi.l2fib_add_del,
- {'mac': self._convert_mac(mac),
+ {'mac': mac,
'bd_id': bd_id,
'sw_if_index': sw_if_index,
'is_add': is_add,
def ip_neighbor_add_del(self,
sw_if_index,
mac_address,
- dst_address,
+ ip_address,
is_add=1,
- is_ipv6=0,
- is_static=0,
- is_no_adj_fib=0,
- ):
+ flags=0):
""" Add neighbor MAC to IPv4 or IPv6 address.
:param sw_if_index:
:param mac_address:
:param dst_address:
:param is_add: (Default value = 1)
- :param is_ipv6: (Default value = 0)
- :param is_static: (Default value = 0)
- :param is_no_adj_fib: (Default value = 0)
+ :param flags: (Default value = 0/NONE)
"""
-
return self.api(
self.papi.ip_neighbor_add_del,
- {'sw_if_index': sw_if_index,
- 'is_add': is_add,
- 'is_ipv6': is_ipv6,
- 'is_static': is_static,
- 'is_no_adj_fib': is_no_adj_fib,
- 'mac_address': mac_address,
- 'dst_address': dst_address
- }
+ {
+ 'is_add': is_add,
+ 'neighbor': {
+ 'sw_if_index': sw_if_index,
+ 'flags': flags,
+ 'mac_address': mac_address,
+ 'ip_address': ip_address
+ }
+ }
)
def ip_neighbor_dump(self,
)
def proxy_arp_add_del(self,
- low_address,
- hi_address,
- vrf_id=0,
+ low,
+ hi,
+ table_id=0,
is_add=1):
""" Config Proxy Arp Range.
self.papi.proxy_arp_add_del,
{'proxy':
{
- 'vrf_id': vrf_id,
- 'low_address': low_address,
- 'hi_address': hi_address,
+ 'table_id': table_id,
+ 'low': low,
+ 'hi': hi,
},
'is_add': is_add})
'local_num': local_num,
'locals': locals})
+ def nat44_lb_static_mapping_add_del_local(
+ self,
+ external_addr,
+ external_port,
+ local_addr,
+ local_port,
+ protocol,
+ probability,
+ vrf_id=0,
+ is_add=1):
+ """Add/delete NAT44 load-balancing static mapping rule backend
+
+ :param external_addr: external IPv4 address of the servic
+ :param external_port: external L4 port number of the service
+ :param local_addr: IPv4 address of the internal node
+ :param local_port: L4 port number of the internal node
+ :param protocol: IP protocol number
+ :param probability: probability of the internal node
+ :param vrf_id: VRF id of the internal node
+ :param is_add: 1 if add, 0 if delete
+ """
+ return self.api(
+ self.papi.nat44_lb_static_mapping_add_del_local,
+ {'is_add': is_add,
+ 'external_addr': external_addr,
+ 'external_port': external_port,
+ 'local': {
+ 'addr': local_addr,
+ 'port': local_port,
+ 'probability': probability,
+ 'vrf_id': vrf_id},
+ 'protocol': protocol})
+
def nat44_lb_static_mapping_dump(self):
"""Dump NAT44 load balancing static mappings
ea_bits_len=0,
psid_offset=0,
psid_length=0,
- is_translation=0,
- is_rfc6052=0,
mtu=1280):
+
return self.api(
self.papi.map_add_domain,
{
'ea_bits_len': ea_bits_len,
'psid_offset': psid_offset,
'psid_length': psid_length,
- 'is_translation': is_translation,
- 'is_rfc6052': is_rfc6052,
'mtu': mtu
})
+ def map_if_enable_disable(self, is_enable, sw_if_index, is_translation):
+ return self.api(
+ self.papi.map_if_enable_disable,
+ {
+ 'is_enable': is_enable,
+ 'sw_if_index': sw_if_index,
+ 'is_translation': is_translation,
+ })
+
+ def map_param_set_tcp(self, tcp_mss):
+ return self.api(
+ self.papi.map_param_set_tcp,
+ {
+ 'tcp_mss': tcp_mss,
+ })
+
def gtpu_add_del_tunnel(
self,
src_addr,
def vxlan_gbp_tunnel_dump(self, sw_if_index=0xffffffff):
return self.api(self.papi.vxlan_gbp_tunnel_dump,
- {'sw_if_index': sw_if_index})
+ {'sw_if_index': sw_if_index,
+ '_no_type_conversion': True})
def pppoe_add_del_session(
self,
def ip_punt_redirect(self,
rx_sw_if_index,
tx_sw_if_index,
- nh,
- is_ip6=0,
+ address,
is_add=1):
return self.api(self.papi.ip_punt_redirect,
- {'rx_sw_if_index': rx_sw_if_index,
- 'tx_sw_if_index': tx_sw_if_index,
- 'nh': nh,
- 'is_add': is_add,
- 'is_ip6': is_ip6})
+ {'punt': {'rx_sw_if_index': rx_sw_if_index,
+ 'tx_sw_if_index': tx_sw_if_index,
+ 'nh': address},
+ 'is_add': is_add})
+
+ def ip_punt_redirect_dump(self, sw_if_index, is_ipv6=0):
+ return self.api(self.papi.ip_punt_redirect_dump,
+ {'sw_if_index': sw_if_index,
+ 'is_ipv6': is_ipv6})
def bier_table_add_del(self,
bti,
self.papi.ipsec_spd_add_del, {
'spd_id': spd_id, 'is_add': is_add})
+ def ipsec_spds_dump(self):
+ return self.api(self.papi.ipsec_spds_dump, {})
+
def ipsec_interface_add_del_spd(self, spd_id, sw_if_index, is_add=1):
""" IPSEC interface SPD add/del - \
Wrapper to associate/disassociate SPD to interface in VPP
self.papi.ipsec_interface_add_del_spd,
{'spd_id': spd_id, 'sw_if_index': sw_if_index, 'is_add': is_add})
- def ipsec_sad_add_del_entry(self,
+ def ipsec_spd_interface_dump(self, spd_index=None):
+ return self.api(self.papi.ipsec_spd_interface_dump,
+ {'spd_index': spd_index if spd_index else 0,
+ 'spd_index_valid': 1 if spd_index else 0})
+
+ def ipsec_sad_entry_add_del(self,
sad_id,
spi,
integrity_algorithm,
protocol,
tunnel_src_address='',
tunnel_dst_address='',
- is_tunnel=1,
- is_tunnel_ipv6=0,
- is_add=1,
- udp_encap=0):
+ flags=0,
+ is_add=1):
""" IPSEC SA add/del
:param sad_id: security association ID
:param spi: security param index of the SA in decimal
crypto and ipsec algorithms
"""
return self.api(
- self.papi.ipsec_sad_add_del_entry,
- {'sad_id': sad_id,
- 'spi': spi,
- 'tunnel_src_address': tunnel_src_address,
- 'tunnel_dst_address': tunnel_dst_address,
- 'protocol': protocol,
- 'integrity_algorithm': integrity_algorithm,
- 'integrity_key_length': len(integrity_key),
- 'integrity_key': integrity_key,
- 'crypto_algorithm': crypto_algorithm,
- 'crypto_key_length': len(crypto_key) if crypto_key is not None
- else 0,
- 'crypto_key': crypto_key,
- 'is_add': is_add,
- 'is_tunnel': is_tunnel,
- 'is_tunnel_ipv6': is_tunnel_ipv6,
- 'udp_encap': udp_encap})
+ self.papi.ipsec_sad_entry_add_del,
+ {
+ 'is_add': is_add,
+ 'entry':
+ {
+ 'sad_id': sad_id,
+ 'spi': spi,
+ 'tunnel_src': tunnel_src_address,
+ 'tunnel_dst': tunnel_dst_address,
+ 'protocol': protocol,
+ 'integrity_algorithm': integrity_algorithm,
+ 'integrity_key': {
+ 'length': len(integrity_key),
+ 'data': integrity_key,
+ },
+ 'crypto_algorithm': crypto_algorithm,
+ 'crypto_key': {
+ 'length': len(crypto_key),
+ 'data': crypto_key,
+ },
+ 'flags': flags,
+ }
+ })
- def ipsec_spd_add_del_entry(self,
+ def ipsec_sa_dump(self, sa_id=None):
+ return self.api(self.papi.ipsec_sa_dump,
+ {'sa_id': sa_id if sa_id else 0xffffffff})
+
+ def ipsec_spd_entry_add_del(self,
spd_id,
sa_id,
local_address_start,
:param is_add: (Default value = 1)
"""
return self.api(
- self.papi.ipsec_spd_add_del_entry,
- {'spd_id': spd_id,
- 'sa_id': sa_id,
- 'local_address_start': local_address_start,
- 'local_address_stop': local_address_stop,
- 'remote_address_start': remote_address_start,
- 'remote_address_stop': remote_address_stop,
- 'local_port_start': local_port_start,
- 'local_port_stop': local_port_stop,
- 'remote_port_start': remote_port_start,
- 'remote_port_stop': remote_port_stop,
- 'is_add': is_add,
- 'protocol': protocol,
- 'policy': policy,
- 'priority': priority,
- 'is_outbound': is_outbound,
- 'is_ipv6': is_ipv6,
- 'is_ip_any': is_ip_any})
+ self.papi.ipsec_spd_entry_add_del,
+ {
+ 'is_add': is_add,
+ 'entry':
+ {
+ 'spd_id': spd_id,
+ 'sa_id': sa_id,
+ 'local_address_start': local_address_start,
+ 'local_address_stop': local_address_stop,
+ 'remote_address_start': remote_address_start,
+ 'remote_address_stop': remote_address_stop,
+ 'local_port_start': local_port_start,
+ 'local_port_stop': local_port_stop,
+ 'remote_port_start': remote_port_start,
+ 'remote_port_stop': remote_port_stop,
+ 'protocol': protocol,
+ 'policy': policy,
+ 'priority': priority,
+ 'is_outbound': is_outbound,
+ 'is_ip_any': is_ip_any
+ }
+ })
+
+ def ipsec_spd_dump(self, spd_id, sa_id=0xffffffff):
+ return self.api(self.papi.ipsec_spd_dump,
+ {'spd_id': spd_id,
+ 'sa_id': sa_id})
def ipsec_tunnel_if_add_del(self, local_ip, remote_ip, local_spi,
remote_spi, crypto_alg, local_crypto_key,
'namespace_id': namespace_id,
'namespace_id_len': len(namespace_id)})
- def punt_socket_register(self, l4_port, pathname, header_version=1,
- is_ip4=1, l4_protocol=0x11):
- """ Punt to socket """
+ def punt_socket_register(self, port, pathname, protocol=0x11,
+ header_version=1, is_ip4=1):
+ """ Register punt socket """
return self.api(self.papi.punt_socket_register,
- {'is_ip4': is_ip4,
- 'l4_protocol': l4_protocol,
- 'l4_port': l4_port,
- 'pathname': pathname,
- 'header_version': header_version})
+ {'header_version': header_version,
+ 'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port},
+ 'pathname': pathname})
+
+ def punt_socket_deregister(self, port, protocol=0x11, is_ip4=1):
+ """ Unregister punt socket """
+ return self.api(self.papi.punt_socket_deregister,
+ {'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port}})
+
+ def punt_socket_dump(self, is_ip6=1):
+ """ Dump punt socket"""
+ return self.api(self.papi.punt_socket_dump,
+ {'is_ipv6': is_ip6})
def ip_reassembly_set(self, timeout_ms, max_reassemblies,
expire_walk_interval_ms, is_ip6=0):
return self.api(self.papi.gbp_endpoint_add,
{'endpoint': {
'sw_if_index': sw_if_index,
- 'flags': 0,
'ips': ips,
'n_ips': len(ips),
'mac': mac,
def gbp_endpoint_dump(self):
""" GBP endpoint Dump """
- return self.api(self.papi.gbp_endpoint_dump, {})
+ return self.api(self.papi.gbp_endpoint_dump,
+ {'_no_type_conversion': True})
- def gbp_endpoint_group_add(self, epg, bd,
+ def gbp_endpoint_group_add(self, epg, sclass, bd,
rd, uplink_sw_if_index):
""" GBP endpoint group Add """
return self.api(self.papi.gbp_endpoint_group_add,
'uplink_sw_if_index': uplink_sw_if_index,
'bd_id': bd,
'rd_id': rd,
- 'epg_id': epg
+ 'epg_id': epg,
+ 'sclass': sclass
}})
def gbp_endpoint_group_del(self, epg):
def gbp_bridge_domain_add(self, bd_id, flags,
bvi_sw_if_index,
- uu_fwd_sw_if_index):
+ uu_fwd_sw_if_index,
+ bm_flood_sw_if_index):
""" GBP bridge-domain Add """
return self.api(self.papi.gbp_bridge_domain_add,
{'bd':
'flags': flags,
'bvi_sw_if_index': bvi_sw_if_index,
'uu_fwd_sw_if_index': uu_fwd_sw_if_index,
+ 'bm_flood_sw_if_index': bm_flood_sw_if_index,
'bd_id': bd_id
}})
""" GBP recirc Dump """
return self.api(self.papi.gbp_recirc_dump, {})
+ def gbp_ext_itf_add_del(self, is_add, sw_if_index, bd_id, rd_id):
+ """ GBP recirc Add/Del """
+ return self.api(self.papi.gbp_ext_itf_add_del,
+ {'is_add': is_add,
+ 'ext_itf': {
+ 'sw_if_index': sw_if_index,
+ 'bd_id': bd_id,
+ 'rd_id': rd_id}})
+
+ def gbp_ext_itf_dump(self):
+ """ GBP recirc Dump """
+ return self.api(self.papi.gbp_ext_itf_dump, {})
+
def gbp_subnet_add_del(self, is_add, rd_id,
prefix, type,
sw_if_index=0xffffffff,
def gbp_subnet_dump(self):
""" GBP Subnet Dump """
- return self.api(self.papi.gbp_subnet_dump, {})
+ return self.api(self.papi.gbp_subnet_dump,
+ {'_no_type_conversion': True})
- def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index, rules):
+ def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index,
+ rules, allowed_ethertypes):
""" GBP contract Add/Del """
return self.api(self.papi.gbp_contract_add_del,
{'is_add': is_add,
'src_epg': src_epg,
'dst_epg': dst_epg,
'n_rules': len(rules),
- 'rules': rules}})
+ 'rules': rules,
+ 'n_ether_types': len(allowed_ethertypes),
+ 'allowed_ethertypes': allowed_ethertypes}})
def gbp_contract_dump(self):
""" GBP contract Dump """
'sw_if_index': sw_if_index,
'n_srcs': len(saddrs),
'saddrs': saddrs,
- 'gaddr':
- {
- 'address': gaddr
- }
+ 'gaddr': gaddr
}
})
mode,
lb,
use_custom_mac,
- mac_address=''):
+ mac_address='',
+ interface_id=0xFFFFFFFF):
"""
:param mode: mode
:param lb: load balance
:param use_custom_mac: use custom mac
:param mac_address: mac address
+ :param interface_id: custom interface ID
"""
return self.api(
self.papi.bond_create,
{'mode': mode,
'lb': lb,
'use_custom_mac': use_custom_mac,
- 'mac_address': mac_address
+ 'mac_address': mac_address,
+ 'id': interface_id
})
def bond_delete(
:param max_msg_size: maximum message length (Default value = 480)
"""
return self.api(self.papi.syslog_set_sender,
- {'collector_address': {
- 'address': collector},
- 'src_address': {
- 'address': src},
+ {'collector_address': collector,
+ 'src_address': src,
'collector_port': collector_port,
'vrf_id': vrf_id,
'max_msg_size': max_msg_size})