TX barrier
authorimarom <[email protected]>
Wed, 2 Mar 2016 09:05:51 +0000 (11:05 +0200)
committerimarom <[email protected]>
Wed, 2 Mar 2016 11:35:09 +0000 (13:35 +0200)
21 files changed:
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
src/gtest/trex_stateless_gtest.cpp
src/internal_api/trex_platform_api.h
src/main_dpdk.cpp
src/rpc-server/trex_rpc_req_resp_server.cpp
src/rpc-server/trex_rpc_req_resp_server.h
src/rpc-server/trex_rpc_server.cpp
src/rpc-server/trex_rpc_server_api.h
src/sim/trex_sim.h
src/sim/trex_sim_stateless.cpp
src/stateless/cp/trex_dp_port_events.cpp
src/stateless/cp/trex_dp_port_events.h
src/stateless/cp/trex_stateless.cpp
src/stateless/cp/trex_stateless.h
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/dp/trex_stateless_dp_core.cpp
src/stateless/dp/trex_stateless_dp_core.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h

index 5056685..04dd77e 100644 (file)
@@ -227,7 +227,7 @@ class AsyncEventHandler(object):
             ev = "Port {0} job done".format(port_id)
 
             # call the handler
-            self.__async_event_port_stopped(port_id)
+            self.__async_event_port_job_done(port_id)
             show_event = True
 
         # port was stolen...
@@ -264,6 +264,9 @@ class AsyncEventHandler(object):
 
     # private functions
 
+    def __async_event_port_job_done (self, port_id):
+        self.client.ports[port_id].async_event_port_job_done()
+
     def __async_event_port_stopped (self, port_id):
         self.client.ports[port_id].async_event_port_stopped()
 
@@ -1332,7 +1335,6 @@ class STLClient(object):
         :parameters:
             ports : list
                 ports to execute the command
-                
 
         :raises:
             + :exc:`STLError`
@@ -1354,7 +1356,6 @@ class STLClient(object):
         if not rc:
             raise STLError(rc)
 
-
         
     """
         update traffic on port(s)
@@ -1433,8 +1434,6 @@ class STLClient(object):
         if not rc:
             raise STLError(rc)
 
-              
-    
     """
         resume traffic on port(s)
 
index 6aa1884..4529efa 100644 (file)
@@ -334,7 +334,7 @@ class Port(object):
                   "mul":      mul,
                   "duration": duration,
                   "force":    force}
-
+        
         rc = self.transmit("start_traffic", params)
         if rc.bad():
             return self.err(rc.err())
@@ -363,7 +363,6 @@ class Port(object):
         if rc.bad():
             return self.err(rc.err())
 
-        # only valid state after stop
         self.state = self.STATE_STREAMS
 
         return self.ok()
@@ -383,7 +382,6 @@ class Port(object):
         if rc.bad():
             return self.err(rc.err())
 
-        # only valid state after stop
         self.state = self.STATE_PAUSE
 
         return self.ok()
@@ -400,11 +398,12 @@ class Port(object):
         params = {"handler": self.handler,
                   "port_id": self.port_id}
 
+        # only valid state after stop
+
         rc = self.transmit("resume_traffic", params)
         if rc.bad():
             return self.err(rc.err())
 
-        # only valid state after stop
         self.state = self.STATE_TX
 
         return self.ok()
@@ -591,21 +590,26 @@ class Port(object):
     
 
 
-    ################# events handler ######################
-    def async_event_port_stopped (self):
+  ################# events handler ######################
+    def async_event_port_job_done (self):
         self.state = self.STATE_STREAMS
 
-
-    def async_event_port_started (self):
-        self.state = self.STATE_TX
-
+    # rest of the events are used for TUI / read only sessions
+    def async_event_port_stopped (self):
+        if not self.is_acquired():
+            self.state = self.STATE_STREAMS
 
     def async_event_port_paused (self):
-        self.state = self.STATE_PAUSE
+        if not self.is_acquired():
+            self.state = self.STATE_PAUSE
 
+    def async_event_port_started (self):
+        if not self.is_acquired():
+            self.state = self.STATE_TX
 
     def async_event_port_resumed (self):
-        self.state = self.STATE_TX
+        if not self.is_acquired():
+            self.state = self.STATE_TX
 
     def async_event_forced_acquired (self):
         self.handler = None
