--- /dev/null
+import zlib
+import struct
+
+class ZippedMsg:
+
+ MSG_COMPRESS_THRESHOLD = 256
+ MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA
+
+ def check_threshold (self, msg):
+ return len(msg) >= self.MSG_COMPRESS_THRESHOLD
+
+ def compress (self, msg):
+ # compress
+ compressed = zlib.compress(msg)
+ new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed
+ return new_msg
+
+
+ def decompress (self, msg):
+ if len(msg) < 8:
+ return None
+
+ t = struct.unpack(">II", msg[:8])
+ if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC):
+ return None
+
+ x = zlib.decompress(msg[8:])
+ if len(x) != t[1]:
+ return None
+
+ return x
+
import threading\r
import logging\r
import CCustomLogger\r
+import zipmsg\r
from json import JSONDecoder\r
from common.trex_status_e import TRexStatus\r
\r
self.trexObj = trexObj\r
self.expect_trex = self.trexObj.expect_trex # used to signal if TRex is expected to run and if data should be considered\r
self.decoder = JSONDecoder()\r
+ self.zipped = zipmsg.ZippedMsg()\r
logger.info("ZMQ monitor initialization finished")\r
\r
def run(self):\r
super(ZmqMonitorSession, self).join(timeout)\r
\r
def parse_and_update_zmq_dump(self, zmq_dump):\r
- try:\r
- dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))\r
- except ValueError:\r
- logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))\r
- dict_obj = None\r
+ unzipped = self.zipped.decompress(zmq_dump)\r
+ if unzipped:\r
+ zmq_dump = unzipped\r
+ dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))\r
+\r
+ if type(dict_obj) is not dict:\r
+ raise Exception('Expected ZMQ dump of type dict, got: %s' % type(dict_obj))\r
\r
# add to trex_obj zmq latest dump, based on its 'name' header\r
- if dict_obj is not None and dict_obj != {}:\r
+ if dict_obj != {}:\r
self.trexObj.zmq_dump[dict_obj['name']] = dict_obj\r
if self.first_dump:\r
# change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully\r