_zero, _negative = range(2)
- def __init__(self, name, shm_prefix, test_class):
+ def __init__(self, name, shm_prefix, test_class, read_timeout):
self.hook = Hook("vpp-papi-provider")
self.name = name
self.shm_prefix = shm_prefix
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
- self.vpp = VPP(jsonfiles, logger=test_class.logger, read_timeout=5)
+ self.vpp = VPP(jsonfiles, logger=test_class.logger,
+ read_timeout=read_timeout)
self._events = deque()
def __enter__(self):
'enable': enable,
'install_default_routes': install_default_routes})
+ def want_interface_events(self, enable_disable=1):
+ return self.api(self.papi.want_interface_events,
+ {'enable_disable': enable_disable,
+ 'pid': os.getpid(), })
+
def want_macs_learn_events(self, enable_disable=1, scan_delay=0,
max_macs_in_event=0, learn_limit=0):
return self.api(self.papi.want_l2_macs_events,
'mt_next_hop_via_label': next_hop_via_label,
'mt_next_hop_out_label_stack': next_hop_out_label_stack})
- def mpls_tunnel_dump(self):
- return self.api(self.papi.mpls_tunnel_dump, {})
+ def mpls_tunnel_dump(self, sw_if_index=0xffffffff):
+ return self.api(self.papi.mpls_tunnel_dump,
+ {'sw_if_index': sw_if_index})
def nat44_interface_add_del_feature(
self,
def bfd_udp_del_echo_source(self):
return self.api(self.papi.bfd_udp_del_echo_source, {})
+ def bfd_udp_get_echo_source(self):
+ return self.api(self.papi.bfd_udp_get_echo_source, {})
+
def classify_add_del_table(
self,
is_add,
tunnel_src_address='',
tunnel_dst_address='',
is_tunnel=1,
+ is_tunnel_ipv6=0,
is_add=1,
udp_encap=0):
""" IPSEC SA add/del
'crypto_key': crypto_key,
'is_add': is_add,
'is_tunnel': is_tunnel,
+ 'is_tunnel_ipv6': is_tunnel_ipv6,
'udp_encap': udp_encap})
def ipsec_spd_add_del_entry(self,
priority=100,
is_outbound=1,
is_add=1,
+ is_ipv6=0,
is_ip_any=0):
""" IPSEC policy SPD add/del -
Wrapper to configure ipsec SPD policy entries in VPP
'policy': policy,
'priority': priority,
'is_outbound': is_outbound,
+ 'is_ipv6': is_ipv6,
'is_ip_any': is_ip_any})
def ipsec_tunnel_if_add_del(self, local_ip, remote_ip, local_spi,
def pipe_dump(self):
return self.api(self.papi.pipe_dump, {})
+
+ def memif_create(
+ self,
+ role,
+ mode,
+ rx_queues=None,
+ tx_queues=None,
+ _id=None,
+ socket_id=None,
+ secret=None,
+ ring_size=None,
+ buffer_size=None,
+ hw_addr=None):
+ return self.api(self.papi.memif_create,
+ {'role': role,
+ 'mode': mode,
+ 'rx_queues': rx_queues,
+ 'tx_queues': tx_queues,
+ 'id': _id,
+ 'socket_id': socket_id,
+ 'secret': secret,
+ 'ring_size': ring_size,
+ 'buffer_size': buffer_size,
+ 'hw_addr': hw_addr})
+
+ def memif_delete(self, sw_if_index):
+ return self.api(self.papi.memif_delete, {'sw_if_index': sw_if_index})
+
+ def memif_dump(self):
+ return self.api(self.papi.memif_dump, {})
+
+ def memif_socket_filename_add_del(
+ self, is_add, socket_id, socket_filename):
+ return self.api(
+ self.papi.memif_socket_filename_add_del,
+ {'is_add': is_add,
+ 'socket_id': socket_id,
+ 'socket_filename': socket_filename})
+
+ def memif_socket_filename_dump(self):
+ return self.api(self.papi.memif_socket_filename_dump, {})
+
+ def svs_table_add_del(self, af, table_id, is_add=1):
+ return self.api(self.papi.svs_table_add_del,
+ {
+ 'table_id': table_id,
+ 'is_add': is_add,
+ 'af': af,
+ })
+
+ def svs_route_add_del(self, table_id, prefix, src_table_id, is_add=1):
+ return self.api(self.papi.svs_route_add_del,
+ {
+ 'table_id': table_id,
+ 'source_table_id': src_table_id,
+ 'prefix': prefix,
+ 'is_add': is_add,
+ })
+
+ def svs_enable_disable(self, af, table_id, sw_if_index, is_enable=1):
+ return self.api(self.papi.svs_enable_disable,
+ {
+ 'af': af,
+ 'table_id': table_id,
+ 'sw_if_index': sw_if_index,
+ 'is_enable': is_enable,
+ })
+
+ def svs_dump(self):
+ return self.api(self.papi.svs_dump, {})