-import os
import fnmatch
+import os
import time
-from hook import Hook
from collections import deque
+from six import moves
+from vpp_papi import mac_pton
+from hook import Hook
+from vpp_l2 import L2_PORT_TYPE
+
# Sphinx creates auto-generated documentation by importing the python source
# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
# the vpp_papi_provider.py file to be importable without having to build
# the whole vpp api if the user only wishes to generate the test documentation.
-do_import = True
-try:
- no_vpp_papi = os.getenv("NO_VPP_PAPI")
- if no_vpp_papi == "1":
- do_import = False
-except:
- pass
-if do_import:
+try:
from vpp_papi import VPP
- from vpp_l2 import L2_PORT_TYPE
+except ImportError:
+ if not os.getenv("NO_VPP_PAPI") == 1:
+ raise
+ pass
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
self._expect_stack = []
jsonfiles = []
- install_dir = os.getenv('VPP_TEST_INSTALL_PATH')
+ install_dir = os.getenv('VPP_INSTALL_PATH')
for root, dirnames, filenames in os.walk(install_dir):
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
if hasattr(reply, 'retval') and reply.retval >= 0:
msg = "API call passed unexpectedly: expected negative "\
"return value instead of %d in %s" % \
- (reply.retval, repr(reply))
+ (reply.retval, moves.reprlib.repr(reply))
self.test_class.logger.info(msg)
raise UnexpectedApiReturnValueError(msg)
elif self._expect_api_retval == self._zero:
if hasattr(reply, 'retval') and reply.retval != expected_retval:
msg = "API call failed, expected %d return value instead "\
"of %d in %s" % (expected_retval, reply.retval,
- repr(reply))
+ moves.reprlib.repr(reply))
self.test_class.logger.info(msg)
raise UnexpectedApiReturnValueError(msg)
else:
"""
self.hook.before_cli(cli)
cli += '\n'
- r = self.papi.cli_inband(length=len(cli), cmd=cli)
+ r = self.papi.cli_inband(cmd=cli)
self.hook.after_cli(cli)
if hasattr(r, 'reply'):
- return r.reply.decode().rstrip('\x00')
+ return r.reply
def ppcli(self, cli):
""" Helper method to print CLI command in case of info logging level.
"""
return cli + "\n" + str(self.cli(cli))
- def _convert_mac(self, mac):
- return mac.replace(':', '').decode('hex')
-
def show_version(self):
""" """
return self.api(self.papi.show_version, {})
interface. (Default value = 0)
"""
return self.api(self.papi.l2fib_add_del,
- {'mac': self._convert_mac(mac),
+ {'mac': mac,
'bd_id': bd_id,
'sw_if_index': sw_if_index,
'is_add': is_add,
:param is_static: (Default value = 0)
:param is_no_adj_fib: (Default value = 0)
"""
-
return self.api(
self.papi.ip_neighbor_add_del,
{'sw_if_index': sw_if_index,
def map_add_domain(self,
ip6_prefix,
- ip6_prefix_len,
ip6_src,
- ip6_src_prefix_len,
ip4_prefix,
- ip4_prefix_len,
ea_bits_len=0,
psid_offset=0,
psid_length=0,
self.papi.map_add_domain,
{
'ip6_prefix': ip6_prefix,
- 'ip6_prefix_len': ip6_prefix_len,
'ip4_prefix': ip4_prefix,
- 'ip4_prefix_len': ip4_prefix_len,
'ip6_src': ip6_src,
- 'ip6_src_prefix_len': ip6_src_prefix_len,
'ea_bits_len': ea_bits_len,
'psid_offset': psid_offset,
'psid_length': psid_length,
def vxlan_gbp_tunnel_dump(self, sw_if_index=0xffffffff):
return self.api(self.papi.vxlan_gbp_tunnel_dump,
- {'sw_if_index': sw_if_index})
+ {'sw_if_index': sw_if_index,
+ '_no_type_conversion': True})
def pppoe_add_del_session(
self,
def ip_punt_redirect(self,
rx_sw_if_index,
tx_sw_if_index,
- nh,
- is_ip6=0,
+ address,
is_add=1):
return self.api(self.papi.ip_punt_redirect,
- {'rx_sw_if_index': rx_sw_if_index,
- 'tx_sw_if_index': tx_sw_if_index,
- 'nh': nh,
- 'is_add': is_add,
- 'is_ip6': is_ip6})
+ {'punt': {'rx_sw_if_index': rx_sw_if_index,
+ 'tx_sw_if_index': tx_sw_if_index,
+ 'nh': address},
+ 'is_add': is_add})
+
+ def ip_punt_redirect_dump(self, sw_if_index, is_ipv6=0):
+ return self.api(self.papi.ip_punt_redirect_dump,
+ {'sw_if_index': sw_if_index,
+ 'is_ipv6': is_ipv6})
def bier_table_add_del(self,
bti,
is_tunnel=1,
is_tunnel_ipv6=0,
is_add=1,
- udp_encap=0):
+ udp_encap=0,
+ use_anti_replay=0,
+ use_extended_sequence_number=0):
""" IPSEC SA add/del
:param sad_id: security association ID
:param spi: security param index of the SA in decimal
'is_add': is_add,
'is_tunnel': is_tunnel,
'is_tunnel_ipv6': is_tunnel_ipv6,
- 'udp_encap': udp_encap})
+ 'udp_encap': udp_encap,
+ 'use_extended_sequence_number': use_extended_sequence_number,
+ 'use_anti_replay': use_anti_replay})
def ipsec_spd_add_del_entry(self,
spd_id,
'namespace_id': namespace_id,
'namespace_id_len': len(namespace_id)})
- def punt_socket_register(self, l4_port, pathname, header_version=1,
- is_ip4=1, l4_protocol=0x11):
- """ Punt to socket """
+ def punt_socket_register(self, port, pathname, protocol=0x11,
+ header_version=1, is_ip4=1):
+ """ Register punt socket """
return self.api(self.papi.punt_socket_register,
- {'is_ip4': is_ip4,
- 'l4_protocol': l4_protocol,
- 'l4_port': l4_port,
- 'pathname': pathname,
- 'header_version': header_version})
+ {'header_version': header_version,
+ 'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port},
+ 'pathname': pathname})
+
+ def punt_socket_deregister(self, port, protocol=0x11, is_ip4=1):
+ """ Unregister punt socket """
+ return self.api(self.papi.punt_socket_deregister,
+ {'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port}})
+
+ def punt_socket_dump(self, is_ip6=1):
+ """ Dump punt socket"""
+ return self.api(self.papi.punt_socket_dump,
+ {'is_ipv6': is_ip6})
def ip_reassembly_set(self, timeout_ms, max_reassemblies,
expire_walk_interval_ms, is_ip6=0):
def gbp_endpoint_dump(self):
""" GBP endpoint Dump """
- return self.api(self.papi.gbp_endpoint_dump, {})
+ return self.api(self.papi.gbp_endpoint_dump,
+ {'_no_type_conversion': True})
def gbp_endpoint_group_add(self, epg, bd,
rd, uplink_sw_if_index):
""" GBP recirc Dump """
return self.api(self.papi.gbp_recirc_dump, {})
+ def gbp_ext_itf_add_del(self, is_add, sw_if_index, bd_id, rd_id):
+ """ GBP recirc Add/Del """
+ return self.api(self.papi.gbp_ext_itf_add_del,
+ {'is_add': is_add,
+ 'ext_itf': {
+ 'sw_if_index': sw_if_index,
+ 'bd_id': bd_id,
+ 'rd_id': rd_id}})
+
+ def gbp_ext_itf_dump(self):
+ """ GBP recirc Dump """
+ return self.api(self.papi.gbp_ext_itf_dump, {})
+
def gbp_subnet_add_del(self, is_add, rd_id,
prefix, type,
sw_if_index=0xffffffff,
def gbp_subnet_dump(self):
""" GBP Subnet Dump """
- return self.api(self.papi.gbp_subnet_dump, {})
+ return self.api(self.papi.gbp_subnet_dump,
+ {'_no_type_conversion': True})
def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index, rules):
""" GBP contract Add/Del """
'sw_if_index': sw_if_index,
'n_srcs': len(saddrs),
'saddrs': saddrs,
- 'gaddr':
- {
- 'address': gaddr
- }
+ 'gaddr': gaddr
}
})
:param max_msg_size: maximum message length (Default value = 480)
"""
return self.api(self.papi.syslog_set_sender,
- {'collector_address': {
- 'address': collector},
- 'src_address': {
- 'address': src},
+ {'collector_address': collector,
+ 'src_address': src,
'collector_port': collector_port,
'vrf_id': vrf_id,
'max_msg_size': max_msg_size})