from .trex_stl_async_client import CTRexAsyncClient
from .utils import parsing_opts, text_tables, common
-from .utils.common import list_intersect, list_difference, is_sub_list
+from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer
from .utils.text_opts import *
from functools import wraps
@__api_check(True)
- def wait_on_traffic (self, ports = None, timeout = 60, rx_delay_ms = 10):
+ def wait_on_traffic (self, ports = None, timeout = None, rx_delay_ms = 10):
"""
.. _wait_on_traffic:
timeout : int
timeout in seconds
+ default will be blocking
rx_delay_ms : int
Time to wait (in milliseconds) after last packet was sent, until RX filters used for
ports = self._validate_port_list(ports)
- expr = time.time() + timeout
+ timer = PassiveTimer(timeout)
# wait while any of the required ports are active
while set(self.get_active_ports()).intersection(ports):
raise STLError("subscriber thread is dead")
time.sleep(0.01)
- if time.time() > expr:
+ if timer.has_expired():
raise STLTimeoutError(timeout)
# remove any RX filters
import sys
import string
import random
+import time
try:
import pwd
def is_sub_list (l1, l2):
return set(l1) <= set(l2)
+# a simple passive timer
+class PassiveTimer(object):
+
+ # timeout_sec = None means forever
+ def __init__ (self, timeout_sec):
+ if timeout_sec != None:
+ self.expr_sec = time.time() + timeout_sec
+ else:
+ self.expr_sec = None
+
+ def has_expired (self):
+ # if no timeout was set - return always false
+ if self.expr_sec == None:
+ return False
+
+ return (time.time() > self.expr_sec)
+