Added CP to RX messages
authorIdo Barnea <[email protected]>
Thu, 17 Mar 2016 14:10:11 +0000 (16:10 +0200)
committerIdo Barnea <[email protected]>
Tue, 22 Mar 2016 15:40:03 +0000 (17:40 +0200)
14 files changed:
linux/ws_main.py
linux_dpdk/ws_main.py
src/flow_stat.cpp
src/flow_stat.h
src/main_dpdk.cpp
src/msg_manager.cpp
src/msg_manager.h
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/dp/trex_stateless_dp_core.cpp
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h

index 9422a8f..58f5b66 100755 (executable)
@@ -258,6 +258,7 @@ includes_path =''' ../src/pal/linux/
                    ../src/rpc-server/
                    ../src/stateless/cp/
                    ../src/stateless/dp/
+                   ../src/stateless/rx/
                    ../src/stateless/messaging/
                    ../external_libs/json/
                    ../external_libs/zmq/include/
index faaca0d..2aa06e3 100755 (executable)
@@ -424,6 +424,7 @@ includes_path =''' ../src/pal/linux_dpdk/
                    ../src/rpc-server/
                    ../src/stateless/cp/
                    ../src/stateless/dp/
+                   ../src/stateless/rx/
                    ../src/stateless/messaging/
 
                    ../external_libs/yaml-cpp/include/
index 0103829..d44a91d 100644 (file)
@@ -25,6 +25,7 @@
 #include <os_time.h>
 #include "internal_api/trex_platform_api.h"
 #include "trex_stateless.h"
+#include "trex_stateless_messaging.h"
 #include "trex_stream.h"
 #include "flow_stat_parser.h"
 #include "flow_stat.h"
@@ -385,6 +386,8 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
 CFlowStatRuleMgr::CFlowStatRuleMgr() {
     m_api = NULL;
     m_max_hw_id = -1;
+    m_num_started_streams = 0;
+    m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
 }
 
 std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
@@ -488,6 +491,12 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
         return 0;
     }
 
+    if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
+        std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id
+                  << " which is not stopped." << std::endl;
+        return -1;
+    }
+
     return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
 }
 
@@ -556,6 +565,10 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
     std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl;
 #endif
 
+    if (m_num_started_streams == 0) {
+        send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+    }
+    m_num_started_streams++;
     return 0;
 }
 
@@ -605,6 +618,11 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
             m_hw_id_map.unmap(hw_id);
         }
     }
+    m_num_started_streams--;
+    assert (m_num_started_streams >= 0);
+    if (m_num_started_streams == 0) {
+        send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core shoulde get into idle loop.
+    }
     return 0;
 }
 
@@ -618,6 +636,18 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
     return 0;
 }
 
+extern bool rx_should_stop;
+void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
+    TrexStatelessCpToRxMsgBase *msg;
+
+    if (is_start) {
+        msg = new TrexRxStartMsg();
+    } else {
+        msg = new TrexRxStopMsg();
+    }
+    m_ring_to_rx->Enqueue((CGenNode *)msg);
+}
+
 // return false if no counters changed since last run. true otherwise
 bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
     rx_per_flow_t rx_stats[MAX_FLOW_STATS];
@@ -627,7 +657,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
 
     root["name"] = "flow_stats";
     root["type"] = 0;
-    
+
     if (baseline) {
         root["baseline"] = true;
     }
index 0fb4fed..83f076d 100644 (file)
@@ -26,6 +26,7 @@
 #include <map>
 #include "trex_defs.h"
 #include "trex_stream.h"
+#include "msg_manager.h"
 #include <internal_api/trex_platform_api.h>
 
 // range reserved for rx stat measurement is from IP_ID_RESERVE_BASE to 0xffff
@@ -144,8 +145,8 @@ class CFlowStatUserIdInfo {
     tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS];
     uint16_t m_hw_id;     // Associated hw id. UINT16_MAX if no associated hw id.
     uint8_t m_proto;      // protocol (UDP, TCP, other), associated with this user id.
