RX STATS !
author     imarom <[email protected]>
           Wed, 9 Mar 2016 16:04:31 +0000 (18:04 +0200)
committer  imarom <[email protected]>
           Thu, 10 Mar 2016 15:16:37 +0000 (17:16 +0200)
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
src/flow_stat.cpp
src/main_dpdk.cpp

diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
index 36103ca..82891b6 100644
@@ -245,9 +245,11 @@ class CTRexAsyncClient():
             name = msg['name']
             data = msg['data']
             type = msg['type']
+            sync = msg.get('sync', False)
+
             self.raw_snapshot[name] = data
 
-            self.__dispatch(name, type, data)
+            self.__dispatch(name, type, data, sync)
 
         
         # closing of socket must be from the same thread
@@ -268,10 +270,10 @@ class CTRexAsyncClient():
         return self.raw_snapshot
 
     # dispatch the message to the right place
-    def __dispatch (self, name, type, data):
+    def __dispatch (self, name, type, data, sync):
         # stats
         if name == "trex-global":
-            self.event_handler.handle_async_stats_update(data)
+            self.event_handler.handle_async_stats_update(data, sync)
 
         # events
         elif name == "trex-event":
@@ -282,7 +284,7 @@ class CTRexAsyncClient():
             self.handle_async_barrier(type, data)
 
         elif name == "flow_stats":
-            self.event_handler.handle_async_rx_stats_event(data)
+            self.event_handler.handle_async_rx_stats_event(data, sync)
 
         else:
             pass
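
The subscriber loop above now peels an optional 'sync' flag off every incoming message and threads it through __dispatch to the stats handlers. A minimal standalone sketch of that pattern, with hypothetical handler names (the real handlers live in AsyncEventHandler below):

    # A message carries 'sync': true only when the server forces a full
    # snapshot; regular async updates omit the field, so .get() defaults
    # to False.
    def dispatch(msg, handlers):
        sync = msg.get('sync', False)
        handler = handlers.get(msg['name'])
        if handler is not None:
            handler(msg['data'], sync)

    def on_flow_stats(data, sync):
        print('flow_stats update, sync=%s' % sync)

    dispatch({'name': 'flow_stats', 'type': 0, 'sync': True, 'data': {}},
             {'flow_stats': on_flow_stats})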
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 7dc7ff3..1905d44 100644
@@ -155,12 +155,12 @@ class AsyncEventHandler(object):
         pass
 
 
-    def handle_async_rx_stats_event (self, data):
-        self.client.flow_stats.update(data)
+    def handle_async_rx_stats_event (self, data, sync):
+        self.client.flow_stats.update(data, sync)
 
 
     # handles an async stats update from the subscriber
-    def handle_async_stats_update(self, dump_data):
+    def handle_async_stats_update(self, dump_data, sync):
         global_stats = {}
         port_stats = {}
 
@@ -182,11 +182,11 @@ class AsyncEventHandler(object):
                 global_stats[key] = value
 
         # update the general object with the snapshot
-        self.client.global_stats.update(global_stats)
+        self.client.global_stats.update(global_stats, sync)
 
         # update all ports
         for port_id, data in port_stats.iteritems():
-            self.client.ports[port_id].port_stats.update(data)
+            self.client.ports[port_id].port_stats.update(data, sync)
 
 
     # dispatcher for server async events (port started, port stopped and etc.)
@@ -808,6 +808,7 @@ class STLClient(object):
             self.ports[port_id].invalidate_stats()
 
         self.global_stats.invalidate()
+        self.flow_stats.invalidate()
 
         return RC_OK()
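
Adding flow_stats to the invalidation path means a disconnect/reconnect also re-arms the one-time sync gate introduced in trex_stl_stats.py below, so stale RX counters are dropped until the server publishes a snapshot marked sync. A plausible sketch of the assumed invalidate, since its body is not part of this diff:

    def invalidate(self):
        # assumption: invalidation clears the latest snapshot and forces
        # the object to wait for the next synced snapshot
        self.latest_stats = {}
        self.is_synced = False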
 
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index 418affb..60c8229 100644
@@ -13,6 +13,7 @@ import re
 import math
 import copy
 import threading
+import pprint
 
 GLOBAL_STATS = 'g'
 PORT_STATS = 'p'
@@ -137,7 +138,7 @@ class CTRexInfoGenerator(object):
 
         stats_table = text_tables.TRexTextTable()
         stats_table.set_cols_align(["l"] + ["r"] * stream_count)
-        stats_table.set_cols_width([20] + [17]   * stream_count)
+        stats_table.set_cols_width([10] + [17]   * stream_count)
         stats_table.set_cols_dtype(['t'] + ['t'] * stream_count)
 
         stats_table.add_rows([[k] + v
@@ -388,7 +389,7 @@ class CTRexStats(object):
         self.last_update_ts = time.time()
         self.history = deque(maxlen = 10)
         self.lock = threading.Lock()
-
+        self.is_synced = False
 
     ######## abstract methods ##########
 
@@ -401,37 +402,29 @@ class CTRexStats(object):
         raise NotImplementedError()
 
     # called when a snapshot arrives - add more fields
-    def preprocess_snapshot (self, snapshot):
+    def _update (self, snapshot):
         raise NotImplementedError()
 
-    ######## END abstract methods ##########
 
-    def __update_ref (self):
-        deep_merge_dicts(self.reference_stats, self.latest_stats)
+    ######## END abstract methods ##########
 
+    def update(self, snapshot, sync):
 
-    def update(self, snapshot):
+        if not self.is_synced and not sync:
+            return
 
-        # some extended generated values (from base values)
-        self.preprocess_snapshot(snapshot)
+        rc = self._update(snapshot)
+        if not rc:
+            return
 
-        # update
-        self.latest_stats = snapshot
+        # sync one time
+        if not self.is_synced and sync:
+            self.reference_stats = copy.deepcopy(self.latest_stats)
+            self.is_synced = True
 
+        # save history
         with self.lock:
-            self.history.append(snapshot)
-
-        diff_time = time.time() - self.last_update_ts
-
-        # handle the reference (base)
-        self.__update_ref()
-
-        # 3 seconds its a timeout
-        if diff_time > 3:
-            self.clear_stats()
-
-
-        self.last_update_ts = time.time()
+            self.history.append(self.latest_stats)
 
 
     def clear_stats(self):
@@ -467,14 +460,13 @@ class CTRexStats(object):
 
 
     def get_rel(self, field, format=False, suffix=""):
-        ref_value = self._get(self.reference_stats, field)
-        if ref_value == None:
-            return "N/A"
-
-        # if the latest does not have the value - its like the ref
+        
+        ref_value = self._get(self.reference_stats, field, default = 0)
         latest_value = self._get(self.latest_stats, field)
+
+        # latest value is an aggregation - must contain the value
         if latest_value == None:
-            latest_value = ref_value
+            return "N/A"
 
 
         value = latest_value - ref_value
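
Taken together, the new update()/get_rel() pair implements a one-time sync gate: every snapshot is dropped until the first one flagged sync=True arrives, a deep copy of that snapshot becomes the reference, and all relative counters are reported against it. A compressed, self-contained sketch of the life cycle (not the real class):

    import copy

    class SyncedCounter(object):
        def __init__(self):
            self.is_synced = False
            self.reference = {}
            self.latest = {}

        def update(self, snapshot, sync):
            if not self.is_synced and not sync:
                return                            # ignored until first sync
            self.latest = snapshot
            if not self.is_synced and sync:
                self.reference = copy.deepcopy(self.latest)
                self.is_synced = True

        def get_rel(self, field):
            if field not in self.latest:
                return "N/A"                      # latest must contain the value
            return self.latest[field] - self.reference.get(field, 0)

    c = SyncedCounter()
    c.update({'pkts': 500}, sync=False)   # dropped - not synced yet
    c.update({'pkts': 600}, sync=True)    # becomes the reference
    c.update({'pkts': 850}, sync=False)   # c.get_rel('pkts') is now 250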
@@ -564,7 +556,7 @@ class CGlobalStats(CTRexStats):
         return stats
 
 
-    def preprocess_snapshot (self, snapshot):
+    def pre_update (self, snapshot):
         # L1 bps
         bps = snapshot.get("m_tx_bps")
         pps = snapshot.get("m_tx_pps")
@@ -578,6 +570,16 @@ class CGlobalStats(CTRexStats):
         snapshot['m_tx_bps_L1'] = bps_L1
 
 
+    def _update(self, snapshot):
+
+        self.pre_update(snapshot)
+
+        # simple...
+        self.latest_stats = snapshot
+
+        return True
+
+
     def generate_stats(self):
         return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"),
                                                                      port=self.connection_info.get("sync_port"))),
@@ -674,7 +676,7 @@ class CPortStats(CTRexStats):
         return stats
 
 
-    def preprocess_snapshot (self, snapshot):
+    def pre_update (self, snapshot):
         # L1 bps
         bps = snapshot.get("m_total_tx_bps")
         pps = snapshot.get("m_total_tx_pps")
@@ -689,6 +691,16 @@ class CPortStats(CTRexStats):
         snapshot['m_percentage'] = (bps_L1 / self._port_obj.get_speed_bps()) * 100
 
 
+    def _update(self, snapshot):
+
+        self.pre_update(snapshot)
+
+        # simple...
+        self.latest_stats = snapshot
+
+        return True
+
+
     def generate_stats(self):
 
         state = self._port_obj.get_port_state_name() if self._port_obj else "" 
@@ -749,6 +761,9 @@ class CPortStats(CTRexStats):
                 }
 
 
+
+
+# RX stats objects - COMPLEX :-(
 class CRxStats(CTRexStats):
     def __init__(self):
         super(CRxStats, self).__init__()
@@ -760,98 +775,177 @@ class CRxStats(CTRexStats):
         factor = bps / (pps * 8.0)
         return bps * ( 1 + (20 / factor) )
 
+    def calculate_diff_sec (self, current, prev):
+        if not 'ts' in current:
+            raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field")
 
-    def preprocess_snapshot (self, snapshot):
-        # heavy pre-processing here...
-        new_snapshot = {}
+        if prev:
+            prev_ts   = prev['ts']
+            now_ts    = current['ts']
+            diff_sec  = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq'])
+        else:
+            diff_sec = 0.0
 
-        if not 'ts' in snapshot:
-            raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field")
+        return diff_sec
+
+
+    def process_single_pg (self, current_pg, prev_pg):
+
+        # start with the previous PG
+        output = copy.deepcopy(prev_pg)
+
+        for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
+            if not field in output:
+                output[field] = {}
+
+            if field in current_pg:
+                for port, pv in current_pg[field].iteritems():
+                    if not self.is_intable(port):
+                        continue
 
-        new_snapshot['ts'] = snapshot['ts']
+                    output[field][port] = pv
 
-        for key, value in snapshot.iteritems():
-            # skip non int values (simply copy)
-            if key == 'ts':
-                new_snapshot[key] = value
+            # sum up
+            total = 0
+            for port, pv in output[field].iteritems():
+                if not self.is_intable(port):
+                    continue
+                total += pv
+
+            output[field]['total'] = total
+
+        return output
+            
+
+    def is_intable (self, value):
+        try:
+            int(value)
+            return True
+        except ValueError:
+            return False
+
+    def process_snapshot (self, current, prev):
+
+        # timestamp
+        diff_sec = self.calculate_diff_sec(current, prev)
+
+        # final output
+        output = {}
+
+        # copy timestamp field
+        output['ts'] = current['ts']
+
+        pg_ids = set(prev.keys() + current.keys())
+
+        for pg_id in pg_ids:
+            if not self.is_intable(pg_id):
                 continue
 
-            # all the rest MUST be ints
-            try:
-                pg_id = int(key)
-            except ValueError:
-                assert(0)
+            current_pg = current.get(pg_id, {})
+            prev_pg = prev.get(pg_id, {})
+            
+            if current_pg.get('first_time'):
+                # new value - ignore history
+                output[pg_id] = self.process_single_pg(current_pg, {})
+                self.reference_stats[pg_id] = {}
+                self.calculate_bw_for_pg(output[pg_id], prev_pg, 0)
+            else:
+                # aggregate
+                output[pg_id] = self.process_single_pg(current_pg, prev_pg)
 
-            # handle PG ID
-            new_snapshot[pg_id] = {}
-            for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
-                new_snapshot[pg_id][field] = {'total': 0}
-                if field in value:
-                    for port, pv in value[field].iteritems():
-                        new_snapshot[pg_id][field][int(port)] = pv
-                        new_snapshot[pg_id][field]['total'] +=  pv
+                self.calculate_bw_for_pg(output[pg_id], prev_pg, diff_sec)
 
-            # add B/W calcs for a PG id
-            self.calculate_bw(new_snapshot, pg_id)
 
-        snapshot.clear()
-        snapshot.update(new_snapshot)
+        return output
 
 
-    def calculate_bw (self, snapshot, pg_id):
-        if not self.latest_stats:
-            snapshot[pg_id]['tx_pps'] = 0.0
-            snapshot[pg_id]['tx_bps'] = 0.0
-            snapshot[pg_id]['rx_pps'] = 0.0
-            snapshot[pg_id]['rx_bps'] = 0.0
+
+    def calculate_bw_for_pg (self, pg_current, pg_prev, diff_sec):
+
+        if diff_sec == 0:
+            pg_current['tx_pps'] = 0.0
+            pg_current['tx_bps'] = 0.0
+            pg_current['tx_bps_L1'] = 0.0
+            pg_current['rx_pps'] = 0.0
+            pg_current['rx_bps'] = 0.0
             return
 
+
         # prev
-        prev_ts       = self._get(self.latest_stats, 'ts')
-        prev_tx_pkts  = self._get(self.latest_stats, [pg_id, 'tx_pkts', 'total'], default = 0.0)
-        prev_tx_bytes = self._get(self.latest_stats, [pg_id, 'tx_bytes', 'total'], default = 0.0)
-        prev_tx_pps   = self._get(self.latest_stats, [pg_id, 'tx_pps'], default = 0.0)
-        prev_tx_bps   = self._get(self.latest_stats, [pg_id, 'tx_bps'], default = 0.0)
+        prev_tx_pkts  = pg_prev['tx_pkts']['total']
+        prev_tx_bytes = pg_prev['tx_bytes']['total']
+        prev_tx_pps   = pg_prev['tx_pps']
+        prev_tx_bps   = pg_prev['tx_bps']
 
         # now
-        now_ts       = snapshot['ts']
-        now_tx_pkts  = snapshot[pg_id]['tx_pkts']['total']
-        now_tx_bytes = snapshot[pg_id]['tx_bytes']['total']
+        now_tx_pkts  = pg_current['tx_pkts']['total']
+        now_tx_bytes = pg_current['tx_bytes']['total']
 
-        # diff seconds
-        diff_sec = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq'])
-        
-        # calculate fields
-        snapshot[pg_id]['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) )
-        snapshot[pg_id]['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) )
+        if not (now_tx_pkts >= prev_tx_pkts):
+            print "CURRENT:\n"
+            pprint.pprint(pg_current)
+            print "PREV:\n"
+            pprint.pprint(pg_prev)
+            assert(now_tx_pkts >= prev_tx_pkts)
+
+        pg_current['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) )
+        pg_current['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) )
+
+        pg_current['rx_pps'] = 0.0
+        pg_current['rx_bps'] = 0.0
+
+        pg_current['tx_bps_L1'] = self.bps_L1(pg_current['tx_bps'], pg_current['tx_pps'])
 
-        snapshot[pg_id]['rx_pps'] = 0.0
-        snapshot[pg_id]['rx_bps'] = 0.0
 
-        snapshot[pg_id]['tx_bps_L1'] = self.bps_L1(snapshot[pg_id]['tx_bps'], snapshot[pg_id]['tx_pps'])
 
 
+    def _update (self, snapshot):
+
+        # generate a new snapshot
+        new_snapshot = self.process_snapshot(snapshot, self.latest_stats)
+
+        #print new_snapshot
+        # advance
+        self.latest_stats = new_snapshot
+
+
+        return True
+      
+
+
+    # for API
     def get_stats (self):
         stats = {}
 
-        for pg_id in self.get_streams_keys():
-            stats[pg_id] = {}
+        for pg_id, value in self.latest_stats.iteritems():
+            # skip non ints
+            try:
+                int(pg_id)
+            except ValueError:
+                continue
+
+            stats[int(pg_id)] = {}
             for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
-                stats[pg_id][field] = {'total': self.get_rel([pg_id, field, 'total'])}
+                stats[int(pg_id)][field] = {'total': self.get_rel([pg_id, field, 'total'])}
 
-                for port, pv in self.reference_stats[pg_id][field].iteritems():
-                    stats[pg_id][field][port] = self.get_rel([pg_id, field, port])
+                for port, pv in value[field].iteritems():
+                    try:
+                        int(port)
+                    except ValueError:
+                        continue
+                    stats[int(pg_id)][field][int(port)] = self.get_rel([pg_id, field, port])
 
         return stats
 
-    
+
 
     def get_streams_keys (self):
         keys = []
-        for user_id, user_id_data in self.reference_stats.iteritems():
+        for key in self.latest_stats.keys():
             # ignore non user ID keys
             try:
-                keys.append(int(user_id))
+                int(key)
+                keys.append(key)
             except ValueError:
                 continue
 
@@ -860,11 +954,9 @@ class CRxStats(CTRexStats):
 
     def generate_stats (self):
 
-        stats = self.get_stats()
-        pg_ids = stats.keys()[:4]
+        pg_ids = self.get_streams_keys()
         cnt = len(pg_ids)
 
-        
         formatted_stats = OrderedDict([ ('Tx pps',  []),
                                         ('Tx bps L2',      []),
                                         ('Tx bps L1',      []),
@@ -911,3 +1003,4 @@ class CRxStats(CTRexStats):
 
 if __name__ == "__main__":
     pass
+
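
The per-PG bandwidth above is a simple exponential moving average with alpha = 0.5 over the counter deltas. A worked example, assuming 1000 packets were sent during the last 0.5 seconds on top of a previous estimate of 1800 pps:

    prev_tx_pps = 1800.0
    diff_pkts   = 1000
    diff_sec    = 0.5

    tx_pps = (0.5 * prev_tx_pps) + (0.5 * (diff_pkts / diff_sec))
    # = 0.5 * 1800 + 0.5 * 2000 = 1900.0 pps

    # bps_L1 adds the 20-byte per-packet Ethernet overhead
    # (7B preamble + 1B SFD + 12B inter-frame gap):
    # bps_L1 = bps * (1 + 20 / (bps / (pps * 8.0))) == bps + 160 * pps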
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 4e05de8..b2714b1 100644
@@ -630,8 +630,12 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
 
     root["name"] = "flow_stats";
     root["type"] = 0;
-    Json::Value &data_section = root["data"];
     
+    if (force_sync) {
+        root["sync"] = true;
+    }
+
+    Json::Value &data_section = root["data"];
     data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
     data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
 
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 3f53f83..2d087c8 100644
@@ -2307,7 +2307,7 @@ public:
 public:
     void Dump(FILE *fd,DumpFormat mode);
     void DumpAllPorts(FILE *fd);
-    void dump_json(std::string & json);
+    void dump_json(std::string & json, bool force_sync);
 private:
     std::string get_field(std::string name,float &f);
     std::string get_field(std::string name,uint64_t &f);
@@ -2341,8 +2341,15 @@ std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){
 }
 
 
-void CGlobalStats::dump_json(std::string & json){
-    json="{\"name\":\"trex-global\",\"type\":0,\"data\":{";
+void CGlobalStats::dump_json(std::string & json, bool force_sync){
+    /* refactor this to JSON */
+
+    json="{\"name\":\"trex-global\",\"type\":0,";
+    if (force_sync) {
+        json += "\"sync\": true,";
+    }
+
+    json +="\"data\":{";
 
 #define GET_FIELD(f) get_field(std::string(#f),f)
 #define GET_FIELD_PORT(p,f) get_field_port(p,std::string(#f),lp->f)
@@ -3565,7 +3572,7 @@ CGlobalTRex::publish_async_data(bool sync_now) {
         get_stats(m_stats);
     }
 
-    m_stats.dump_json(json);
+    m_stats.dump_json(json, sync_now);
     m_zmq_publisher.publish_json(json);
 
     /* generator json , all cores are the same just sample the first one */
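
CGlobalStats::dump_json builds its JSON header by string concatenation (hence the /* refactor this to JSON */ note), so the sync flag is simply spliced in. The two possible prefixes it emits:

    {"name":"trex-global","type":0,"data":{...                  (sync_now == false)
    {"name":"trex-global","type":0,"sync": true,"data":{...     (sync_now == true)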