self.connected = True
- rc = self.barrier()
+ # sync all stats data as a baseline from the server
+ rc = self.barrier(baseline = True)
if not rc:
self.disconnect()
return rc
name = msg['name']
data = msg['data']
type = msg['type']
- sync = msg.get('sync', False)
+ baseline = msg.get('baseline', False)
self.raw_snapshot[name] = data
- self.__dispatch(name, type, data, sync)
+ self.__dispatch(name, type, data, baseline)
# closing of socket must be from the same thread
return self.raw_snapshot
# dispatch the message to the right place
- def __dispatch (self, name, type, data, sync):
+ def __dispatch (self, name, type, data, baseline):
+
# stats
if name == "trex-global":
- self.event_handler.handle_async_stats_update(data, sync)
+ self.event_handler.handle_async_stats_update(data, baseline)
# events
elif name == "trex-event":
self.handle_async_barrier(type, data)
elif name == "flow_stats":
- self.event_handler.handle_async_rx_stats_event(data, sync)
+ self.event_handler.handle_async_rx_stats_event(data, baseline)
else:
pass
# block on barrier for async channel
- def barrier(self, timeout = 5):
+ def barrier(self, timeout = 5, baseline = False):
# set a random key
key = random.getrandbits(32)
while not self.async_barrier['ack']:
# inject
- rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+ rc = self.stateless_client._transmit("publish_now", params = {'key' : key, 'baseline': baseline})
if not rc:
return rc
pass
- def handle_async_rx_stats_event (self, data, sync):
- self.client.flow_stats.update(data, sync)
+ def handle_async_rx_stats_event (self, data, baseline):
+ self.client.flow_stats.update(data, baseline)
# handles an async stats update from the subscriber
- def handle_async_stats_update(self, dump_data, sync):
+ def handle_async_stats_update(self, dump_data, baseline):
global_stats = {}
port_stats = {}
global_stats[key] = value
# update the general object with the snapshot
- self.client.global_stats.update(global_stats, sync)
+ self.client.global_stats.update(global_stats, baseline)
# update all ports
for port_id, data in port_stats.iteritems():
- self.client.ports[port_id].port_stats.update(data, sync)
+ self.client.ports[port_id].port_stats.update(data, baseline)
# dispatcher for server async events (port started, port stopped and etc.)
self.last_update_ts = time.time()
self.history = deque(maxlen = 10)
self.lock = threading.Lock()
- self.is_synced = False
+ self.has_baseline = False
######## abstract methods ##########
raise NotImplementedError()
# called when a snapshot arrives - add more fields
- def _update (self, snapshot, sync):
+ def _update (self, snapshot, baseline):
raise NotImplementedError()
######## END abstract methods ##########
- def update(self, snapshot, sync):
+ def update(self, snapshot, baseline):
- if not self.is_synced and not sync:
+ if not self.has_baseline and not baseline:
return
rc = self._update(snapshot)
return
# sync one time
- if not self.is_synced and sync:
+ if not self.has_baseline and baseline:
self.reference_stats = copy.deepcopy(self.latest_stats)
- self.is_synced = True
+ self.has_baseline = True
# save history
with self.lock:
}
// return false if no counters changed since last run. true otherwise
-bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
+bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
uint64_t rx_stats[MAX_FLOW_STATS];
tx_per_flow_t tx_stats[MAX_FLOW_STATS];
Json::FastWriter writer;
root["name"] = "flow_stats";
root["type"] = 0;
- if (force_sync) {
- root["sync"] = true;
+ if (baseline) {
+ root["baseline"] = true;
}
Json::Value &data_section = root["data"];
data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
if (m_user_id_map.is_empty()) {
- if (force_sync) {
+ if (baseline) {
json = writer.write(root);
return true;
} else
}
for (uint8_t port = 0; port < m_num_ports; port++) {
std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
- if (user_id_info->need_to_send_rx(port) || force_sync) {
+ if (user_id_info->need_to_send_rx(port) || baseline) {
user_id_info->set_no_need_to_send_rx(port);
data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port));
}
- if (user_id_info->need_to_send_tx(port) || force_sync) {
+ if (user_id_info->need_to_send_tx(port) || baseline) {
user_id_info->set_no_need_to_send_tx(port);
data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_pkts());
data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_bytes());
int start_stream(TrexStream * stream, uint16_t &ret_hw_id);
int stop_stream(const TrexStream * stream);
int get_active_pgids(flow_stat_active_t &result);
- bool dump_json(std::string & json, bool force_sync);
+ bool dump_json(std::string & json, bool baseline);
private:
int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
virtual void get_interface_info(uint8_t interface_id, intf_info_st &info) const = 0;
- virtual void publish_async_data_now(uint32_t key) const = 0;
+ virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0;
virtual uint8_t get_dp_core_count() const = 0;
virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0;
virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const = 0;
void get_interface_info(uint8_t interface_id, intf_info_st &info) const;
- void publish_async_data_now(uint32_t key) const;
+ void publish_async_data_now(uint32_t key, bool baseline) const;
uint8_t get_dp_core_count() const;
void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const;
int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const;
}
}
- virtual void publish_async_data_now(uint32_t key) const {
+ virtual void publish_async_data_now(uint32_t key, bool baseline) const {
}
virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const {return 0;};
public:
void Dump(FILE *fd,DumpFormat mode);
void DumpAllPorts(FILE *fd);
- void dump_json(std::string & json, bool force_sync);
+ void dump_json(std::string & json, bool baseline);
private:
std::string get_field(std::string name,float &f);
std::string get_field(std::string name,uint64_t &f);
}
-void CGlobalStats::dump_json(std::string & json, bool force_sync){
+void CGlobalStats::dump_json(std::string & json, bool baseline){
/* refactor this to JSON */
json="{\"name\":\"trex-global\",\"type\":0,";
- if (force_sync) {
- json += "\"sync\": true,";
+ if (baseline) {
+ json += "\"baseline\": true,";
}
json +="\"data\":{";
public:
- void publish_async_data(bool sync_now);
+ void publish_async_data(bool sync_now, bool baseline = false);
void publish_async_barrier(uint32_t key);
void dump_stats(FILE *fd,
}
void
-CGlobalTRex::publish_async_data(bool sync_now) {
+CGlobalTRex::publish_async_data(bool sync_now, bool baseline) {
std::string json;
/* refactor to update, dump, and etc. */
get_stats(m_stats);
}
- m_stats.dump_json(json, sync_now);
+ m_stats.dump_json(json, baseline);
m_zmq_publisher.publish_json(json);
/* generator json , all cores are the same just sample the first one */
m_zmq_publisher.publish_json(json);
if (get_is_stateless()) {
- if (m_trex_stateless->m_rx_flow_stat.dump_json(json, sync_now))
+ if (m_trex_stateless->m_rx_flow_stat.dump_json(json, baseline))
m_zmq_publisher.publish_json(json);
}
}
}
void
-TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
- g_trex.publish_async_data(true);
+TrexDpdkPlatformApi::publish_async_data_now(uint32_t key, bool baseline) const {
+ g_trex.publish_async_data(true, baseline);
g_trex.publish_async_barrier(key);
}
TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) {
TrexStateless *main = get_stateless_obj();
- uint32_t key = parse_uint32(params, "key", result);
+ uint32_t key = parse_uint32(params, "key", result);
+ bool baseline = parse_bool(params, "baseline", result);
- main->get_platform_api()->publish_async_data_now(key);
+ main->get_platform_api()->publish_async_data_now(key, baseline);
result["result"] = Json::objectValue;
return (TREX_RPC_CMD_OK);
* general cmds
*/
TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false);
-TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false);
+TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids",0, false);