stf daemon: add support for zipped zmq
authorYaroslav Brustinov <[email protected]>
Wed, 3 Aug 2016 14:24:59 +0000 (17:24 +0300)
committerYaroslav Brustinov <[email protected]>
Wed, 3 Aug 2016 14:24:59 +0000 (17:24 +0300)
scripts/automation/trex_control_plane/server/zipmsg.py [new file with mode: 0644]
scripts/automation/trex_control_plane/server/zmq_monitor_thread.py

diff --git a/scripts/automation/trex_control_plane/server/zipmsg.py b/scripts/automation/trex_control_plane/server/zipmsg.py
new file mode 100644 (file)
index 0000000..397ada1
--- /dev/null
@@ -0,0 +1,32 @@
+import zlib
+import struct
+
+class ZippedMsg:
+
+    MSG_COMPRESS_THRESHOLD    = 256
+    MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA
+
+    def check_threshold (self, msg):
+        return len(msg) >= self.MSG_COMPRESS_THRESHOLD
+
+    def compress (self, msg):
+        # compress
+        compressed = zlib.compress(msg)
+        new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed
+        return new_msg
+
+
+    def decompress (self, msg):
+        if len(msg) < 8:
+            return None
+
+        t = struct.unpack(">II", msg[:8])
+        if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC):
+            return None
+
+        x = zlib.decompress(msg[8:])
+        if len(x) != t[1]:
+            return None
+
+        return x
+
index 4fc263d..f559ebc 100755 (executable)
@@ -6,6 +6,7 @@ import zmq
 import threading\r
 import logging\r
 import CCustomLogger\r
+import zipmsg\r
 from json import JSONDecoder\r
 from common.trex_status_e import TRexStatus\r
 \r
@@ -24,6 +25,7 @@ class ZmqMonitorSession(threading.Thread):
         self.trexObj        = trexObj\r
         self.expect_trex    = self.trexObj.expect_trex     # used to signal if TRex is expected to run and if data should be considered\r
         self.decoder        = JSONDecoder()\r
+        self.zipped         = zipmsg.ZippedMsg()\r
         logger.info("ZMQ monitor initialization finished")\r
 \r
     def run(self):\r
@@ -60,14 +62,16 @@ class ZmqMonitorSession(threading.Thread):
         super(ZmqMonitorSession, self).join(timeout)\r
 \r
     def parse_and_update_zmq_dump(self, zmq_dump):\r
-        try:\r
-            dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))\r
-        except ValueError:\r
-            logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))\r
-            dict_obj = None\r
+        unzipped = self.zipped.decompress(zmq_dump)\r
+        if unzipped:\r
+            zmq_dump = unzipped\r
+        dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))\r
+\r
+        if type(dict_obj) is not dict:\r
+            raise Exception('Expected ZMQ dump of type dict, got: %s' % type(dict_obj))\r
 \r
         # add to trex_obj zmq latest dump, based on its 'name' header\r
-        if dict_obj is not None and dict_obj != {}:\r
+        if dict_obj != {}:\r
             self.trexObj.zmq_dump[dict_obj['name']] = dict_obj\r
             if self.first_dump:\r
                 # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully\r