STF publisher: ensure latest results are sent by CPP + get latest dump via Python API
authorYaroslav Brustinov <[email protected]>
Wed, 1 Mar 2017 16:40:58 +0000 (18:40 +0200)
committerYaroslav Brustinov <[email protected]>
Wed, 1 Mar 2017 16:40:58 +0000 (18:40 +0200)
Change-Id: I1836c0366785246acbcd8d238400440f5f3970f5
Signed-off-by: Yaroslav Brustinov <[email protected]>
scripts/automation/trex_control_plane/server/trex_launch_thread.py
scripts/automation/trex_control_plane/server/trex_server.py
scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
scripts/automation/trex_control_plane/stf/trex_stf_lib/trex_client.py
src/main_dpdk.cpp

index f4ee0d6..a4a7a97 100755 (executable)
@@ -27,7 +27,7 @@ class AsynchronousTRexSession(threading.Thread):
         self.session                                = None\r
         self.trexObj                                = trexObj\r
         self.time_stamps                            = {'start' : None, 'run_time' : None}\r
-        self.trexObj.zmq_dump                       = {}\r
+        self.trexObj.clear_zmq_dump()\r
 \r
     def run (self):\r
         try:\r
index 60febc6..cd4af11 100755 (executable)
@@ -25,7 +25,7 @@ import CCustomLogger
 from trex_launch_thread import AsynchronousTRexSession
 from zmq_monitor_thread import ZmqMonitorSession
 from argparse import ArgumentParser, RawTextHelpFormatter
-from json import JSONEncoder
+import json
 import re
 import shlex
 import tempfile
@@ -140,6 +140,7 @@ class CTRexServer(object):
         self.server.register_function(self.get_file)
         self.server.register_function(self.get_files_list)
         self.server.register_function(self.get_files_path)
+        self.server.register_function(self.get_latest_dump)
         self.server.register_function(self.get_running_info)
         self.server.register_function(self.get_running_status)
         self.server.register_function(self.get_trex_cmds)
@@ -432,6 +433,9 @@ class CTRexServer(object):
         logger.info("Processing get_running_info() command.")
         return self.trex.get_running_info()
 
+    def get_latest_dump(self):
+        logger.info("Processing get_latest_dump() command.")
+        return self.trex.get_latest_dump()
 
     def generate_run_cmd (self, iom = 0, export_path="/tmp/trex.txt", stateless = False, debug_image = False, trex_args = '', **kwargs):
         """ generate_run_cmd(self, iom, export_path, kwargs) -> str
@@ -554,11 +558,11 @@ class CTRex(object):
         self.errcode        = None
         self.session        = None
         self.zmq_monitor    = None
-        self.zmq_dump       = None
+        self.__zmq_dump     = {}
+        self.zmq_dump_lock  = threading.Lock()
         self.zmq_error      = None
         self.seq            = None
         self.expect_trex    = threading.Event()
-        self.encoder        = JSONEncoder()
 
     def get_status(self):
         return self.status
@@ -578,9 +582,21 @@ class CTRex(object):
     def get_seq (self):
         return self.seq
 
+    def get_latest_dump(self):
+        with self.zmq_dump_lock:
+            return json.dumps(self.__zmq_dump)
+
+    def update_zmq_dump_key(self, key, val):
+        with self.zmq_dump_lock:
+            self.__zmq_dump[key] = val
+
+    def clear_zmq_dump(self):
+        with self.zmq_dump_lock:
+            self.__zmq_dump = {}
+
     def get_running_info (self):
         if self.status == TRexStatus.Running:
-            return self.encoder.encode(self.zmq_dump)
+            return self.get_latest_dump()
         else:
             logger.info("TRex isn't running. Running information isn't available.")
             if self.status == TRexStatus.Idle:
index f559ebc..172e2eb 100755 (executable)
@@ -72,7 +72,7 @@ class ZmqMonitorSession(threading.Thread):
 \r
         # add to trex_obj zmq latest dump, based on its 'name' header\r
         if dict_obj != {}:\r
-            self.trexObj.zmq_dump[dict_obj['name']] = dict_obj\r
+            self.trexObj.update_zmq_dump_key(dict_obj['name'], dict_obj)\r
             if self.first_dump:\r
                 # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully\r
                 self.first_dump = False\r
index 43504c9..09f9b48 100755 (executable)
@@ -11,7 +11,7 @@ import copy
 import binascii
 from distutils.util import strtobool
 from collections import deque, OrderedDict
-from json import JSONDecoder
+import json
 import traceback
 import signal
 
@@ -92,7 +92,6 @@ class CTRexClient(object):
         self.__default_user         = get_current_user()
         self.verbose                = verbose
         self.result_obj             = CTRexResult(max_history_size, filtered_latency_amount)
-        self.decoder                = JSONDecoder()
         self.history                = jsonrpclib.history.History()
         self.master_daemon_path     = "http://{hostname}:{port}/".format( hostname = self.trex_host, port = master_daemon_port )
         self.master_daemon          = jsonrpclib.Server(self.master_daemon_path, history = self.history)
@@ -523,7 +522,7 @@ class CTRexClient(object):
             return self.result_obj.get_latest_dump()
         else:
             try: 
-                latest_dump = self.decoder.decode( self.server.get_running_info() ) # latest dump is not a dict, but json string. decode it.
+                latest_dump = json.loads( self.server.get_running_info() ) # latest dump is not a dict, but json string. decode it.
                 self.result_obj.update_result_data(latest_dump)
                 return latest_dump
             except TypeError as inst:
@@ -606,6 +605,15 @@ class CTRexClient(object):
                 time.sleep(time_between_samples)
         except TRexWarning:
             pass
+
+        # try to get final server dump
+        try:
+            latest_server_dump = json.loads(self.server.get_latest_dump())
+            if latest_server_dump != self.result_obj.get_latest_dump():
+                self.result_obj.update_result_data(latest_server_dump)
+        except ProtocolError:
+            pass
+
         results = self.get_result_obj()
         return results
             
index fb2a6dc..db5af72 100644 (file)
@@ -5033,6 +5033,7 @@ int CGlobalTRex::stop_master(){
 
     dump_stats(stdout,CGlobalStats::dmpSTANDARD);
     dump_post_test_stats(stdout);
+    publish_async_data(false);
 
     return (0);
 }