import fnmatch
import time
from hook import Hook
+from collections import deque
# Sphinx creates auto-generated documentation by importing the python source
-# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows the
-# vpp_papi_provider.py file to be importable without having to build the whole
-# vpp api if the user only wishes to generate the test documentation.
+# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
+# the vpp_papi_provider.py file to be importable without having to build
+# the whole vpp api if the user only wishes to generate the test documentation.
do_import = True
try:
no_vpp_papi = os.getenv("NO_VPP_PAPI")
jsonfiles.append(os.path.join(root, filename))
self.papi = VPP(jsonfiles)
- self._events = list()
+ self._events = deque()
def register_hook(self, hook):
"""Replace hook registration with new hook
self.hook = hook
def collect_events(self):
+ """ Collect all events from the internal queue and clear the queue. """
e = self._events
- self._events = list()
+ self._events = deque()
return e
def wait_for_event(self, timeout, name=None):
+ """ Wait for and return next event. """
+ if self._events:
+ self.test_class.logger.debug("Not waiting, event already queued")
limit = time.time() + timeout
while time.time() < limit:
if self._events:
- e = self._events.pop(0)
+ e = self._events.popleft()
if name and type(e).__name__ != name:
raise Exception(
"Unexpected event received: %s, expected: %s" %
(type(e).__name__, name))
+ self.test_class.logger.debug("Returning event %s:%s" %
+ (name, e))
return e
time.sleep(0) # yield
if name is not None:
raise Exception("Event did not occur within timeout")
def __call__(self, name, event):
+ """ Enqueue event in the internal event queue. """
# FIXME use the name instead of relying on type(e).__name__ ?
# FIXME #2 if this throws, it is eaten silently, Ole?
+ self.test_class.logger.debug("New event: %s: %s" % (name, event))
self._events.append(event)
def connect(self):
self.papi.disconnect()
def api(self, api_fn, api_args, expected_retval=0):
- """Call API function and check it's return value
+ """ Call API function and check it's return value.
Call the appropriate hooks before and after the API call
:param api_fn: API function to call
if hasattr(reply, 'retval') and reply.retval != expected_retval:
msg = "API call failed, expected retval == %d, got %s" % (
expected_retval, repr(reply))
- self.test_class.logger.error(msg)
+ self.test_class.logger.info(msg)
raise Exception(msg)
self.hook.after_api(api_fn.__name__, api_args)
return reply
def cli(self, cli):
- """
- Execute a CLI, calling the before/after hooks appropriately.
+ """ Execute a CLI, calling the before/after hooks appropriately.
:param cli: CLI to execute
:returns: CLI output
return r.reply.decode().rstrip('\x00')
def ppcli(self, cli):
- """
- Helping method to print CLI command in case of info logging level.
+ """ Helper method to print CLI command in case of info logging level.
:param cli: CLI to execute
:returns: CLI output
return self.api(self.papi.sw_interface_dump, args)
def sw_interface_set_table(self, sw_if_index, is_ipv6, table_id):
- """
- Set the IPvX Table-id for the Interface
+ """ Set the IPvX Table-id for the Interface
:param sw_if_index:
:param is_ipv6:
return self.api(self.papi.sw_interface_ip6nd_ra_config,
{'sw_if_index': sw_if_index})
+ def ip6_sw_interface_ra_config(self, sw_if_index,
+ suppress,
+ send_unicast,):
+ return self.api(self.papi.sw_interface_ip6nd_ra_config,
+ {'sw_if_index': sw_if_index,
+ 'suppress': suppress,
+ 'send_unicast': send_unicast})
+
+ def ip6_sw_interface_enable_disable(self, sw_if_index, enable):
+ """
+ Enable/Disable An interface for IPv6
+ """
+ return self.api(self.papi.sw_interface_ip6_enable_disable,
+ {'sw_if_index': sw_if_index,
+ 'enable': enable})
+
def vxlan_add_del_tunnel(
self,
src_addr,
return self.api(self.papi.create_loopback,
{'mac_address': mac})
+ def delete_loopback(self, sw_if_index):
+ return self.api(self.papi.delete_loopback,
+ {'sw_if_index': sw_if_index, })
+
def ip_add_del_route(
self,
dst_address,
'next_hop_via_label': next_hop_via_label,
'next_hop_out_label_stack': next_hop_out_label_stack})
+ def ip_fib_dump(self):
+ return self.api(self.papi.ip_fib_dump, {})
+
def ip_neighbor_add_del(self,
sw_if_index,
mac_address,
}
)
+ def reset_vrf(self,
+ vrf_id,
+ is_ipv6=0,
+ ):
+ """ Reset VRF (remove all routes etc.) request.
+
+ :param int vrf_id: ID of the FIB table / VRF to reset.
+ :param int is_ipv6: 1 for IPv6 neighbor, 0 for IPv4. (Default = 0)
+ """
+
+ return self.api(
+ self.papi.reset_vrf,
+ {'vrf_id': vrf_id,
+ 'is_ipv6': is_ipv6,
+ }
+ )
+
+ def reset_fib(self,
+ vrf_id,
+ is_ipv6=0,
+ ):
+ """ Reset VRF (remove all routes etc.) request.
+
+ :param int vrf_id: ID of the FIB table / VRF to reset.
+ :param int is_ipv6: 1 for IPv6 neighbor, 0 for IPv4. (Default = 0)
+ """
+
+ return self.api(
+ self.papi.reset_fib,
+ {'vrf_id': vrf_id,
+ 'is_ipv6': is_ipv6,
+ }
+ )
+
+ def ip_dump(self,
+ is_ipv6=0,
+ ):
+ """ Return IP dump.
+
+ :param int is_ipv6: 1 for IPv6 neighbor, 0 for IPv4. (Default = 0)
+ """
+
+ return self.api(
+ self.papi.ip_dump,
+ {'is_ipv6': is_ipv6,
+ }
+ )
+
def sw_interface_span_enable_disable(
self, sw_if_index_from, sw_if_index_to, state=1):
"""
:param sw_if_index_from:
:param sw_if_index_to:
- :param enable
-
+ :param state:
"""
return self.api(self.papi.sw_interface_span_enable_disable,
{'sw_if_index_from': sw_if_index_from,
"""
return self.api(self.papi.snat_static_mapping_dump, {})
+ def snat_show_config(self):
+ """Show S-NAT config
+ :return: S-NAT config parameters
+ """
+ return self.api(self.papi.snat_show_config, {})
+
+ def snat_add_interface_addr(
+ self,
+ sw_if_index,
+ is_add=1):
+ """Add/del S-NAT address from interface
+
+ :param sw_if_index: Software index of the interface
+ :param is_add: 1 if add, 0 if delete (Default value = 1)
+ """
+ return self.api(self.papi.snat_add_del_interface_addr,
+ {'is_add': is_add, 'sw_if_index': sw_if_index})
+
+ def snat_interface_addr_dump(self):
+ """Dump S-NAT addresses interfaces
+ :return: Dictionary of S-NAT addresses interfaces
+ """
+ return self.api(self.papi.snat_interface_addr_dump, {})
+
+ def snat_ipfix(
+ self,
+ domain_id=1,
+ src_port=4739,
+ enable=1):
+ """Enable/disable S-NAT IPFIX logging
+
+ :param domain_id: Observation domain ID (Default value = 1)
+ :param src_port: Source port number (Default value = 4739)
+ :param enable: 1 if enable, 0 if disable (Default value = 1)
+ """
+ return self.api(
+ self.papi.snat_ipfix_enable_disable,
+ {'domain_id': domain_id,
+ 'src_port': src_port,
+ 'enable': enable})
+
def control_ping(self):
self.api(self.papi.control_ping)
'enable_disable': enable_disable,
'pid': os.getpid(),
})
+
+ def classify_add_del_table(
+ self,
+ is_add,
+ mask,
+ match_n_vectors=1,
+ table_index=0xFFFFFFFF,
+ nbuckets=2,
+ memory_size=2097152,
+ skip_n_vectors=0,
+ next_table_index=0xFFFFFFFF,
+ miss_next_index=0xFFFFFFFF,
+ current_data_flag=0,
+ current_data_offset=0):
+ """
+ :param is_add:
+ :param mask:
+ :param match_n_vectors: (Default value = 1)
+ :param table_index: (Default value = 0xFFFFFFFF)
+ :param nbuckets: (Default value = 2)
+ :param memory_size: (Default value = 2097152)
+ :param skip_n_vectors: (Default value = 0)
+ :param next_table_index: (Default value = 0xFFFFFFFF)
+ :param miss_next_index: (Default value = 0xFFFFFFFF)
+ :param current_data_flag: (Default value = 0)
+ :param current_data_offset: (Default value = 0)
+ """
+
+ return self.api(
+ self.papi.classify_add_del_table,
+ {'is_add': is_add,
+ 'table_index': table_index,
+ 'nbuckets': nbuckets,
+ 'memory_size': memory_size,
+ 'skip_n_vectors': skip_n_vectors,
+ 'match_n_vectors': match_n_vectors,
+ 'next_table_index': next_table_index,
+ 'miss_next_index': miss_next_index,
+ 'current_data_flag': current_data_flag,
+ 'current_data_offset': current_data_offset,
+ 'mask': mask})
+
+ def classify_add_del_session(
+ self,
+ is_add,
+ table_index,
+ match,
+ opaque_index=0xFFFFFFFF,
+ hit_next_index=0xFFFFFFFF,
+ advance=0,
+ action=0,
+ metadata=0):
+ """
+ :param is_add:
+ :param table_index:
+ :param match:
+ :param opaque_index: (Default value = 0xFFFFFFFF)
+ :param hit_next_index: (Default value = 0xFFFFFFFF)
+ :param advance: (Default value = 0)
+ :param action: (Default value = 0)
+ :param metadata: (Default value = 0)
+ """
+
+ return self.api(
+ self.papi.classify_add_del_session,
+ {'is_add': is_add,
+ 'table_index': table_index,
+ 'hit_next_index': hit_next_index,
+ 'opaque_index': opaque_index,
+ 'advance': advance,
+ 'action': action,
+ 'metadata': metadata,
+ 'match': match})
+
+ def input_acl_set_interface(
+ self,
+ is_add,
+ sw_if_index,
+ ip4_table_index=0xFFFFFFFF,
+ ip6_table_index=0xFFFFFFFF,
+ l2_table_index=0xFFFFFFFF):
+ """
+ :param is_add:
+ :param sw_if_index:
+ :param ip4_table_index: (Default value = 0xFFFFFFFF)
+ :param ip6_table_index: (Default value = 0xFFFFFFFF)
+ :param l2_table_index: (Default value = 0xFFFFFFFF)
+ """
+
+ return self.api(
+ self.papi.input_acl_set_interface,
+ {'sw_if_index': sw_if_index,
+ 'ip4_table_index': ip4_table_index,
+ 'ip6_table_index': ip6_table_index,
+ 'l2_table_index': l2_table_index,
+ 'is_add': is_add})
+
+ def set_ipfix_exporter(
+ self,
+ collector_address,
+ src_address,
+ path_mtu,
+ template_interval,
+ vrf_id=0,
+ collector_port=4739,
+ udp_checksum=0):
+ return self.api(
+ self.papi.set_ipfix_exporter,
+ {
+ 'collector_address': collector_address,
+ 'collector_port': collector_port,
+ 'src_address': src_address,
+ 'vrf_id': vrf_id,
+ 'path_mtu': path_mtu,
+ 'template_interval': template_interval,
+ 'udp_checksum': udp_checksum,
+ })