index 4cc40cd..c3dfcb9 100644 (file)
@@ -3206,7 +3206,6 @@ public:
         /* first the message must be an event */
         TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg);
         EXPECT_TRUE(event != NULL);
-        EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP);
 
         EXPECT_TRUE(event->get_event_id() == m_event_id);
         EXPECT_TRUE(event->get_port_id() == 0);
index 7f7ca21..f6d7278 100644 (file)
@@ -148,6 +148,7 @@ public:
     virtual int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0;
     virtual void set_promiscuous(uint8_t port_id, bool enabled) const = 0;
     virtual bool get_promiscuous(uint8_t port_id) const = 0;
+    virtual void flush_dp_messages() const = 0;
 
     virtual ~TrexPlatformApi() {}
 };
@@ -176,6 +177,7 @@ public:
     int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const;
     void set_promiscuous(uint8_t port_id, bool enabled) const;
     bool get_promiscuous(uint8_t port_id) const;
+    void flush_dp_messages() const;
 };
 
 
@@ -234,6 +236,9 @@ public:
         return false;
     }
 
+    void flush_dp_messages() const {
+    }
+
 private:
     int m_dp_core_count;
 };
index af6efe1..3404d6b 100644 (file)
@@ -2599,9 +2599,9 @@ private:
     /* send message to all dp cores */
     int  send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
     void check_for_dp_message_from_core(int thread_id);
-    void check_for_dp_messages();
-
+    
 public:
+    void check_for_dp_messages();
     int start_master_statefull();
     int start_master_stateless();
     int run_in_core(virtual_thread_id_t virt_core_id);
@@ -2640,7 +2640,7 @@ private:
 
 public:
 
