from collections import deque
from six import moves
-
+from vpp_mac import mactobinary
from hook import Hook
+from vpp_l2 import L2_PORT_TYPE
# Sphinx creates auto-generated documentation by importing the python source
# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
# the vpp_papi_provider.py file to be importable without having to build
# the whole vpp api if the user only wishes to generate the test documentation.
-do_import = True
-try:
- no_vpp_papi = os.getenv("NO_VPP_PAPI")
- if no_vpp_papi == "1":
- do_import = False
-except:
- pass
-if do_import:
+try:
from vpp_papi import VPP
- from vpp_l2 import L2_PORT_TYPE
+except ImportError:
+ if not os.getenv("NO_VPP_PAPI") == 1:
+ raise
+ pass
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
"""
self.hook.before_cli(cli)
cli += '\n'
- r = self.papi.cli_inband(length=len(cli), cmd=cli)
+ r = self.papi.cli_inband(length=len(cli), cmd=str(cli).encode('utf8'))
self.hook.after_cli(cli)
if hasattr(r, 'reply'):
return r.reply.decode().rstrip('\x00')
return cli + "\n" + str(self.cli(cli))
def _convert_mac(self, mac):
- return mac.replace(':', '').decode('hex')
+ return mactobinary(mac)
def show_version(self):
""" """
:param is_static: (Default value = 0)
:param is_no_adj_fib: (Default value = 0)
"""
-
return self.api(
self.papi.ip_neighbor_add_del,
{'sw_if_index': sw_if_index,
'namespace_id': namespace_id,
'namespace_id_len': len(namespace_id)})
- def punt_socket_register(self, l4_port, pathname, header_version=1,
- is_ip4=1, l4_protocol=0x11):
- """ Punt to socket """
+ def punt_socket_register(self, port, pathname, protocol=0x11,
+ header_version=1, is_ip4=1):
+ """ Register punt socket """
return self.api(self.papi.punt_socket_register,
- {'is_ip4': is_ip4,
- 'l4_protocol': l4_protocol,
- 'l4_port': l4_port,
- 'pathname': pathname,
- 'header_version': header_version})
+ {'header_version': header_version,
+ 'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port},
+ 'pathname': pathname})
+
+ def punt_socket_deregister(self, port, protocol=0x11, is_ip4=1):
+ """ Unregister punt socket """
+ return self.api(self.papi.punt_socket_deregister,
+ {'punt': {'ipv': is_ip4,
+ 'l4_protocol': protocol,
+ 'l4_port': port}})
+
+ def punt_socket_dump(self, is_ip6=1):
+ """ Dump punt socket"""
+ return self.api(self.papi.punt_socket_dump,
+ {'is_ipv6': is_ip6})
def ip_reassembly_set(self, timeout_ms, max_reassemblies,
expire_walk_interval_ms, is_ip6=0):