RX stats #2
authorimarom <[email protected]>
Thu, 10 Mar 2016 08:21:37 +0000 (10:21 +0200)
committerimarom <[email protected]>
Thu, 10 Mar 2016 15:16:38 +0000 (17:16 +0200)
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
src/flow_stat.cpp
src/flow_stat.h
src/internal_api/trex_platform_api.h
src/main_dpdk.cpp
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmds.h

index 82891b6..ae6cb49 100644 (file)
@@ -178,7 +178,8 @@ class CTRexAsyncClient():
 
         self.connected = True
 
-        rc = self.barrier()
+        # sync all stats data as a baseline from the server
+        rc = self.barrier(baseline = True)
         if not rc:
             self.disconnect()
             return rc
@@ -245,11 +246,11 @@ class CTRexAsyncClient():
             name = msg['name']
             data = msg['data']
             type = msg['type']
-            sync = msg.get('sync', False)
+            baseline = msg.get('baseline', False)
 
             self.raw_snapshot[name] = data
 
-            self.__dispatch(name, type, data, sync)
+            self.__dispatch(name, type, data, baseline)
 
         
         # closing of socket must be from the same thread
@@ -270,10 +271,11 @@ class CTRexAsyncClient():
         return self.raw_snapshot
 
     # dispatch the message to the right place
-    def __dispatch (self, name, type, data, sync):
+    def __dispatch (self, name, type, data, baseline):
+
         # stats
         if name == "trex-global":
-            self.event_handler.handle_async_stats_update(data, sync)
+            self.event_handler.handle_async_stats_update(data, baseline)
 
         # events
         elif name == "trex-event":
@@ -284,7 +286,7 @@ class CTRexAsyncClient():
             self.handle_async_barrier(type, data)
 
         elif name == "flow_stats":
-            self.event_handler.handle_async_rx_stats_event(data, sync)
+            self.event_handler.handle_async_rx_stats_event(data, baseline)
 
         else:
             pass
@@ -297,7 +299,7 @@ class CTRexAsyncClient():
 
 
     # block on barrier for async channel
-    def barrier(self, timeout = 5):
+    def barrier(self, timeout = 5, baseline = False):
         
         # set a random key
         key = random.getrandbits(32)
@@ -309,7 +311,7 @@ class CTRexAsyncClient():
         while not self.async_barrier['ack']:
 
             # inject
-            rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+            rc = self.stateless_client._transmit("publish_now", params = {'key' : key, 'baseline': baseline})
             if not rc:
                 return rc
 
index 1905d44..5ecb112 100644 (file)
@@ -155,12 +155,12 @@ class AsyncEventHandler(object):
         pass
 
 
-    def handle_async_rx_stats_event (self, data, sync):
-        self.client.flow_stats.update(data, sync)
+    def handle_async_rx_stats_event (self, data, baseline):
+        self.client.flow_stats.update(data, baseline)
 
 
     # handles an async stats update from the subscriber
-    def handle_async_stats_update(self, dump_data, sync):
+    def handle_async_stats_update(self, dump_data, baseline):
         global_stats = {}
         port_stats = {}
 
@@ -182,11 +182,11 @@ class AsyncEventHandler(object):
                 global_stats[key] = value
 
         # update the general object with the snapshot
-        self.client.global_stats.update(global_stats, sync)
+        self.client.global_stats.update(global_stats, baseline)
 
         # update all ports
         for port_id, data in port_stats.iteritems():
-            self.client.ports[port_id].port_stats.update(data, sync)
+            self.client.ports[port_id].port_stats.update(data, baseline)
 
 
     # dispatcher for server async events (port started, port stopped and etc.)
index 60c8229..cf39444 100644 (file)
@@ -389,7 +389,7 @@ class CTRexStats(object):
         self.last_update_ts = time.time()
         self.history = deque(maxlen = 10)
         self.lock = threading.Lock()
-        self.is_synced = False
+        self.has_baseline = False
 
     ######## abstract methods ##########
 
@@ -402,15 +402,15 @@ class CTRexStats(object):
         raise NotImplementedError()
 
     # called when a snapshot arrives - add more fields
-    def _update (self, snapshot, sync):
+    def _update (self, snapshot, baseline):
         raise NotImplementedError()
 
 
     ######## END abstract methods ##########
 
-    def update(self, snapshot, sync):
+    def update(self, snapshot, baseline):
 
-        if not self.is_synced and not sync:
+        if not self.has_baseline and not baseline:
             return
 
         rc = self._update(snapshot)
@@ -418,9 +418,9 @@ class CTRexStats(object):
             return
 
         # sync one time
-        if not self.is_synced and sync:
+        if not self.has_baseline and baseline:
             self.reference_stats = copy.deepcopy(self.latest_stats)
-            self.is_synced = True
+            self.has_baseline = True
 
         # save history
         with self.lock:
index b2714b1..de081ff 100644 (file)
@@ -622,7 +622,7 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
 }
 
 // return false if no counters changed since last run. true otherwise
-bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
+bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
     uint64_t rx_stats[MAX_FLOW_STATS];
     tx_per_flow_t tx_stats[MAX_FLOW_STATS];
     Json::FastWriter writer;
@@ -631,8 +631,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
     root["name"] = "flow_stats";
     root["type"] = 0;
     