-    void publish_async_data();
+    void publish_async_data(bool sync_now);
     void publish_async_barrier(uint32_t key);
 
     void dump_stats(FILE *fd,
@@ -2686,9 +2686,11 @@ private:
     CLatencyPktInfo     m_latency_pkt;
     TrexPublisher       m_zmq_publisher;
     CGlobalStats        m_stats;
+    std::mutex          m_cp_lock;
 
 public:
     TrexStateless       *m_trex_stateless;
+     
 };
 
 int  CGlobalTRex::reset_counters(){
@@ -2738,6 +2740,7 @@ CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
  */
 void 
 CGlobalTRex::check_for_dp_messages() {
+
     /* for all the cores - check for a new message */
     for (int i = 0; i < get_cores_tx(); i++) {
         check_for_dp_message_from_core(i);
@@ -3070,10 +3073,10 @@ bool CGlobalTRex::Create(){
 
        cfg.m_port_count         = CGlobalInfo::m_options.m_expected_portd;
        cfg.m_rpc_req_resp_cfg   = &rpc_req_resp_cfg;
-       cfg.m_rpc_async_cfg      = NULL;
        cfg.m_rpc_server_verbose = false;
        cfg.m_platform_api       = new TrexDpdkPlatformApi();
        cfg.m_publisher          = &m_zmq_publisher;
+       cfg.m_global_lock        = &m_cp_lock;
 
        m_trex_stateless = new TrexStateless(cfg);
    }
@@ -3531,9 +3534,15 @@ void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){
 
 
 void 
-CGlobalTRex::publish_async_data() {
+CGlobalTRex::publish_async_data(bool sync_now) {
      std::string json;
 
+     /* refactor to update, dump, and etc. */
+     if (sync_now) {
+         update_stats();
+         get_stats(m_stats);
+     }
+
      m_stats.dump_json(json);
      m_zmq_publisher.publish_json(json);
 
@@ -3572,7 +3581,7 @@ CGlobalTRex::publish_async_barrier(uint32_t key) {
 }
 
 int CGlobalTRex::run_in_master() {
-
+    
    
     bool was_stopped=false;
 
@@ -3580,6 +3589,9 @@ int CGlobalTRex::run_in_master() {
         m_trex_stateless->launch_control_plane();
     }
 
+    /* exception and scope safe */
+    std::unique_lock<std::mutex> cp_lock(m_cp_lock);
+
     while ( true ) {
 
         if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){
@@ -3669,18 +3681,23 @@ int CGlobalTRex::run_in_master() {
         }
 
         /* publish data */
-        publish_async_data();
+        publish_async_data(false);
 
         /* check from messages from DP */
         check_for_dp_messages();
 
+        cp_lock.unlock();
         delay(500);
+        cp_lock.lock();
 
         if ( is_all_cores_finished() ) {
             break;
         }
     }
 
+    /* on exit release the lock */
+    cp_lock.unlock();
+
     if (!is_all_cores_finished()) {
         /* probably CLTR-C */
         try_stop_all_dp();
@@ -5177,7 +5194,7 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
 
 void
 TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
-    g_trex.publish_async_data();
+    g_trex.publish_async_data(true);
     g_trex.publish_async_barrier(key);
 }
 
@@ -5213,4 +5230,6 @@ bool TrexDpdkPlatformApi::get_promiscuous(uint8_t port_id) const {
     return g_trex.m_ports[port_id].get_promiscuous();
 }
 
-
+void TrexDpdkPlatformApi::flush_dp_messages() const {
+    g_trex.check_for_dp_messages();
+}
index da7e8c5..5c587e0 100644 (file)
@@ -173,10 +173,8 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s
 
     int index = 0;
 
-    /* if lock was provided, take it  */
-    if (m_lock) {
-        m_lock->lock();
-    }
+    /* expcetion safe */
+    std::unique_lock<std::mutex> lock(*m_lock);
 
     /* for every command parsed - launch it */
     for (auto command : commands) {
@@ -190,9 +188,7 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s
     }
 
     /* done with the lock */
-    if (m_lock) {
-        m_lock->unlock();
-    }
+    lock.unlock();
 
     /* write the JSON to string and sever on ZMQ */
 
@@ -254,28 +250,3 @@ TrexRpcServerReqRes::test_inject_request(const std::string &req) {
     return response;
 }
 
-
-/**
- * MOCK req resp server
- */
-TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
-}
-
-/**
- * override start
- * 
- */
-void
-TrexRpcServerReqResMock::start() {
-
-}
-
-
-/**
- * override stop
- */
-void
-TrexRpcServerReqResMock::stop() {
-
-}
-
index 979bf9a..26b3248 100644 (file)
@@ -55,7 +55,6 @@ protected:
     void               *m_socket;
 };
 
-
 /**
  * a mock req resp server (for tests)
  * 
@@ -73,5 +72,6 @@ public:
 
 };
 
+
 #endif /* __TREX_RPC_REQ_RESP_API_H__ */
 
index 1dfc449..7d2e31a 100644 (file)
@@ -33,6 +33,9 @@ limitations under the License.
 TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock)  {
     m_is_running = false;
     m_is_verbose = false;
+    if (m_lock == NULL) {
+        m_lock = &m_dummy_lock;
+    }
 }
 
 TrexRpcServerInterface::~TrexRpcServerInterface() {
@@ -117,7 +120,6 @@ get_current_date_time() {
 const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
 
 TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
-                             const TrexRpcServerConfig *async_cfg,
                              std::mutex *lock) {
 
     m_req_resp = NULL;
@@ -134,10 +136,6 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
         m_servers.push_back(m_req_resp);
     }
     
-    /* add async publisher */
-    if (async_cfg) {
-        m_servers.push_back(new TrexRpcServerAsync(*async_cfg, lock));
-    }
 }
 
 TrexRpcServer::~TrexRpcServer() {
@@ -187,3 +185,27 @@ TrexRpcServer::test_inject_request(const std::string &req_str) {
         return "";
     }
 }
+
+/**
+ * MOCK req resp server
+ */
+TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
+}
+
+/**
+ * override start
+ * 
+ */
+void
+TrexRpcServerReqResMock::start() {
+
+}
+
+
+/**
+ * override stop
+ */
+void
+TrexRpcServerReqResMock::stop() {
+
+}
index 1ab5dce..a02b2cc 100644 (file)
@@ -133,6 +133,7 @@ protected:
     std::thread                          *m_thread;
     std::string                          m_name;
     std::mutex                           *m_lock;
+    std::mutex                           m_dummy_lock;
 };
 
 /**
@@ -147,7 +148,6 @@ public:
   
     /* creates the collection of servers using configurations */
     TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
-                  const TrexRpcServerConfig *async_cfg,
                   std::mutex *m_lock = NULL);
 
     ~TrexRpcServer();
index 3a3a62e..59184b7 100644 (file)
@@ -77,7 +77,6 @@ public:
     
         cfg.m_port_count         = 2;
         cfg.m_rpc_req_resp_cfg   = NULL;
-        cfg.m_rpc_async_cfg      = NULL;
         cfg.m_rpc_server_verbose = false;
         cfg.m_platform_api       = new SimPlatformApi(1);
         cfg.m_publisher          = NULL;