-    uint8_t m_ref_count;  // How many streams with this ref count exists
-    uint8_t m_trans_ref_count;  // How many streams with this ref count currently transmit
+    uint8_t m_ref_count;  // How many streams with this user id exists
+    uint8_t m_trans_ref_count;  // How many streams with this user id currently transmit
     bool m_was_sent; // Did we send this info to clients once?
 };
 
@@ -208,6 +209,7 @@ class CFlowStatRuleMgr {
  private:
     int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
     int add_hw_rule(uint16_t hw_id, uint8_t proto);
+    void send_start_stop_msg_to_rx(bool is_start);
 
  private:
     CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids
@@ -215,6 +217,8 @@ class CFlowStatRuleMgr {
     uint8_t m_num_ports; // How many ports are being used
     const TrexPlatformApi *m_api;
     int m_max_hw_id; // max hw id we ever used
+    uint32_t m_num_started_streams; // How many started (transmitting) streams we have
+    CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core
 };
 
 #endif
index 4fc048f..9e69095 100644 (file)
@@ -2873,14 +2873,14 @@ void CGlobalTRex::rx_sl_configure(void) {
 
     if ( get_vm_one_queue_enable() ) {
 #if 0
-        ???
+        /// what to do here ???
         /* vm mode, indirect queues  */
         for (i=0; i < m_max_ports; i++) {
             CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
             uint8_t thread_id = (i >> 1);
-            CNodeRing * r = rx_dp->getRingCpToDp(thread_id); ///??? should be rx to dp?
-            m_latency_vm_vports[i].Create((uint8_t)i,r,&m_mg);
-            rx_sl_cfg.m_ports[i] =&m_latency_vm_vports[i];
+            CNodeRing * r = rx_dp->getRingCpToDp(thread_id);
+            m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg);
+            rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i];
         }
 #endif
     } else {
index 9ade1bf..7e39391 100755 (executable)
@@ -4,7 +4,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -65,12 +65,12 @@ void CMessagingManager::Delete(){
         delete [] m_dp_to_cp;
         m_dp_to_cp = NULL;
     }
-    
+
     if (m_cp_to_dp) {
         delete [] m_cp_to_dp;
         m_cp_to_dp = NULL;
     }
-    
+
 }
 
 CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){
@@ -84,7 +84,6 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){
 
 }
 