-    if (force_sync) {
-        root["sync"] = true;
+    if (baseline) {
+        root["baseline"] = true;
     }
 
     Json::Value &data_section = root["data"];
@@ -640,7 +640,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
     data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
 
     if (m_user_id_map.is_empty()) {
-        if (force_sync) {
+        if (baseline) {
             json = writer.write(root);
             return true;
         } else
@@ -692,11 +692,11 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
         }
         for (uint8_t port = 0; port < m_num_ports; port++) {
             std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
-            if (user_id_info->need_to_send_rx(port) || force_sync) {
+            if (user_id_info->need_to_send_rx(port) || baseline) {
                 user_id_info->set_no_need_to_send_rx(port);
                 data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port));
             }
-            if (user_id_info->need_to_send_tx(port) || force_sync) {
+            if (user_id_info->need_to_send_tx(port) || baseline) {
                 user_id_info->set_no_need_to_send_tx(port);
                 data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_pkts());
                 data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_bytes());
index 6966b11..3e00a18 100644 (file)
@@ -202,7 +202,7 @@ class CFlowStatRuleMgr {
     int start_stream(TrexStream * stream, uint16_t &ret_hw_id);
     int stop_stream(const TrexStream * stream);
     int get_active_pgids(flow_stat_active_t &result);
-    bool dump_json(std::string & json, bool force_sync);
+    bool dump_json(std::string & json, bool baseline);
 
  private:
     int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
index e9cf56d..f8f7658 100644 (file)
@@ -139,7 +139,7 @@ public:
 
     virtual void get_interface_info(uint8_t interface_id, intf_info_st &info) const = 0;
 
-    virtual void publish_async_data_now(uint32_t key) const = 0;
+    virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0;
     virtual uint8_t get_dp_core_count() const = 0;
     virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0;
     virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const = 0;
@@ -168,7 +168,7 @@ public:
 
     void get_interface_info(uint8_t interface_id, intf_info_st &info) const;
 
-    void publish_async_data_now(uint32_t key) const;
+    void publish_async_data_now(uint32_t key, bool baseline) const;
     uint8_t get_dp_core_count() const;
     void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const;
     int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const;
@@ -222,7 +222,7 @@ public:
         }
     }
 
-    virtual void publish_async_data_now(uint32_t key) const {
+    virtual void publish_async_data_now(uint32_t key, bool baseline) const {
 
     }
     virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const {return 0;};
index 2d087c8..1b750bb 100644 (file)
@@ -2307,7 +2307,7 @@ public:
 public:
     void Dump(FILE *fd,DumpFormat mode);
     void DumpAllPorts(FILE *fd);
-    void dump_json(std::string & json, bool force_sync);
+    void dump_json(std::string & json, bool baseline);
 private:
     std::string get_field(std::string name,float &f);
     std::string get_field(std::string name,uint64_t &f);
@@ -2341,12 +2341,12 @@ std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){
 }
 
 
-void CGlobalStats::dump_json(std::string & json, bool force_sync){
+void CGlobalStats::dump_json(std::string & json, bool baseline){
     /* refactor this to JSON */
 
     json="{\"name\":\"trex-global\",\"type\":0,";
-    if (force_sync) {
-        json += "\"sync\": true,";
+    if (baseline) {
+        json += "\"baseline\": true,";
     }
 
     json +="\"data\":{";
@@ -2638,7 +2638,7 @@ private:
 
 public:
 
-    void publish_async_data(bool sync_now);
+    void publish_async_data(bool sync_now, bool baseline = false);
     void publish_async_barrier(uint32_t key);
 
     void dump_stats(FILE *fd,
@@ -3563,7 +3563,7 @@ void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){
 }
 
 void
-CGlobalTRex::publish_async_data(bool sync_now) {
+CGlobalTRex::publish_async_data(bool sync_now, bool baseline) {
     std::string json;
 
     /* refactor to update, dump, and etc. */
@@ -3572,7 +3572,7 @@ CGlobalTRex::publish_async_data(bool sync_now) {
         get_stats(m_stats);
     }
 
-    m_stats.dump_json(json, sync_now);
+    m_stats.dump_json(json, baseline);
     m_zmq_publisher.publish_json(json);
 
     /* generator json , all cores are the same just sample the first one */
@@ -3599,7 +3599,7 @@ CGlobalTRex::publish_async_data(bool sync_now) {
     m_zmq_publisher.publish_json(json);
 
     if (get_is_stateless()) {
-        if (m_trex_stateless->m_rx_flow_stat.dump_json(json, sync_now))
+        if (m_trex_stateless->m_rx_flow_stat.dump_json(json, baseline))
             m_zmq_publisher.publish_json(json);
     }
 }
@@ -5224,8 +5224,8 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
 }
 
 void
-TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
-    g_trex.publish_async_data(true);
+TrexDpdkPlatformApi::publish_async_data_now(uint32_t key, bool baseline) const {
+    g_trex.publish_async_data(true, baseline);
     g_trex.publish_async_barrier(key);
 }
 
index dcf74b5..f054c0e 100644 (file)
@@ -398,9 +398,10 @@ trex_rpc_cmd_rc_e
 TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
     TrexStateless *main = get_stateless_obj();
 
-    uint32_t key = parse_uint32(params, "key", result);
+    uint32_t key  = parse_uint32(params, "key", result);
+    bool baseline = parse_bool(params, "baseline", result);
 
-    main->get_platform_api()->publish_async_data_now(key);
+    main->get_platform_api()->publish_async_data_now(key, baseline);
 
     result["result"] = Json::objectValue;
     return (TREX_RPC_CMD_OK);
index 386ccc2..c4b01b8 100644 (file)
@@ -57,7 +57,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub,    "test_sub", 2, false);
  * general cmds
  */
 TREX_RPC_CMD_DEFINE(TrexRpcCmdPing,       "ping",                 0, false);
-TREX_RPC_CMD_DEFINE(TrexRpcPublishNow,    "publish_now",          1, false);
+TREX_RPC_CMD_DEFINE(TrexRpcPublishNow,    "publish_now",          2, false);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds,    "get_supported_cmds",   0, false);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version",          0, false);
 TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids",0, false);