index 30d60b1..87c61ae 100644 (file)
@@ -186,7 +186,6 @@ SimStateless::prepare_control_plane() {
 
     cfg.m_port_count         = m_port_count;
     cfg.m_rpc_req_resp_cfg   = &rpc_req_resp_cfg;
-    cfg.m_rpc_async_cfg      = NULL;
     cfg.m_rpc_server_verbose = false;
     cfg.m_platform_api       = new SimPlatformApi(m_dp_core_count);
     cfg.m_publisher          = m_publisher;
index ba327e5..8e098ad 100644 (file)
@@ -20,6 +20,7 @@ limitations under the License.
 */
 
 #include <trex_dp_port_events.h>
+#include <trex_stateless_messaging.h>
 #include <sstream>
 #include <os_time.h>
 #include <trex_stateless.h>
@@ -27,24 +28,20 @@ limitations under the License.
 /**
  * port events
  */
-void 
-TrexDpPortEvents::create(TrexStatelessPort *port) {
+TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) {
     m_port = port;
-
-    for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
-        m_events[i].create((TrexDpPortEvent::event_e) i, port);
-    }
-
     m_event_id_counter = EVENT_ID_INVALID;
 }
 
-/**
- * generate a new event ID
- * 
- */
-int
-TrexDpPortEvents::generate_event_id() {
-    return (++m_event_id_counter);
+TrexDpPortEvent *
+TrexDpPortEvents::lookup(int event_id) {
+    auto search = m_events.find(event_id);
+
+    if (search != m_events.end()) {
+        return search->second;
+    } else {
+        return NULL;
+    }
 }
 
 /**
@@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() {
  * all other events will be disabled 
  * 
  */
-void
-TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+int
+TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) {
+    /* allocate ID for event */
+    int event_id = ++m_event_id_counter;
 
-    /* first disable all events */
-    for (TrexDpPortEvent & e : m_events) {
-        e.disable();
-    }
+    /* init and add */
+    event->init(m_port, event_id, timeout_ms);
+    m_events[event_id] = event;
 
-    /* mark this event as allowed */
-    m_events[ev].wait_for_event(event_id, timeout_ms);
+    return event_id;
 }
 
 void 
-TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
-    m_events[ev].disable();
+TrexDpPortEvents::destroy_event(int event_id) {
+    TrexDpPortEvent *event = lookup(event_id);
+    if (!event) {
+        /* cannot find event */
+        throw TrexException("internal error - cannot find event");
+    }
+
+    m_events.erase(event_id);
+    delete event;
+}
+
+class DPBarrier : public TrexDpPortEvent {
+protected:
+    virtual void on_event() {
+        /* do nothing */
+    }
+};
+
+void
+TrexDpPortEvents::barrier() {
+    int barrier_id = create_event(new DPBarrier());
+
+    TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id);
+    m_port->send_message_to_all_dp(barrier_msg);
+
+    get_stateless_obj()->get_platform_api()->flush_dp_messages();
+    while (lookup(barrier_id) != NULL) {
+        delay(1);
+        get_stateless_obj()->get_platform_api()->flush_dp_messages();
+    }
 }
 
 /**
@@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
  * 
  */
 void 
-TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
-    m_events[ev].handle_event(thread_id, event_id);
-}
-
-/*********** 
- * single event object
- * 
- */
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+    TrexDpPortEvent *event = lookup(event_id);
+    /* event might have been deleted */
+    if (!event) {
+        return;
+    }
 
-void
-TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
-    m_event_type = type;
-    m_port = port;
+    bool done = event->on_core_reporting_in(thread_id);
 
-    /* add the core ids to the hash */
-    m_signal.clear();
-    for (int core_id : m_port->get_core_id_list()) {
-        m_signal[core_id] = false;
+    if (done) {
+        destroy_event(event_id);
     }
-
-    /* event is disabled */
-    disable();
 }
 
 
-/**
- * wait the event using event id and timeout
+/***************************
+ * event
  * 
- */
-void
-TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+ **************************/
+TrexDpPortEvent::TrexDpPortEvent() {
+    m_port = NULL;
+    m_event_id = -1;
+}
 
-    /* set a new event id */
+void 
+TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
+    m_port = port;
     m_event_id = event_id;
 
     /* do we have a timeout ? */
@@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
 
     /* prepare the signal array */
     m_pending_cnt = 0;