-
 void CMsgIns::Free(){
     if (m_ins) {
         m_ins->Delete();
@@ -107,6 +106,11 @@ bool CMsgIns::Create(uint8_t num_threads){
     if (!res) {
         return (res);
     }
+    res = m_cp_rx.Create(1, "cp_rx");
+    if (!res) {
+        return (res);
+    }
+
     return (m_rx_dp.Create(num_threads,"rx_dp"));
 }
 
@@ -114,9 +118,8 @@ bool CMsgIns::Create(uint8_t num_threads){
 void CMsgIns::Delete(){
     m_cp_dp.Delete();
     m_rx_dp.Delete();
+    m_cp_rx.Delete();
 }
 
 
-CMsgIns  * CMsgIns::m_ins=0; 
-
-
+CMsgIns  * CMsgIns::m_ins=0;
index 0390ce1..de11edb 100755 (executable)
@@ -6,7 +6,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -40,37 +40,37 @@ public:
 
 /*
 
-e.g DP with 4 threads 
-will look like this 
+e.g DP with 4 threads
+will look like this
 
-      cp_to_dp  
+      cp_to_dp
 
       master :push
       dpx    : pop
-                 
+
       -       --> dp0
 cp    -       --> dp1
       -       --> dp2
       -       --> dp3
 
-      dp_to_cp 
+      dp_to_cp
 
       cp     : pop
       dpx    : push
 
-      
+
        <-      -- dp0
 cp     <-      -- dp1
        <-      -- dp2
        <-      -- dp3
 
 
-*/  
+*/
 
 class CGenNode ;
 typedef CTRingSp<CGenNode>  CNodeRing;
 
-/* CP == latency thread 
+/* CP == latency thread
    DP == traffic pkt generator */
 class CMessagingManager {
 public:
@@ -83,6 +83,7 @@ public:
     void Delete();
     CNodeRing * getRingCpToDp(uint8_t thread_id);
     CNodeRing * getRingDpToCp(uint8_t thread_id);
+    CNodeRing * getRingCpToRx();
     uint8_t get_num_threads(){
         return (m_num_dp_threads);
     }
@@ -106,6 +107,9 @@ public:
     CMessagingManager * getCpDp(){
         return (&m_cp_dp);
     }
+    CMessagingManager * getCpRx(){
+        return (&m_cp_rx);
+    }
 
     uint8_t get_num_threads(){
         return (m_rx_dp.get_num_threads());
@@ -114,11 +118,11 @@ public:
 private:
     CMessagingManager m_rx_dp;
     CMessagingManager m_cp_dp;
-
+    CMessagingManager m_cp_rx;
 
 private:
     /* one instance */
-    static  CMsgIns  * m_ins; 
+    static  CMsgIns  * m_ins;
 };
 
 #endif
index 5947aaf..90589d7 100644 (file)
@@ -473,6 +473,13 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
     ring->Enqueue((CGenNode *)msg);
 }
 
+void
+TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
+
+    /* send the message to the core */
+    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+    ring->Enqueue((CGenNode *)msg);
+}
 
 uint64_t
 TrexStatelessPort::get_port_speed_bps() const {
index d3c4dcb..7e1838d 100644 (file)
@@ -4,7 +4,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -21,20 +21,21 @@ limitations under the License.
 #ifndef __TREX_STATELESS_PORT_H__
 #define __TREX_STATELESS_PORT_H__
 
-#include <trex_stream.h>
-#include <trex_dp_port_events.h>
-#include <internal_api/trex_platform_api.h>
+#include "internal_api/trex_platform_api.h"
+#include "trex_dp_port_events.h"
+#include "trex_stream.h"
 
 class TrexStatelessCpToDpMsgBase;
+class TrexStatelessCpToRxMsgBase;
 class TrexStreamsGraphObj;
 class TrexPortMultiplier;
 
-/** 
+/**
  * TRex port owner can perform
  * write commands
  * while port is owned - others can
  * do read only commands
- * 
+ *
  */
 class TrexPortOwner {
 public:
@@ -92,7 +93,7 @@ private:
 
     /* handler genereated internally */
     std::string  m_handler;
-    
+
     /* seed for generating random values */
     unsigned int m_seed;
 
@@ -106,7 +107,7 @@ class AsyncStopEvent;
 
 /**
  * describes a stateless port
- * 
+ *
  * @author imarom (31-Aug-15)
  */
 class TrexStatelessPort {
@@ -137,9 +138,9 @@ public:
         RC_ERR_FAILED_TO_COMPILE_STREAMS
     };
 
-  
+
     TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
-    
+
     ~TrexStatelessPort();
 
     /**
@@ -155,11 +156,11 @@ public:
     void release(void);
 
     /**
-     * validate the state of the port before start 
-     * it will return a stream graph 
-     * containing information about the streams 
-     * configured on this port 
-     *  
+     * validate the state of the port before start
+     * it will return a stream graph
+     * containing information about the streams
+     * configured on this port
+     *
      * on error it throws TrexException
      */
     const TrexStreamsGraphObj *validate(void);
@@ -190,13 +191,13 @@ public:
 
     /**
      * update current traffic on port
-     * 
+     *
      */
     void update_traffic(const TrexPortMultiplier &mul, bool force);
 
     /**
      * get the port state
-     * 
+     *
      */
     port_state_e get_state() const {
         return m_port_state;
@@ -204,23 +205,23 @@ public:
 
     /**
      * port state as string
-     * 
+     *
      */
     std::string get_state_as_string() const;
 
     /**
      * the the max stream id currently assigned
-     * 
+     *
      */
     int get_max_stream_id() const;
 
     /**
      * fill up properties of the port
-     * 
+     *
      * @author imarom (16-Sep-15)
-     * 
-     * @param driver 
-     * @param speed 
+     *
+     * @param driver
+     * @param speed
      */
     void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
 
@@ -237,7 +238,7 @@ public:
 
     /**
      * delegators
-     * 
+     *
      */
 
     void add_stream(TrexStream *stream);
@@ -267,7 +268,7 @@ public:
 
     /**
      * returns the number of DP cores linked to this port
-     * 
+     *
      */
     uint8_t get_dp_core_count() {
         return m_cores_id_list.size();
@@ -275,7 +276,7 @@ public:
 
     /**
      * returns the traffic multiplier currently being used by the DP
-     * 
+     *
      */
     double get_multiplier() {
         return (m_factor);
@@ -283,13 +284,13 @@ public:
 
     /**
      * get port speed in bits per second
-     * 
+     *
      */
     uint64_t get_port_speed_bps() const;
 
     /**
      * return RX caps
-     * 
+     *
      */
     int get_rx_caps() const {
         return m_rx_caps;
@@ -300,12 +301,12 @@ public:
     }
 
     /**
-     * return true if port adds CRC to a packet (not occurs for 
-     * VNICs) 
-     * 
+     * return true if port adds CRC to a packet (not occurs for
+     * VNICs)
+     *
      * @author imarom (24-Feb-16)
-     * 
-     * @return bool 
+     *
+     * @return bool
      */
     bool has_crc_added() const {
         return m_api_info.has_crc;
@@ -318,9 +319,9 @@ public:
 
     /**
      * get the port effective rate (on a started / paused port)
-     * 
+     *
      * @author imarom (07-Jan-16)
-     * 
+     *
      */
     void get_port_effective_rate(double &pps,
                                  double &bps_L1,
@@ -330,8 +331,8 @@ public:
 
     /**
      * set port promiscuous on/off
-     * 
-     * @param enabled 
+     *
+     * @param enabled
      */
     void set_promiscuous(bool enabled);
     bool get_promiscuous();
@@ -357,40 +358,45 @@ private:
 
     /**
      * send message to all cores using duplicate
-     * 
+     *
      */
     void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
 
     /**
      * send message to specific DP core
-     * 
+     *
      */
     void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
 
+    /**
+     * send message to specific RX core
+     *
+     */
+    void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
 
     /**
      * when a port stops, perform various actions
-     * 
+     *
      */
     void common_port_stop_actions(bool async);
 
     /**
      * calculate effective M per core
-     * 
+     *
      */
     double calculate_effective_factor(const TrexPortMultiplier &mul, bool force = false);
     double calculate_effective_factor_internal(const TrexPortMultiplier &mul);
-  
+
 
     /**
      * generates a graph of streams graph
-     * 
+     *
      */
     void generate_streams_graph();
 
     /**
      * dispose of it
-     * 
+     *
      * @author imarom (26-Nov-15)
      */
     void delete_streams_graph();
@@ -426,7 +432,7 @@ private:
 
 /**
  * port multiplier object
- * 
+ *
  */
 class TrexPortMultiplier {
 public:
@@ -443,8 +449,8 @@ public:
     };
 
     /**
-     * multiplier can be absolute value 
-     * increment value or subtract value 
+     * multiplier can be absolute value
+     * increment value or subtract value
      */
     enum mul_op_e {
         OP_ABS,
index f8d6d82..ba25f61 100644 (file)
@@ -5,7 +5,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -19,14 +19,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-#include <trex_stateless_dp_core.h>
-#include <trex_stateless_messaging.h>
-#include <trex_streams_compiler.h>
-#include <trex_stream_node.h>
-#include <trex_stream.h>
-
-#include <bp_sim.h>
-
+#include "bp_sim.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stream.h"
+#include "trex_stream_node.h"
+#include "trex_streams_compiler.h"
 
 void CDpOneStream::Delete(CFlowGenListPerThread   * core){
     assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
index 333aec8..3468d62 100644 (file)
@@ -5,7 +5,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -19,17 +19,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-#include <trex_stateless_messaging.h>
-#include <trex_stateless_dp_core.h>
-#include <trex_streams_compiler.h>
-#include <trex_stateless.h>
-#include <bp_sim.h>
-
 #include <string.h>
 
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_rx_core.h"
+#include "trex_streams_compiler.h"
+#include "trex_stateless.h"
+#include "bp_sim.h"
+
 /*************************
   start traffic message
- ************************/ 
+ ************************/
 TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
     m_port_id = port_id;
     m_event_id = event_id;
@@ -40,7 +41,7 @@ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexSt
 
 /**
  * clone for DP start message
- * 
+ *
  */
 TrexStatelessCpToDpMsgBase *
 TrexStatelessDpStart::clone() {
@@ -69,7 +70,7 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
 
 /*************************
   stop traffic message
- ************************/ 
+ ************************/
 bool
 TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
 
@@ -114,7 +115,7 @@ bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
 
 /**
  * clone for DP stop message
- * 
+ *
  */
 TrexStatelessCpToDpMsgBase *
 TrexStatelessDpStop::clone() {
@@ -130,7 +131,7 @@ TrexStatelessDpStop::clone() {
 
 
 
-TrexStatelessCpToDpMsgBase * 
+TrexStatelessCpToDpMsgBase *
 TrexStatelessDpQuit::clone(){
 
     TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
@@ -140,7 +141,7 @@ TrexStatelessDpQuit::clone(){
 
 
 bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
-    
+
     /* quit  */
     dp_core->quit_main_loop();
     return (true);
@@ -155,7 +156,7 @@ bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
     return (true);
 }
 
-TrexStatelessCpToDpMsgBase * 
+TrexStatelessCpToDpMsgBase *
 TrexStatelessDpCanQuit::clone(){
 
     TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
@@ -165,7 +166,7 @@ TrexStatelessDpCanQuit::clone(){
 
 /*************************
   update traffic message
- ************************/ 
+ ************************/
 bool
 TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) {
     dp_core->update_traffic(m_port_id, m_factor);
@@ -207,3 +208,14 @@ TrexDpPortEventMsg::handle() {
     return (true);
 }
 
+/************************* messages from CP to RX **********************/
+bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) {
+    rx_core->work();
+    return true;
+}
+
+/************************* messages from CP to RX **********************/
+bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) {
+    rx_core->idle();
+    return true;
+}
index dda086b..b7e8fd3 100644 (file)
@@ -5,7 +5,7 @@
 */
 
 /*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -22,16 +22,17 @@ limitations under the License.
 #ifndef __TREX_STATELESS_MESSAGING_H__
 #define __TREX_STATELESS_MESSAGING_H__
 
-#include <msg_manager.h>
-#include <trex_dp_port_events.h>
+#include "msg_manager.h"
+#include "trex_dp_port_events.h"
 
 class TrexStatelessDpCore;
+class CRxCoreStateless;
 class TrexStreamsCompiledObj;
 class CFlowGenListPerThread;
 
 /**
  * defines the base class for CP to DP messages
- * 
+ *
  * @author imarom (27-Oct-15)
  */
 class TrexStatelessCpToDpMsgBase {
@@ -49,7 +50,7 @@ public:
 
     /**
      * clone the current message
-     * 
+     *
      */
     virtual TrexStatelessCpToDpMsgBase * clone() = 0;
 
@@ -76,7 +77,7 @@ protected:
 
 /**
  * a message to start traffic
- * 
+ *
  * @author imarom (27-Oct-15)
  */
 class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
@@ -137,7 +138,7 @@ private:
 
 /**
  * a message to stop traffic
- * 
+ *
  * @author imarom (27-Oct-15)
  */
 class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
@@ -191,9 +192,9 @@ private:
 };
 
 /**
- * a message to Quit the datapath traffic. support only stateless for now 
- * 
- * @author hhaim 
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
  */
 class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
 public:
@@ -209,9 +210,9 @@ public:
 };
 
 /**
- * a message to check if both port are idel and exit 
- * 
- * @author hhaim 
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
  */
 class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
 public:
@@ -247,7 +248,7 @@ private:
 
 /**
  * barrier message for DP core
- * 
+ *
  */
 class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
 public:
@@ -270,7 +271,7 @@ private:
 
 /**
  * defines the base class for CP to DP messages
- * 
+ *
  * @author imarom (27-Oct-15)
  */
 class TrexStatelessDpToCpMsgBase {
@@ -284,7 +285,7 @@ public:
 
     /**
      * virtual function to handle a message
-     * 
+     *
      */
     virtual bool handle() = 0;
 
@@ -295,9 +296,9 @@ public:
 
 
 /**
- * a message indicating an event has happened on a port at the 
- * DP 
- * 
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
  */
 class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
 public:
@@ -326,8 +327,41 @@ private:
     int                         m_thread_id;
     uint8_t                     m_port_id;
     int                         m_event_id;
-    
+
 };
 
-#endif /* __TREX_STATELESS_MESSAGING_H__ */
+/************************* messages from CP to RX **********************/
+
+/**
+ * defines the base class for CP to RX messages
+ *
+ */
+class TrexStatelessCpToRxMsgBase {
+public:
 
+    TrexStatelessCpToRxMsgBase() {
+    }
+
+    virtual ~TrexStatelessCpToRxMsgBase() {
+    }
+
+    /**
+     * virtual function to handle a message
+     *
+     */
+    virtual bool handle (CRxCoreStateless *rx_core) = 0;
+
+    /* no copy constructor */
+    TrexStatelessCpToRxMsgBase(TrexStatelessCpToRxMsgBase &) = delete;
+
+};
+
+class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase {
+    bool handle (CRxCoreStateless *rx_core);
+};
+
+class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase {
+    bool handle (CRxCoreStateless *rx_core);
+};
+
+#endif /* __TREX_STATELESS_MESSAGING_H__ */
index a108bef..8671118 100644 (file)
 #include <stdio.h>
-#include "latency.h"
+#include "bp_sim.h"
 #include "flow_stat_parser.h"
-#include "stateless/rx/trex_stateless_rx_core.h"
-
+#include "latency.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_rx_core.h"
 
 void CRxCoreStateless::create(const CRxSlCfg &cfg) {
     m_max_ports = cfg.m_max_ports;
 
+    CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
+
+    m_ring_from_cp = cp_rx->getRingCpToDp(0);
+    m_ring_to_cp   = cp_rx->getRingDpToCp(0);
+    m_state = STATE_IDLE;
+
     for (int i = 0; i < m_max_ports; i++) {
         CLatencyManagerPerPort * lp = &m_ports[i];
-        //        CCPortLatency * lpo = &m_ports[swap_port(i)].m_port;
-        
         lp->m_io = cfg.m_ports[i];
-        /*        lp->m_port.Create(this,
-                          i,
-                          m_pkt_gen.get_payload_offset(),
-                          m_pkt_gen.get_l4_offset(),
-                          m_pkt_gen.get_pkt_size(),lpo );???*/
     }
+}
 
+void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
+    msg->handle(this);
+    delete msg;
 }
 
-void CRxCoreStateless::start() {
-    static int count = 0;
-    static int i = 0;
-    while (1) {
-        count += try_rx();
-        i++;
-        if (i == 100000000) {
-            i = 0;
-            //??? remove
-            printf("counter:%d port0:[%u], port1:[%u]\n", count, m_ports[0].m_port.m_rx_pg_pkts[0], m_ports[1].m_port.m_rx_pg_pkts[1]);
+bool CRxCoreStateless::periodic_check_for_cp_messages() {
+    /* fast path */
+    if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+        return false;
+    }
+
+    while ( true ) {
+        CGenNode * node = NULL;
+
+        if (m_ring_from_cp->Dequeue(node) != 0) {
+            break;
+        }
+        assert(node);
+        TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
+        handle_cp_msg(msg);
+    }
+
+    return true;
+
+}
+
+void CRxCoreStateless::idle_state_loop() {
+    const int SHORT_DELAY_MS    = 2;
+    const int LONG_DELAY_MS     = 50;
+    const int DEEP_SLEEP_LIMIT  = 2000;
+
+    int counter = 0;
+
+    while (m_state == STATE_IDLE) {
+        bool had_msg = periodic_check_for_cp_messages();
+        if (had_msg) {
+            counter = 0;
+            continue;
+        }
+
+        /* enter deep sleep only if enough time had passed */
+        if (counter < DEEP_SLEEP_LIMIT) {
+            delay(SHORT_DELAY_MS);
+            counter++;
+        } else {
+            delay(LONG_DELAY_MS);
         }
     }
 }
 
-// ??? temp try
+void CRxCoreStateless::start() {
+      static int count = 0;
+      static int i = 0;
+
+      while (true) {
+          if (m_state == STATE_WORKING) {
+              count += try_rx();
+              i++;
+              if (i == 100) {
+                  i = 0;
+                  // if no packets in 100 cycles, sleep for a while to spare the cpu
+                  if (count == 0) {
+                      delay(1);
+                  }
+                  count = 0;
+                  periodic_check_for_cp_messages();
+              }
+          } else {
+              idle_state_loop();
+          }
+#if 0
+          ??? do we need this?
+          if ( m_core->is_terminated_by_master() ) {
+              break;
+          }
+#endif
+      }
+}
+
 int CRxCoreStateless::try_rx() {
     rte_mbuf_t * rx_pkts[64];
     int i, total_pkts = 0;
     for (i = 0; i < m_max_ports; i++) {
         CLatencyManagerPerPort * lp = &m_ports[i];
         rte_mbuf_t * m;
-        //m_cpu_dp_u.start_work();
         /* try to read 64 packets clean up the queue */
         uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
         total_pkts += cnt_p;
@@ -63,10 +125,8 @@ int CRxCoreStateless::try_rx() {
                 }
                 rte_pktmbuf_free(m);
             }
-            /* commit only if there was work to do ! */
-            //m_cpu_dp_u.commit(); //??? what's this?
-          }/* if work */
-      }// all ports
+        }/* if work */
+    }// all ports
     return total_pkts;
 }
 
index 942ddbd..eecc803 100644 (file)
@@ -23,6 +23,8 @@ limitations under the License.
 #include <stdint.h>
 #include "latency.h"
 
+class TrexStatelessCpToRxMsgBase;
+
 class CRxSlCfg {
  public:
     CRxSlCfg (){
@@ -37,19 +39,33 @@ class CRxSlCfg {
 };
 
 class CRxCoreStateless {
+    enum state_e {
+        STATE_IDLE,
+        STATE_WORKING,
+    };
+
  public:
     void start();
     void create(const CRxSlCfg &cfg);
     void reset_rx_stats(uint8_t port_id);
     int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts
                      , uint32_t *bytes, uint32_t *prev_bytes, int min, int max);
+    void work() {m_state = STATE_WORKING;}
+    void idle() {m_state = STATE_IDLE;}
  private:
+    void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+    bool periodic_check_for_cp_messages();
+    void idle_state_loop();
     int try_rx();
     bool is_flow_stat_id(uint16_t id);
     uint16_t get_hw_id(uint16_t id);
-    
+
  private:
     uint32_t m_max_ports;
+    bool m_has_streams;
     CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
+    state_e             m_state; /* state of all ports */
+    CNodeRing           *m_ring_from_cp;
+    CNodeRing           *m_ring_to_cp;
 };
 #endif