_zero, _negative = range(2)
- def __init__(self, name, shm_prefix, test_class):
+ def __init__(self, name, shm_prefix, test_class, read_timeout):
self.hook = Hook("vpp-papi-provider")
self.name = name
self.shm_prefix = shm_prefix
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
- self.vpp = VPP(jsonfiles, logger=test_class.logger, read_timeout=5)
+ self.vpp = VPP(jsonfiles, logger=test_class.logger,
+ read_timeout=read_timeout)
self._events = deque()
def __enter__(self):
{'bd_id': bd_id,
'is_add': is_add,
'is_ipv6': is_ipv6,
- 'ip_address': ip,
- 'mac_address': mac})
+ 'ip': ip,
+ 'mac': mac})
def want_ip4_arp_events(self, enable_disable=1, address=0):
return self.api(self.papi.want_ip4_arp_events,
'enable': enable,
'install_default_routes': install_default_routes})
+ def want_interface_events(self, enable_disable=1):
+ return self.api(self.papi.want_interface_events,
+ {'enable_disable': enable_disable,
+ 'pid': os.getpid(), })
+
def want_macs_learn_events(self, enable_disable=1, scan_delay=0,
max_macs_in_event=0, learn_limit=0):
return self.api(self.papi.want_l2_macs_events,
tunnel_src_address='',
tunnel_dst_address='',
is_tunnel=1,
+ is_tunnel_ipv6=0,
is_add=1,
udp_encap=0):
""" IPSEC SA add/del
'crypto_key': crypto_key,
'is_add': is_add,
'is_tunnel': is_tunnel,
+ 'is_tunnel_ipv6': is_tunnel_ipv6,
'udp_encap': udp_encap})
def ipsec_spd_add_del_entry(self,
priority=100,
is_outbound=1,
is_add=1,
+ is_ipv6=0,
is_ip_any=0):
""" IPSEC policy SPD add/del -
Wrapper to configure ipsec SPD policy entries in VPP
'policy': policy,
'priority': priority,
'is_outbound': is_outbound,
+ 'is_ipv6': is_ipv6,
'is_ip_any': is_ip_any})
def ipsec_tunnel_if_add_del(self, local_ip, remote_ip, local_spi,
'mode': host,
'sw_if_index': sw_if_index})
+ def igmp_proxy_device_add_del(self, vrf_id, sw_if_index, add):
+ """ Add/del IGMP proxy device """
+ return self.api(self.papi.igmp_proxy_device_add_del,
+ {'vrf_id': vrf_id, 'sw_if_index': sw_if_index,
+ 'add': add})
+
+ def igmp_proxy_device_add_del_interface(self, vrf_id, sw_if_index, add):
+ """ Add/del interface to/from IGMP proxy device """
+ return self.api(self.papi.igmp_proxy_device_add_del_interface,
+ {'vrf_id': vrf_id, 'sw_if_index': sw_if_index,
+ 'add': add})
+
def igmp_listen(self, filter, sw_if_index, saddrs, gaddr):
""" Listen for new (S,G) on specified interface