-    for (auto & core_pair : m_signal) {
-        core_pair.second = false;
+    for (int core_id : m_port->get_core_id_list()) {
+        m_signal[core_id] = false;
         m_pending_cnt++;
     }
 }
 
-void
-TrexDpPortEvent::disable() {
-    m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
-}
-
-/**
- * get the event status
- * 
- */
-
-TrexDpPortEvent::event_status_e
-TrexDpPortEvent::status() {
-
-    /* is it even active ? */
-    if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
-        return (EVENT_DISABLE);
-    }
-
-    /* did it occured ? */
-    if (m_pending_cnt == 0) {
-        return (EVENT_OCCURED);
-    }
-
-    /* so we are enabled and the event did not occur - maybe we timed out ? */
-    if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
-        return (EVENT_TIMED_OUT);
-    }
-
-    /* so we are still waiting... */
-    return (EVENT_PENDING);
-
-}
-
-void
-TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg)  {
-    std::stringstream err;
-    err << "DP event '" << event_name(m_event_type)  << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
-}
-
-/**
- * event occured
- * 
- */
-void 
-TrexDpPortEvent::handle_event(int thread_id, int event_id) {
-
-    /* if the event is disabled - we don't care */
-    if (!is_active()) {
-        return;
-    }
-
-    /* check the event id is matching the required event - if not maybe its an old signal */
-    if (event_id != m_event_id) {
-        return;
-    }
-
+bool
+TrexDpPortEvent::on_core_reporting_in(int thread_id) {
     /* mark sure no double signal */
     if (m_signal.at(thread_id)) {
-        err(thread_id, event_id, "double signal");
+        std::stringstream err;
+        err << "double signal detected on event id: " << m_event_id;
+        throw TrexException(err.str());
 
-    } else {
-        /* mark */
-        m_signal.at(thread_id) = true;
-        m_pending_cnt--;
     }
 
+    /* mark */
+    m_signal.at(thread_id) = true;
+    m_pending_cnt--;
+
     /* event occured */
     if (m_pending_cnt == 0) {
-        m_port->on_dp_event_occured(m_event_type);
-        m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+        on_event();
+        return true;
+    } else {
+        return false;
     }
 }
 
-bool
-TrexDpPortEvent::is_active() {
-    return (status() != EVENT_DISABLE);
-}
-
-bool 
-TrexDpPortEvent::has_timeout_expired() {
-    return (status() == EVENT_TIMED_OUT);
-}
-
-const char *
-TrexDpPortEvent::event_name(event_e type) {
-    switch (type) {
-    case EVENT_STOP:
-        return "DP STOP";
 
-    default:
-        throw TrexException("unknown event type");
-    }
-
-}
index 557e590..3b8c863 100644 (file)
@@ -25,95 +25,43 @@ limitations under the License.
 #include <string>
 
 class TrexStatelessPort;
+class TrexDpPortEvents;
 
 /**
- * describes a single DP event related to port
+ * interface class for DP events
  * 
- * @author imarom (18-Nov-15)
+ * @author imarom (29-Feb-16)
  */
 class TrexDpPortEvent {
-public:
-
-    enum event_e {
-        EVENT_STOP   = 1,
-        EVENT_MAX
-    };
-
-    /**
-     * status of the event for the port
-     */
-    enum event_status_e {
-        EVENT_DISABLE,
-        EVENT_PENDING,
-        EVENT_TIMED_OUT,
-        EVENT_OCCURED
-    };
-
-    /**
-     * init for the event
-     * 
-     */
-    void create(event_e type, TrexStatelessPort *port);
-
-    /**
-     * create a new pending event
-     * 
-     */
-    void wait_for_event(int event_id, int timeout_ms = -1);
-
-    /**
-     * mark event as not allowed to happen
-     * 
-     */
-    void disable();
-
-    /**
-     * get the event status
-     * 
-     */
-    event_status_e status();
-
-    /**
-     * event occured
-     * 
-     */
-    void handle_event(int thread_id, int event_id);
+    friend TrexDpPortEvents;
 
-    /**
-     * returns true if event is active
-     * 
-     */
-    bool is_active();
-
-    /**
-     * has timeout already expired ?
-     * 
-     */
-    bool has_timeout_expired();
-
-    /**
-     * generate error
-     * 
-     */
-    void err(int thread_id, int event_id, const std::string &err_msg);
+public:
+    TrexDpPortEvent();
+    virtual ~TrexDpPortEvent() {}
 
+protected:
     /**
-     * event to name
+     * what to do when an event has been completed (all cores 
+     * reported in 
      * 
+     * @author imarom (29-Feb-16)
      */
-    static const char * event_name(event_e type);
+    virtual void on_event() = 0;
 
+    TrexStatelessPort *get_port() {
+        return m_port;
+    }
 
 private:
+    void init(TrexStatelessPort *port, int event_id, int timeout_ms);
+    bool on_core_reporting_in(int thread_id);
 
-    event_e                        m_event_type;
     std::unordered_map<int, bool>  m_signal;
     int                            m_pending_cnt;
 
     TrexStatelessPort             *m_port;
     int                            m_event_id;
     int                            m_expire_limit_ms;
-    
 };
 
 /**
@@ -124,44 +72,39 @@ class TrexDpPortEvents {
 public:
     friend class TrexDpPortEvent;
 
-    void create(TrexStatelessPort *port);
+    static const int INVALID_ID = -1;
 
-    /**
-     * generate a new event ID to be used with wait_for_event
-     * 
-     */
-    int generate_event_id();
+    TrexDpPortEvents(TrexStatelessPort *port);
 
     /**
      * wait a new DP event on the port 
      * returns a key which will be used to identify 
      * the event happened 
      * 
-     * @author imarom (18-Nov-15)
-     * 
-     * @param ev - type of event
-     * @param event_id - a unique identifier for the event
-     * @param timeout_ms - does it has a timeout ?
-     * 
      */
-    void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+    int create_event(TrexDpPortEvent *event, int timeout_ms = -1);
 
     /**
-     * disable an event (don't care)
+     * destroy an event
      * 
      */
-    void disable(TrexDpPortEvent::event_e ev);
+    void destroy_event(int event_id);
 
     /**
-     * event has occured
-     * 
+     * return when all DP cores have responsed on a barrier
+     */
+    void barrier();
+
+    /**
+     * a core has reached the event 
      */
-    void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+    void on_core_reporting_in(int event_id, int thread_id);
 
 private:
-    static const int EVENT_ID_INVALID = -1;
+    TrexDpPortEvent *lookup(int event_id);
 
-    TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+    static const int EVENT_ID_INVALID = -1;
+    std::unordered_map<int, TrexDpPortEvent *> m_events;
     int m_event_id_counter;
 
     TrexStatelessPort *m_port;
index a452283..9e24802 100644 (file)
@@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
     /* create RPC servers */
 
     /* set both servers to mutex each other */
-    m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+    m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock);
     m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
 
     /* configure ports */
