def wait_for_event(self, timeout, name=None):
""" Wait for and return next event. """
+ if name:
+ self.test_class.logger.debug("Expecting event within %ss",
+ timeout)
+ else:
+ self.test_class.logger.debug("Expecting event '%s' within %ss",
+ name, timeout)
if self._events:
self.test_class.logger.debug("Not waiting, event already queued")
limit = time.time() + timeout
(name, e))
return e
time.sleep(0) # yield
- if name is not None:
- raise Exception("Event %s did not occur within timeout" % name)
raise Exception("Event did not occur within timeout")
def __call__(self, name, event):
'outer_fib_id': outer_fib_id}
)
+ def mpls_fib_dump(self):
+ return self.api(self.papi.mpls_fib_dump, {})
+
def mpls_route_add_del(
self,
label,
external_port=0,
addr_only=1,
vrf_id=0,
+ protocol=0,
is_add=1,
is_ip4=1):
"""Add/delete S-NAT static mapping
:param external_port: External port number (Default value = 0)
:param addr_only: 1 if address only mapping, 0 if address and port
:param vrf_id: VRF ID
+ :param protocol: IP protocol (Default value = 0)
:param is_add: 1 if add, 0 if delete (Default value = 1)
:param is_ip4: 1 if address type is IPv4 (Default value = 1)
"""
'local_port': local_port,
'external_port': external_port,
'external_sw_if_index': external_sw_if_index,
- 'vrf_id': vrf_id})
+ 'vrf_id': vrf_id,
+ 'protocol': protocol})
def snat_add_address_range(
self,
self.api(self.papi.control_ping)
def bfd_udp_add(self, sw_if_index, desired_min_tx, required_min_rx,
+ detect_mult, local_addr, peer_addr, is_ipv6=0,
+ bfd_key_id=None, conf_key_id=None):
+ if bfd_key_id is None:
+ return self.api(self.papi.bfd_udp_add,
+ {
+ 'sw_if_index': sw_if_index,
+ 'desired_min_tx': desired_min_tx,
+ 'required_min_rx': required_min_rx,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'detect_mult': detect_mult,
+ })
+ else:
+ return self.api(self.papi.bfd_udp_add,
+ {
+ 'sw_if_index': sw_if_index,
+ 'desired_min_tx': desired_min_tx,
+ 'required_min_rx': required_min_rx,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'detect_mult': detect_mult,
+ 'is_authenticated': 1,
+ 'bfd_key_id': bfd_key_id,
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_udp_mod(self, sw_if_index, desired_min_tx, required_min_rx,
detect_mult, local_addr, peer_addr, is_ipv6=0):
- return self.api(self.papi.bfd_udp_add,
+ return self.api(self.papi.bfd_udp_mod,
{
'sw_if_index': sw_if_index,
'desired_min_tx': desired_min_tx,
'detect_mult': detect_mult,
})
+ def bfd_udp_auth_activate(self, sw_if_index, local_addr, peer_addr,
+ is_ipv6=0, bfd_key_id=None, conf_key_id=None,
+ is_delayed=False):
+ return self.api(self.papi.bfd_udp_auth_activate,
+ {
+ 'sw_if_index': sw_if_index,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'is_delayed': 1 if is_delayed else 0,
+ 'bfd_key_id': bfd_key_id,
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_udp_auth_deactivate(self, sw_if_index, local_addr, peer_addr,
+ is_ipv6=0, is_delayed=False):
+ return self.api(self.papi.bfd_udp_auth_deactivate,
+ {
+ 'sw_if_index': sw_if_index,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'is_delayed': 1 if is_delayed else 0,
+ })
+
def bfd_udp_del(self, sw_if_index, local_addr, peer_addr, is_ipv6=0):
return self.api(self.papi.bfd_udp_del,
{
def bfd_udp_session_dump(self):
return self.api(self.papi.bfd_udp_session_dump, {})
- def bfd_session_set_flags(self, bs_idx, admin_up_down):
- return self.api(self.papi.bfd_session_set_flags, {
- 'bs_index': bs_idx,
+ def bfd_udp_session_set_flags(self, admin_up_down, sw_if_index, local_addr,
+ peer_addr, is_ipv6=0):
+ return self.api(self.papi.bfd_udp_session_set_flags, {
'admin_up_down': admin_up_down,
+ 'sw_if_index': sw_if_index,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
})
def want_bfd_events(self, enable_disable=1):
'pid': os.getpid(),
})
+ def bfd_auth_set_key(self, conf_key_id, auth_type, key):
+ return self.api(self.papi.bfd_auth_set_key, {
+ 'conf_key_id': conf_key_id,
+ 'auth_type': auth_type,
+ 'key': key,
+ 'key_len': len(key),
+ })
+
+ def bfd_auth_del_key(self, conf_key_id):
+ return self.api(self.papi.bfd_auth_del_key, {
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_auth_keys_dump(self):
+ return self.api(self.papi.bfd_auth_keys_dump, {})
+
def classify_add_del_table(
self,
is_add,
def mfib_signal_dump(self):
return self.api(self.papi.mfib_signal_dump, {})
+
+ def ip_mfib_dump(self):
+ return self.api(self.papi.ip_mfib_dump, {})