-import os
import fnmatch
+import os
import time
-from hook import Hook
from collections import deque
+from six import moves
+from vpp_mac import mactobinary
+from hook import Hook
+from vpp_l2 import L2_PORT_TYPE
+
# Sphinx creates auto-generated documentation by importing the python source
# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
# the vpp_papi_provider.py file to be importable without having to build
# the whole vpp api if the user only wishes to generate the test documentation.
-do_import = True
-try:
- no_vpp_papi = os.getenv("NO_VPP_PAPI")
- if no_vpp_papi == "1":
- do_import = False
-except:
- pass
-if do_import:
+try:
from vpp_papi import VPP
+except ImportError:
+ if not os.getenv("NO_VPP_PAPI") == 1:
+ raise
+ pass
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
IP = 3
-class L2_PORT_TYPE:
- NORMAL = 0
- BVI = 1
- UU_FWD = 2
-
-
-class BRIDGE_FLAGS:
- NONE = 0
- LEARN = 1
- FWD = 2
- FLOOD = 4
- UU_FLOOD = 8
- ARP_TERM = 16
+class SYSLOG_SEVERITY:
+ EMERG = 0
+ ALERT = 1
+ CRIT = 2
+ ERR = 3
+ WARN = 4
+ NOTICE = 5
+ INFO = 6
+ DBG = 7
class UnexpectedApiReturnValueError(Exception):
_zero, _negative = range(2)
- def __init__(self, name, shm_prefix, test_class):
+ def __init__(self, name, shm_prefix, test_class, read_timeout):
self.hook = Hook("vpp-papi-provider")
self.name = name
self.shm_prefix = shm_prefix
self._expect_stack = []
jsonfiles = []
- install_dir = os.getenv('VPP_TEST_INSTALL_PATH')
+ install_dir = os.getenv('VPP_INSTALL_PATH')
for root, dirnames, filenames in os.walk(install_dir):
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
- self.vpp = VPP(jsonfiles, logger=test_class.logger, read_timeout=5)
+ self.vpp = VPP(jsonfiles, logger=test_class.logger,
+ read_timeout=read_timeout)
self._events = deque()
def __enter__(self):
return self
- def expect_negative_api_retval(self):
- """ Expect API failure """
+ def assert_negative_api_retval(self):
+ """ Expect API failure - used with with, e.g.:
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.<api call expected to fail>
+ """
self._expect_stack.append(self._expect_api_retval)
self._expect_api_retval = self._negative
return self
- def expect_zero_api_retval(self):
- """ Expect API success """
+ def assert_zero_api_retval(self):
+ """ Expect API success - used with with, e.g.:
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.<api call expected to succeed>
+
+ note: this is useful only inside another with block
+ as success is the default expected value
+ """
self._expect_stack.append(self._expect_api_retval)
self._expect_api_retval = self._zero
return self
if hasattr(reply, 'retval') and reply.retval >= 0:
msg = "API call passed unexpectedly: expected negative "\
"return value instead of %d in %s" % \
- (reply.retval, repr(reply))
+ (reply.retval, moves.reprlib.repr(reply))
self.test_class.logger.info(msg)
raise UnexpectedApiReturnValueError(msg)
elif self._expect_api_retval == self._zero:
if hasattr(reply, 'retval') and reply.retval != expected_retval:
msg = "API call failed, expected %d return value instead "\
"of %d in %s" % (expected_retval, reply.retval,
- repr(reply))
+ moves.reprlib.repr(reply))
self.test_class.logger.info(msg)
raise UnexpectedApiReturnValueError(msg)
else:
"""
self.hook.before_cli(cli)
cli += '\n'
- r = self.papi.cli_inband(length=len(cli), cmd=cli)
+ r = self.papi.cli_inband(length=len(cli), cmd=str(cli).encode('utf8'))
self.hook.after_cli(cli)
if hasattr(r, 'reply'):
return r.reply.decode().rstrip('\x00')
return cli + "\n" + str(self.cli(cli))
def _convert_mac(self, mac):
- return mac.replace(':', '').decode('hex')
+ return mactobinary(mac)
def show_version(self):
""" """
{'sw_if_index': sw_if_index, 'is_ipv6': is_ipv6,
'vrf_id': table_id})
+ def sw_interface_get_table(self, sw_if_index, is_ipv6):
+ """ Get the IPvX Table-id for the Interface
+
+ :param sw_if_index:
+ :param is_ipv6:
+ :return table_id
+
+ """
+ return self.api(self.papi.sw_interface_get_table,
+ {'sw_if_index': sw_if_index, 'is_ipv6': is_ipv6})
+
def sw_interface_add_del_address(self, sw_if_index, addr, addr_len,
is_ipv6=0, is_add=1, del_all=0):
"""
'address_length': addr_len,
'address': addr})
+ def ip_address_dump(self, sw_if_index, is_ipv6=0):
+ return self.api(self.papi.ip_address_dump,
+ {'sw_if_index': sw_if_index,
+ 'is_ipv6': is_ipv6})
+
def sw_interface_set_unnumbered(self, sw_if_index, ip_sw_if_index,
is_add=1):
""" Set the Interface to be unnumbered
return self.api(self.papi.bd_ip_mac_add_del,
{'bd_id': bd_id,
'is_add': is_add,
- 'is_ipv6': is_ipv6,
- 'ip_address': ip,
- 'mac_address': mac})
+ 'ip': ip,
+ 'mac': mac})
+
+ def bd_ip_mac_dump(self, bd_id):
+ return self.api(self.papi.bd_ip_mac_dump,
+ {'bd_id': bd_id})
def want_ip4_arp_events(self, enable_disable=1, address=0):
return self.api(self.papi.want_ip4_arp_events,
'enable': enable,
'install_default_routes': install_default_routes})
+ def want_interface_events(self, enable_disable=1):
+ return self.api(self.papi.want_interface_events,
+ {'enable_disable': enable_disable,
+ 'pid': os.getpid(), })
+
def want_macs_learn_events(self, enable_disable=1, scan_delay=0,
max_macs_in_event=0, learn_limit=0):
return self.api(self.papi.want_l2_macs_events,
"""
return self.api(self.papi.l2fib_flush_all, {})
+ def l2_fib_table_dump(self, bd_id):
+ """ Dump the L2 FIB """
+ return self.api(self.papi.l2_fib_table_dump,
+ {'bd_id': bd_id})
+
def sw_interface_set_l2_bridge(self, sw_if_index, bd_id,
shg=0, port_type=L2_PORT_TYPE.NORMAL,
enable=1):
:param is_static: (Default value = 0)
:param is_no_adj_fib: (Default value = 0)
"""
-
return self.api(
self.papi.ip_neighbor_add_del,
{'sw_if_index': sw_if_index,
def bfd_udp_del_echo_source(self):
return self.api(self.papi.bfd_udp_del_echo_source, {})
+ def bfd_udp_get_echo_source(self):
+ return self.api(self.papi.bfd_udp_get_echo_source, {})
+
def classify_add_del_table(
self,
is_add,
def map_add_domain(self,
ip6_prefix,
- ip6_prefix_len,
ip6_src,
- ip6_src_prefix_len,
ip4_prefix,
- ip4_prefix_len,
ea_bits_len=0,
psid_offset=0,
psid_length=0,
self.papi.map_add_domain,
{
'ip6_prefix': ip6_prefix,
- 'ip6_prefix_len': ip6_prefix_len,
'ip4_prefix': ip4_prefix,
- 'ip4_prefix_len': ip4_prefix_len,
'ip6_src': ip6_src,
- 'ip6_src_prefix_len': ip6_src_prefix_len,
'ea_bits_len': ea_bits_len,
'psid_offset': psid_offset,
'psid_length': psid_length,
is_add=1,
is_ipv6=0,
encap_table_id=0,
- decap_next_index=0xFFFFFFFF,
vni=0,
instance=0xFFFFFFFF):
"""
'dst': dst,
'mcast_sw_if_index': mcast_sw_if_index,
'encap_table_id': encap_table_id,
- 'decap_next_index': decap_next_index,
'vni': vni,
'instance': instance}})
def ip_punt_redirect(self,
rx_sw_if_index,
tx_sw_if_index,
- nh,
- is_ip6=0,
+ address,
is_add=1):
return self.api(self.papi.ip_punt_redirect,
- {'rx_sw_if_index': rx_sw_if_index,
- 'tx_sw_if_index': tx_sw_if_index,
- 'nh': nh,
- 'is_add': is_add,
- 'is_ip6': is_ip6})
+ {'punt': {'rx_sw_if_index': rx_sw_if_index,
+ 'tx_sw_if_index': tx_sw_if_index,
+ 'nh': address},
+ 'is_add': is_add})
+
+ def ip_punt_redirect_dump(self, sw_if_index, is_ipv6=0):
+ return self.api(self.papi.ip_punt_redirect_dump,
+ {'sw_if_index': sw_if_index,
+ 'is_ipv6': is_ipv6})
def bier_table_add_del(self,
bti,
tunnel_src_address='',
tunnel_dst_address='',
is_tunnel=1,
+ is_tunnel_ipv6=0,
is_add=1,
- udp_encap=0):
+ udp_encap=0,
+ use_anti_replay=0,
+ use_extended_sequence_number=0):
""" IPSEC SA add/del
:param sad_id: security association ID
:param spi: security param index of the SA in decimal
'crypto_key': crypto_key,
'is_add': is_add,
'is_tunnel': is_tunnel,
- 'udp_encap': udp_encap})
+ 'is_tunnel_ipv6': is_tunnel_ipv6,
+ 'udp_encap': udp_encap,
+ 'use_extended_sequence_number': use_extended_sequence_number,
+ 'use_anti_replay': use_anti_replay})
def ipsec_spd_add_del_entry(self,
spd_id,
priority=100,
is_outbound=1,
is_add=1,
+ is_ipv6=0,
is_ip_any=0):
""" IPSEC policy SPD add/del -
Wrapper to configure ipsec SPD policy entries in VPP
'policy': policy,
'priority': priority,
'is_outbound': is_outbound,
+ 'is_ipv6': is_ipv6,
'is_ip_any': is_ip_any})
def ipsec_tunnel_if_add_del(self, local_ip, remote_ip, local_spi,
'show_instance': show_instance
})
+ def ipsec_select_backend(self, protocol, index):
+ return self.api(self.papi.ipsec_select_backend,
+ {'protocol': protocol, 'index': index})
+
+ def ipsec_backend_dump(self):
+ return self.api(self.papi.ipsec_backend_dump, {})
+
def app_namespace_add(self,
namespace_id,
ip4_fib_id=0,
'namespace_id': namespace_id,
'namespace_id_len': len(namespace_id)})
- def punt_socket_register(self, l4_port, pathname, header_version=1,
- is_ip4=1, l4_protocol=0x11):
- """ Punt to socket """
+ def punt_socket_register(self, port, pathname, protocol=0x11,
+ header_version=1, is_ip4=1):
+ """ Register punt socket """
return self.api(self.papi.punt_socket_register,
- {'is_ip4': is_ip4,
- 'l4_protocol': l4_protocol,
- 'l4_port': l4_port,
- 'pathname': pathname,
- 'header_version': header_version})
+ {'header_version': header_version,
+ 'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port},
+ 'pathname': pathname})
+
+ def punt_socket_deregister(self, port, protocol=0x11, is_ip4=1):
+ """ Unregister punt socket """
+ return self.api(self.papi.punt_socket_deregister,
+ {'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port}})
+
+ def punt_socket_dump(self, is_ip6=1):
+ """ Dump punt socket"""
+ return self.api(self.papi.punt_socket_dump,
+ {'is_ipv6': is_ip6})
def ip_reassembly_set(self, timeout_ms, max_reassemblies,
expire_walk_interval_ms, is_ip6=0):
'enable_ip6': 1 if enable_ip6 else 0,
})
- def gbp_endpoint_add(self, sw_if_index, ips, mac, epg):
+ def gbp_endpoint_add(self, sw_if_index, ips, mac, epg, flags,
+ tun_src, tun_dst):
""" GBP endpoint Add """
return self.api(self.papi.gbp_endpoint_add,
{'endpoint': {
'sw_if_index': sw_if_index,
+ 'flags': 0,
'ips': ips,
'n_ips': len(ips),
'mac': mac,
- 'epg_id': epg}})
+ 'epg_id': epg,
+ 'flags': flags,
+ 'tun': {
+ 'src': tun_src,
+ 'dst': tun_dst,
+ }}})
def gbp_endpoint_del(self, handle):
""" GBP endpoint Del """
""" GBP endpoint Dump """
return self.api(self.papi.gbp_endpoint_dump, {})
- def gbp_endpoint_group_add_del(self, is_add, epg, bd,
- ip4_rd,
- ip6_rd,
- uplink_sw_if_index):
- """ GBP endpoint group Add/Del """
- return self.api(self.papi.gbp_endpoint_group_add_del,
- {'is_add': is_add,
- 'epg': {
+ def gbp_endpoint_group_add(self, epg, bd,
+ rd, uplink_sw_if_index):
+ """ GBP endpoint group Add """
+ return self.api(self.papi.gbp_endpoint_group_add,
+ {'epg':
+ {
'uplink_sw_if_index': uplink_sw_if_index,
'bd_id': bd,
- 'ip4_table_id': ip4_rd,
- 'ip6_table_id': ip6_rd,
- 'epg_id': epg}})
+ 'rd_id': rd,
+ 'epg_id': epg
+ }})
+
+ def gbp_endpoint_group_del(self, epg):
+ """ GBP endpoint group Del """
+ return self.api(self.papi.gbp_endpoint_group_del,
+ {'epg_id': epg})
def gbp_endpoint_group_dump(self):
""" GBP endpoint group Dump """
return self.api(self.papi.gbp_endpoint_group_dump, {})
+ def gbp_bridge_domain_add(self, bd_id, flags,
+ bvi_sw_if_index,
+ uu_fwd_sw_if_index):
+ """ GBP bridge-domain Add """
+ return self.api(self.papi.gbp_bridge_domain_add,
+ {'bd':
+ {
+ 'flags': flags,
+ 'bvi_sw_if_index': bvi_sw_if_index,
+ 'uu_fwd_sw_if_index': uu_fwd_sw_if_index,
+ 'bd_id': bd_id
+ }})
+
+ def gbp_bridge_domain_del(self, bd_id):
+ """ GBP bridge-domain Del """
+ return self.api(self.papi.gbp_bridge_domain_del,
+ {'bd_id': bd_id})
+
+ def gbp_bridge_domain_dump(self):
+ """ GBP Bridge Domain Dump """
+ return self.api(self.papi.gbp_bridge_domain_dump, {})
+
+ def gbp_route_domain_add(self, rd_id,
+ ip4_table_id,
+ ip6_table_id,
+ ip4_uu_sw_if_index,
+ ip6_uu_sw_if_index):
+ """ GBP route-domain Add """
+ return self.api(self.papi.gbp_route_domain_add,
+ {'rd':
+ {
+ 'ip4_table_id': ip4_table_id,
+ 'ip6_table_id': ip6_table_id,
+ 'ip4_uu_sw_if_index': ip4_uu_sw_if_index,
+ 'ip6_uu_sw_if_index': ip6_uu_sw_if_index,
+ 'rd_id': rd_id
+ }})
+
+ def gbp_route_domain_del(self, rd_id):
+ """ GBP route-domain Del """
+ return self.api(self.papi.gbp_route_domain_del,
+ {'rd_id': rd_id})
+
+ def gbp_route_domain_dump(self):
+ """ GBP Route Domain Dump """
+ return self.api(self.papi.gbp_route_domain_dump, {})
+
def gbp_recirc_add_del(self, is_add, sw_if_index, epg, is_ext):
""" GBP recirc Add/Del """
return self.api(self.papi.gbp_recirc_add_del,
""" GBP recirc Dump """
return self.api(self.papi.gbp_recirc_dump, {})
- def gbp_subnet_add_del(self, is_add, table_id,
- is_internal,
- prefix,
+ def gbp_ext_itf_add_del(self, is_add, sw_if_index, bd_id, rd_id):
+ """ GBP recirc Add/Del """
+ return self.api(self.papi.gbp_ext_itf_add_del,
+ {'is_add': is_add,
+ 'ext_itf': {
+ 'sw_if_index': sw_if_index,
+ 'bd_id': bd_id,
+ 'rd_id': rd_id}})
+
+ def gbp_ext_itf_dump(self):
+ """ GBP recirc Dump """
+ return self.api(self.papi.gbp_ext_itf_dump, {})
+
+ def gbp_subnet_add_del(self, is_add, rd_id,
+ prefix, type,
sw_if_index=0xffffffff,
- epg_id=0xffff,
- is_ip6=False):
+ epg_id=0xffff):
""" GBP Subnet Add/Del """
return self.api(self.papi.gbp_subnet_add_del,
{'is_add': is_add,
'subnet': {
- 'is_internal': is_internal,
- 'is_ip6': is_ip6,
+ 'type': type,
'sw_if_index': sw_if_index,
'epg_id': epg_id,
'prefix': prefix,
- 'table_id': table_id}})
+ 'rd_id': rd_id}})
def gbp_subnet_dump(self):
""" GBP Subnet Dump """
return self.api(self.papi.gbp_subnet_dump, {})
- def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index):
+ def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index, rules):
""" GBP contract Add/Del """
return self.api(self.papi.gbp_contract_add_del,
{'is_add': is_add,
'contract': {
'acl_index': acl_index,
'src_epg': src_epg,
- 'dst_epg': dst_epg}})
+ 'dst_epg': dst_epg,
+ 'n_rules': len(rules),
+ 'rules': rules}})
def gbp_contract_dump(self):
""" GBP contract Dump """
return self.api(self.papi.gbp_contract_dump, {})
+ def gbp_endpoint_learn_set_inactive_threshold(self, threshold):
+ """ GBP set inactive threshold """
+ return self.api(self.papi.gbp_endpoint_learn_set_inactive_threshold,
+ {'threshold': threshold})
+
+ def gbp_vxlan_tunnel_add(self, vni, bd_rd_id, mode):
+ """ GBP VXLAN tunnel add """
+ return self.api(self.papi.gbp_vxlan_tunnel_add,
+ {
+ 'tunnel': {
+ 'vni': vni,
+ 'mode': mode,
+ 'bd_rd_id': bd_rd_id
+ }
+ })
+
+ def gbp_vxlan_tunnel_del(self, vni):
+ """ GBP VXLAN tunnel del """
+ return self.api(self.papi.gbp_vxlan_tunnel_del,
+ {
+ 'vni': vni,
+ })
+
+ def gbp_vxlan_tunnel_dump(self):
+ """ GBP VXLAN tunnel add/del """
+ return self.api(self.papi.gbp_vxlan_tunnel_dump, {})
+
def ipip_6rd_add_tunnel(self, ip6_table_id, ip6_prefix, ip6_prefix_len,
ip4_table_id, ip4_prefix, ip4_prefix_len, ip4_src,
security_check):
'mode': host,
'sw_if_index': sw_if_index})
+ def igmp_proxy_device_add_del(self, vrf_id, sw_if_index, add):
+ """ Add/del IGMP proxy device """
+ return self.api(self.papi.igmp_proxy_device_add_del,
+ {'vrf_id': vrf_id, 'sw_if_index': sw_if_index,
+ 'add': add})
+
+ def igmp_proxy_device_add_del_interface(self, vrf_id, sw_if_index, add):
+ """ Add/del interface to/from IGMP proxy device """
+ return self.api(self.papi.igmp_proxy_device_add_del_interface,
+ {'vrf_id': vrf_id, 'sw_if_index': sw_if_index,
+ 'add': add})
+
def igmp_listen(self, filter, sw_if_index, saddrs, gaddr):
""" Listen for new (S,G) on specified interface
'sw_if_index': sw_if_index,
'n_srcs': len(saddrs),
'saddrs': saddrs,
- 'gaddr':
- {
- 'address': gaddr
- }
+ 'gaddr': gaddr
}
})
def pipe_dump(self):
return self.api(self.papi.pipe_dump, {})
+
+ def memif_create(
+ self,
+ role,
+ mode,
+ rx_queues=None,
+ tx_queues=None,
+ _id=None,
+ socket_id=None,
+ secret=None,
+ ring_size=None,
+ buffer_size=None,
+ hw_addr=None):
+ return self.api(self.papi.memif_create,
+ {'role': role,
+ 'mode': mode,
+ 'rx_queues': rx_queues,
+ 'tx_queues': tx_queues,
+ 'id': _id,
+ 'socket_id': socket_id,
+ 'secret': secret,
+ 'ring_size': ring_size,
+ 'buffer_size': buffer_size,
+ 'hw_addr': hw_addr})
+
+ def memif_delete(self, sw_if_index):
+ return self.api(self.papi.memif_delete, {'sw_if_index': sw_if_index})
+
+ def memif_dump(self):
+ return self.api(self.papi.memif_dump, {})
+
+ def memif_socket_filename_add_del(
+ self, is_add, socket_id, socket_filename):
+ return self.api(
+ self.papi.memif_socket_filename_add_del,
+ {'is_add': is_add,
+ 'socket_id': socket_id,
+ 'socket_filename': socket_filename})
+
+ def memif_socket_filename_dump(self):
+ return self.api(self.papi.memif_socket_filename_dump, {})
+
+ def svs_table_add_del(self, af, table_id, is_add=1):
+ return self.api(self.papi.svs_table_add_del,
+ {
+ 'table_id': table_id,
+ 'is_add': is_add,
+ 'af': af,
+ })
+
+ def svs_route_add_del(self, table_id, prefix, src_table_id, is_add=1):
+ return self.api(self.papi.svs_route_add_del,
+ {
+ 'table_id': table_id,
+ 'source_table_id': src_table_id,
+ 'prefix': prefix,
+ 'is_add': is_add,
+ })
+
+ def svs_enable_disable(self, af, table_id, sw_if_index, is_enable=1):
+ return self.api(self.papi.svs_enable_disable,
+ {
+ 'af': af,
+ 'table_id': table_id,
+ 'sw_if_index': sw_if_index,
+ 'is_enable': is_enable,
+ })
+
+ def svs_dump(self):
+ return self.api(self.papi.svs_dump, {})
+
+ def syslog_set_sender(
+ self,
+ collector,
+ src,
+ collector_port=514,
+ vrf_id=0,
+ max_msg_size=480):
+ """Set syslog sender configuration
+
+ :param collector: colector IP address
+ :param src: source IP address
+ :param collector_port: collector UDP port (Default value = 514)
+ :param vrf_id: VRF id (Default value = 0)
+ :param max_msg_size: maximum message length (Default value = 480)
+ """
+ return self.api(self.papi.syslog_set_sender,
+ {'collector_address': collector,
+ 'src_address': src,
+ 'collector_port': collector_port,
+ 'vrf_id': vrf_id,
+ 'max_msg_size': max_msg_size})
+
+ def syslog_get_sender(self):
+ """Return syslog sender configuration"""
+ return self.api(self.papi.syslog_get_sender, {})
+
+ def syslog_set_filter(self, severity):
+ """Set syslog filter parameters
+
+ :param severity: severity filter (specified severity and greater match)
+ """
+ return self.api(self.papi.syslog_set_filter, {'severity': severity})
+
+ def syslog_get_filter(self):
+ """Return syslog filter parameters"""
+ return self.api(self.papi.syslog_get_filter, {})