index cc47da6..6e5e0c4 100644 (file)
@@ -92,18 +92,19 @@ public:
     TrexStatelessCfg() {
         m_port_count          = 0;
         m_rpc_req_resp_cfg    = NULL;
-        m_rpc_async_cfg       = NULL;
         m_rpc_server_verbose  = false;
         m_platform_api        = NULL;
         m_publisher           = NULL;
+        m_global_lock         = NULL;
     }
 
     const TrexRpcServerConfig  *m_rpc_req_resp_cfg;
-    const TrexRpcServerConfig  *m_rpc_async_cfg;
     const TrexPlatformApi      *m_platform_api;
     bool                        m_rpc_server_verbose;
     uint8_t                     m_port_count;
     TrexPublisher              *m_publisher;
+    std::mutex                 *m_global_lock;
+
 };
 
 /**
@@ -186,7 +187,6 @@ protected:
 
     TrexPublisher                        *m_publisher;
 
-    std::mutex m_global_cp_lock;
 };
 
 /**
index c60b0e8..257af54 100644 (file)
@@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core
 
 using namespace std;
 
+
+
+/***************************
+ * trex DP events handlers
+ * 
+ **************************/
+class AsyncStopEvent : public TrexDpPortEvent {
+
+protected:
+    /**
+     * when an async event occurs (all cores have reported in)
+     * 
+     * @author imarom (29-Feb-16)
+     */
+    virtual void on_event() {
+        get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
+
+        get_port()->common_port_stop_actions(true);
+
+        assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
+        get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+    }
+};
+
 /***************************
  * trex stateless port
  * 
  **************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
     std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
 
     m_port_id = port_id;
@@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
         m_cores_id_list.push_back(core_pair.first);
     }
 
-    /* init the events DP DB */
-    m_dp_events.create(this);
-
     m_graph_obj = NULL;
