kwargs['_vl_msg_id'] = i
no_type_conversion = kwargs.pop('_no_type_conversion', False)
+ timeout = kwargs.pop('_timeout', None)
try:
if self.transport.socket_index:
# Block until we get a reply.
rl = []
while (True):
- r = self.read_blocking(no_type_conversion)
+ r = self.read_blocking(no_type_conversion, timeout)
if r is None:
raise VPPIOError(2, 'VPP API client: read failed')
msgname = type(r).__name__
self.transport.write(b)
return context
- def read_blocking(self, no_type_conversion=False):
+ def read_blocking(self, no_type_conversion=False, timeout=None):
"""Get next received message from transport within timeout, decoded.
- Note that noticifations have context zero
+ Note that notifications have context zero
and are not put into receive queue (at least for socket transport),
use async_thread with registered callback for processing them.
:type no_type_conversion: bool
:returns: Decoded message, or None if no message (within timeout).
:rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
+ :raises VppTransportShmemIOError if timed out.
"""
- msg = self.transport.read()
+ msg = self.transport.read(timeout=timeout)
if not msg:
return None
return self.decode_incoming_msg(msg, no_type_conversion)
raise VppTransportShmemIOError(1, 'Not connected')
return vpp_api.vac_write(bytes(buf), len(buf))
- def read(self):
+ def read(self, timeout=None):
if not self.connected:
raise VppTransportShmemIOError(1, 'Not connected')
+ if timeout is None:
+ timeout = self.read_timeout
mem = ffi.new("char **")
size = ffi.new("int *")
- rv = vpp_api.vac_read(mem, size, self.read_timeout)
+ rv = vpp_api.vac_read(mem, size, timeout)
if rv:
strerror = 'vac_read failed. It is likely that VPP died.'
raise VppTransportShmemIOError(rv, strerror)
return msg
raise VppTransportSocketIOError(1, 'Unknown socket read error')
- def read(self):
+ def read(self, timeout=None):
if not self.connected:
raise VppTransportSocketIOError(1, 'Not connected')
+ if timeout is None:
+ timeout = self.read_timeout
try:
- return self.q.get(True, self.read_timeout)
+ return self.q.get(True, timeout)
except queue.Empty:
return None
@classmethod
def setUpClass(cls):
# using the framework default
- # cls.vapi_response_timeout = 5
+ cls.vapi_response_timeout = 5
super(TestCLI, cls).setUpClass()
@classmethod
vpp_transport_shmem.VppTransportShmemIOError) as ctx:
rv = self.vapi.papi.cli_inband(cmd='wait 10')
+ def test_long_cli_delay_override(self):
+ """ Test per-command _timeout option.""" # noqa
+ rv = self.vapi.papi.cli_inband(cmd='wait 10', _timeout=15)
+ self.assertEqual(rv.retval, 0)
+
class TestCLIExtendedVapiTimeout(VppTestCase):
maxDiff = None