from os import remove
from input_data_files import download_and_unzip_data_file
+from utils import Worker
class ExecutionChecker(ResultVisitor):
REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
r'[\D\d]*')
- REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
+ REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
+
+ REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
+ r"(RTE Version: 'DPDK )(.*)(')")
REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
# VPP version
self._version = None
+ # Timestamp
+ self._timestamp = None
+
# Number of VAT History messages found:
# 0 - no message
# 1 - VAT History of DUT1
# Dictionary defining the methods used to parse different types of
# messages
self.parse_msg = {
- "setup-version": self._get_version,
+ "timestamp": self._get_timestamp,
+ "vpp-version": self._get_vpp_version,
+ "dpdk-version": self._get_dpdk_version,
"teardown-vat-history": self._get_vat_history,
"test-show-runtime": self._get_show_run
}
"""
return self._data
- def _get_version(self, msg):
+ def _get_vpp_version(self, msg):
"""Called when extraction of VPP version is required.
:param msg: Message to process.
"""
if msg.message.count("return STDOUT Version:"):
- self._version = str(re.search(self.REGEX_VERSION, msg.message).
+ self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
group(2))
self._data["metadata"]["version"] = self._version
- self._data["metadata"]["generated"] = msg.timestamp
self._msg_type = None
+ def _get_dpdk_version(self, msg):
+ """Called when extraction of DPDK version is required.
+
+ :param msg: Message to process.
+ :type msg: Message
+ :returns: Nothing.
+ """
+
+ if msg.message.count("return STDOUT testpmd"):
+ try:
+ self._version = str(re.search(
+ self.REGEX_VERSION_DPDK, msg.message). group(4))
+ self._data["metadata"]["version"] = self._version
+ except IndexError:
+ pass
+ finally:
+ self._msg_type = None
+
+ def _get_timestamp(self, msg):
+ """Called when extraction of timestamp is required.
+
+ :param msg: Message to process.
+ :type msg: Message
+ :returns: Nothing.
+ """
+
+ self._timestamp = msg.timestamp[:14]
+ self._data["metadata"]["generated"] = self._timestamp
+ self._msg_type = None
+
def _get_vat_history(self, msg):
"""Called when extraction of VAT command history is required.
self._lookup_kw_nr += 1
self._show_run_lookup_nr = 0
self._msg_type = "test-show-runtime"
- test_kw.messages.visit(self)
+ elif test_kw.name.count("Start The L2fwd Test") and not self._version:
+ self._msg_type = "dpdk-version"
+ else:
+ return
+ test_kw.messages.visit(self)
def end_test_kw(self, test_kw):
"""Called when keyword ends. Default implementation does nothing.
"""
if setup_kw.name.count("Show Vpp Version On All Duts") \
and not self._version:
- self._msg_type = "setup-version"
- setup_kw.messages.visit(self)
+ self._msg_type = "vpp-version"
+
+ elif setup_kw.name.count("Setup performance global Variables") \
+ and not self._timestamp:
+ self._msg_type = "timestamp"
+ else:
+ return
+ setup_kw.messages.visit(self)
def end_setup_kw(self, setup_kw):
"""Called when keyword ends. Default implementation does nothing.
logging.info("Downloading and parsing input files ...")
work_queue = multiprocessing.JoinableQueue()
-
manager = multiprocessing.Manager()
-
data_queue = manager.Queue()
-
cpus = multiprocessing.cpu_count()
+
workers = list()
for cpu in range(cpus):
worker = Worker(work_queue,
:rtype pandas.Series
"""
- logging.info(" Creating the data set for the {0} '{1}'.".
- format(element.get("type", ""), element.get("title", "")))
-
try:
if element["filter"] in ("all", "template"):
cond = "True"
merged_data[ID] = item_data
return merged_data
-
-
-class Worker(multiprocessing.Process):
- """Worker class used to download and process input files in separate
- parallel processes.
- """
-
- def __init__(self, work_queue, data_queue, func):
- """Initialization.
-
- :param work_queue: Queue with items to process.
- :param data_queue: Shared memory between processes. Queue which keeps
- the result data. This data is then read by the main process and used
- in further processing.
- :param func: Function which is executed by the worker.
- :type work_queue: multiprocessing.JoinableQueue
- :type data_queue: multiprocessing.Manager().Queue()
- :type func: Callable object
- """
- super(Worker, self).__init__()
- self._work_queue = work_queue
- self._data_queue = data_queue
- self._func = func
-
- def run(self):
- """Method representing the process's activity.
- """
-
- while True:
- try:
- self.process(self._work_queue.get())
- finally:
- self._work_queue.task_done()
-
- def process(self, item_to_process):
- """Method executed by the runner.
-
- :param item_to_process: Data to be processed by the function.
- :type item_to_process: tuple
- """
- self._func(self.pid, self._data_queue, *item_to_process)