+
+    m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
 }
 
 TrexStatelessPort::~TrexStatelessPort() {
     if (m_graph_obj) {
         delete m_graph_obj;
     }
+
+    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+        m_dp_events.destroy_event(m_pending_async_stop_event);
+        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+    }
 }
 
 /**
@@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
     }
 
     /* generate a message to all the relevant DP cores to start transmitting */
-    int event_id = m_dp_events.generate_event_id();
-
-    /* mark that DP event of stoppped is possible */
-    m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
-
+    assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+    m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
 
     /* update object status */
     m_factor = factor;
     m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
     m_last_duration = duration;
+
     change_state(PORT_STATE_TX);
 
     /* update the DP - messages will be freed by the DP */
@@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
 
         /* was the core assigned a compiled object ? */
         if (compiled_objs[index]) {
-            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+            TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
+                                                                             m_pending_async_stop_event,
+                                                                             compiled_objs[index],
+                                                                             duration);
             send_message_to_dp(core_id, start_msg);
         } else {
 
             /* mimic an end event */
-            m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id);
+            m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
         }
 
         index++;
     }
     
+    /* for debug - this can be turn on */
+    //m_dp_events.barrier();
+
     /* update subscribers */    
     Json::Value data;
     data["port_id"] = m_port_id;
@@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) {
     /* delete any previous graphs */
     delete_streams_graph();
 
-    /* mask out the DP stop event */
-    m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+    /* to avoid race, first destroy any previous stop/pause events */
+    if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+        m_dp_events.destroy_event(m_pending_async_stop_event);
+        m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
 
+    }
+    
     /* generate a message to all the relevant DP cores to start transmitting */
     TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
-
     send_message_to_all_dp(stop_msg);
 
-    /* continue to general actions */
+    /* a barrier - make sure all the DP cores stopped */
+    m_dp_events.barrier();
+
+    change_state(PORT_STATE_STREAMS);
+
     common_port_stop_actions(false);
-  
 }
 
 /**
@@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) {
  * 
  */
 void
-TrexStatelessPort::common_port_stop_actions(bool event_triggered) {
+TrexStatelessPort::common_port_stop_actions(bool async) {
 
-    change_state(PORT_STATE_STREAMS);
-    
     Json::Value data;
     data["port_id"] = m_port_id;
 
-    if (event_triggered) {
+    if (async) {
         get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
     } else {
         get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
@@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) {
         throw TrexException(" pause is supported when duration is not enable is start command ");
     }
 
+    /* send a pause message */
     TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
-
     send_message_to_all_dp(pause_msg);
 
-    change_state(PORT_STATE_PAUSE);
+    /* make sure all DP cores paused */
+    m_dp_events.barrier();
 
-    Json::Value data;
-    data["port_id"] = m_port_id;
-    get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
+    /* change state */
+    change_state(PORT_STATE_PAUSE);
 }
 
+
 void
 TrexStatelessPort::resume_traffic(void) {
 
@@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) {
     TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
 
     send_message_to_all_dp(resume_msg);
-
     change_state(PORT_STATE_TX);
 
 
@@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
     }
 
     TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-
     send_message_to_all_dp(update_msg);
 
     m_factor *= factor;
@@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
     ring->Enqueue((CGenNode *)msg);
 }
 
-/**
- * when a DP (async) event occurs - handle it
- * 
- */
-void 
-TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
-    Json::Value data;
-
-    switch (event_type) {
-
-    case TrexDpPortEvent::EVENT_STOP:
-        common_port_stop_actions(true);
-        break;
-
-    default:
-        assert(0);
-
-    }
-}
 
 uint64_t
 TrexStatelessPort::get_port_speed_bps() const {
index 192d0d1..d3c4dcb 100644 (file)
@@ -102,13 +102,17 @@ private:
 };
 
 
+class AsyncStopEvent;
+
 /**
  * describes a stateless port
  * 
  * @author imarom (31-Aug-15)
  */
 class TrexStatelessPort {
-    friend class TrexDpPortEvent;
+    friend TrexDpPortEvents;
+    friend TrexDpPortEvent;
+    friend AsyncStopEvent;
 
 public:
 
@@ -363,18 +367,12 @@ private:
      */
     void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
 
-    /**
-     * triggered when event occurs
-     * 
-     */
-    void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
-
 
     /**
      * when a port stops, perform various actions
      * 
      */
-    void common_port_stop_actions(bool event_triggered);
+    void common_port_stop_actions(bool async);
 
     /**
      * calculate effective M per core
@@ -421,6 +419,8 @@ private:
 
     /* owner information */
     TrexPortOwner       m_owner;
+
+    int m_pending_async_stop_event;
 };
 
 
index 0f578b9..19eface 100644 (file)
@@ -265,9 +265,9 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
 }
 
 
-bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
-                                          bool stop_on_id, 
-                                          int event_id){
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t  port_id,
+                                          bool     stop_on_id, 
+                                          int      event_id){
 
 
     if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
@@ -829,9 +829,9 @@ TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
 
 
 void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id,
-                                  bool stop_on_id, 
-                                  int event_id) {
+TrexStatelessDpCore::stop_traffic(uint8_t  port_id,
+                                  bool     stop_on_id, 
+                                  int      event_id) {
     /* we cannot remove nodes not from the top of the queue so
        for every active node - make sure next time
        the scheduler invokes it, it will be free */
@@ -843,20 +843,19 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
         //printf(" skip .. %f\n",m_core->m_cur_time_sec);
         return;
     }
-
-#if 0
-    if ( are_all_ports_idle() ) {
-        /* just a place holder if we will need to do somthing in that case */
-    }
-#endif
  
     /* inform the control plane we stopped - this might be a async stop
        (streams ended)
-     */
+    */
+    #if 0
+    if ( are_all_ports_idle() ) {
+        /* just a place holder if we will need to do somthing in that case */
+    }
+    #endif
+
     CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
     TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
                                                                    port_id,
-                                                                   TrexDpPortEvent::EVENT_STOP,
                                                                    lp_port->get_event_id());
     ring->Enqueue((CGenNode *)event_msg);
 
@@ -872,3 +871,12 @@ TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
     delete msg;
 }
 
+void
+TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
+
+    CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+    TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+                                                                   port_id,
+                                                                   event_id);
+    ring->Enqueue((CGenNode *)event_msg);
+}
index 3d21465..cb102b8 100644 (file)
@@ -235,7 +235,8 @@ public:
         return (&m_ports[local_port_id]);
     }
         
-
+    /* simply sends a message back (acts as a barrier for previous messages) */
+    void barrier(uint8_t port_id, int event_id);
 
 private:
 
index 257de16..333aec8 100644 (file)
@@ -180,11 +180,29 @@ TrexStatelessDpUpdate::clone() {
     return new_msg;
 }
 
+/*************************
+  barrier message
+ ************************/
+
+bool
+TrexStatelessDpBarrier::handle(TrexStatelessDpCore *dp_core) {
+    dp_core->barrier(m_port_id, m_event_id);
+    return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpBarrier::clone() {
+
+    TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpBarrier(m_port_id, m_event_id);
+
+    return new_msg;
+}
+
 /************************* messages from DP to CP **********************/
 bool
 TrexDpPortEventMsg::handle() {
     TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
-    port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+    port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
 
     return (true);
 }
index d56596b..dda086b 100644 (file)
@@ -145,7 +145,7 @@ public:
 
     TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
         m_stop_only_for_event_id=false;
-        m_event_id=0;
+        m_event_id = 0;
         m_core = NULL;
     }
 
@@ -245,6 +245,26 @@ private:
     double   m_factor;
 };
 
+/**
+ * barrier message for DP core
+ * 
+ */
+class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
+public:
+
+    TrexStatelessDpBarrier(uint8_t port_id, int event_id) {
+        m_port_id  = port_id;
+        m_event_id = event_id;
+    }
+
+    virtual bool handle(TrexStatelessDpCore *dp_core);
+
+    virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+    uint8_t m_port_id;
+    int     m_event_id;
+};
 
 /************************* messages from DP to CP **********************/
 
@@ -282,10 +302,9 @@ public:
 class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
 public:
 
-    TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+    TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
         m_thread_id  = thread_id;
         m_port_id    = port_id;
-        m_event_type = type;
         m_event_id   = event_id;
     }
 
@@ -299,10 +318,6 @@ public:
         return m_port_id;
     }
 
-    TrexDpPortEvent::event_e get_event_type() {
-        return m_event_type;
-    }
-
     int get_event_id() {
         return m_event_id;
     }
@@ -310,7 +325,6 @@ public:
 private:
     int                         m_thread_id;
     uint8_t                     m_port_id;
-    TrexDpPortEvent::event_e    m_event_type;
     int                         m_event_id;
     
 };