TX packet capture - zero impact on fast path
authorimarom <[email protected]>
Wed, 15 Feb 2017 16:26:41 +0000 (18:26 +0200)
committerimarom <[email protected]>
Thu, 16 Feb 2017 13:20:23 +0000 (15:20 +0200)
(using wrapper when service mode is active)

Signed-off-by: imarom <[email protected]>
src/bp_sim.h
src/main_dpdk.cpp
src/stateless/cp/trex_stateless_port.cpp
src/stateless/dp/trex_stateless_dp_core.cpp
src/stateless/dp/trex_stateless_dp_core.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_capture.cpp
src/stateless/rx/trex_stateless_capture.h

index 27fb8d0..22968b2 100755 (executable)
@@ -319,8 +319,15 @@ public:
     virtual ~CVirtualIF() {}
     virtual int open_file(std::string file_name)=0;
     virtual int close_file(void)=0;
+
     /* send one packet */
     virtual int send_node(CGenNode * node)=0;
+    
+    /* by default does the same */
+    virtual int send_node_service_mode(CGenNode *node) {
+        return send_node(node);
+    }
+    
     /* send one packet to a specific dir. flush all packets */
     virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) {}
     /* flush all pending packets into the stream */
index f227b2d..7371b36 100644 (file)
@@ -2166,10 +2166,27 @@ class CCoreEthIFStateless : public CCoreEthIF {
 public:
     virtual int send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * node_sl, CCorePerPort *  lp_port
                                     , CVirtualIFPerSideStats  * lp_stats, bool is_const);
-    virtual int send_node(CGenNode * node);
+    
+    /**
+     * fast path version
+     */
+    virtual int send_node(CGenNode *node);
+    
+    /**
+     * slow path version
+     */
+    virtual int send_node_service_mode(CGenNode *node);
+    
 protected:
-    int handle_slow_path_node(CGenNode *node);
-    int send_pcap_node(CGenNodePCAP *pcap_node);
+    template <bool SERVICE_MODE> inline int send_node_common(CGenNode *no);
+    
+    inline rte_mbuf_t * generate_node_pkt(CGenNodeStateless *node_sl)   __attribute__ ((always_inline));
+    inline int send_node_packet(CGenNodeStateless      *node_sl,
+                                rte_mbuf_t             *m,
+                                CCorePerPort           *lp_port,
+                                CVirtualIFPerSideStats *lp_stats)   __attribute__ ((always_inline));
+    
+    rte_mbuf_t * generate_slow_path_node_pkt(CGenNodeStateless *node_sl);
 };
 
 bool CCoreEthIF::Create(uint8_t             core_id,
@@ -2311,8 +2328,6 @@ int CCoreEthIF::send_pkt(CCorePerPort * lp_port,
     lp_port->m_table[len]=m;
     len++;
     
-    TrexStatelessCaptureMngr::getInstance().handle_pkt_tx(m, lp_port->m_port->get_port_id());
-    
     /* enough pkts to be sent */
     if (unlikely(len == MAX_PKT_BURST)) {
         send_burst(lp_port, MAX_PKT_BURST,lp_stats);
@@ -2414,25 +2429,20 @@ int CCoreEthIFStateless::send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * no
     return 0;
 }
 
-int CCoreEthIFStateless::send_node(CGenNode * no) {
-    /* if a node is marked as slow path - single IF to redirect it to slow path */
-    if (no->get_is_slow_path()) {
-        return handle_slow_path_node(no);
+inline rte_mbuf_t *
+CCoreEthIFStateless::generate_node_pkt(CGenNodeStateless *node_sl) {
+    if (unlikely(node_sl->get_is_slow_path())) {
+        return generate_slow_path_node_pkt(node_sl);
     }
-
-    CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
-
+    
     /* check that we have mbuf  */
-    rte_mbuf_t *    m;
-
-    pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
-    CCorePerPort *  lp_port=&m_ports[dir];
-    CVirtualIFPerSideStats  * lp_stats = &m_stats[dir];
+    rte_mbuf_t *m;
+    
     if ( likely(node_sl->is_cache_mbuf_array()) ) {
-        m=node_sl->cache_mbuf_array_get_cur();
+        m = node_sl->cache_mbuf_array_get_cur();
         rte_pktmbuf_refcnt_update(m,1);
     }else{
-        m=node_sl->get_cache_mbuf();
+        m = node_sl->get_cache_mbuf();
 
         if (m) {
             /* cache case */
@@ -2443,6 +2453,15 @@ int CCoreEthIFStateless::send_node(CGenNode * no) {
         }
     }
 
+    return m;
+}
+
+inline int
+CCoreEthIFStateless::send_node_packet(CGenNodeStateless      *node_sl,
+                                      rte_mbuf_t             *m,
+                                      CCorePerPort           *lp_port,
+                                      CVirtualIFPerSideStats *lp_stats) {
+    
     if (unlikely(node_sl->is_stat_needed())) {
         if ( unlikely(node_sl->is_cache_mbuf_array()) ) {
             // No support for latency + cache. If user asks for cache on latency stream, we change cache to 0.
@@ -2451,38 +2470,49 @@ int CCoreEthIFStateless::send_node(CGenNode * no) {
         }
         return send_node_flow_stat(m, node_sl, lp_port, lp_stats, (node_sl->get_cache_mbuf()) ? true : false);
     } else {
-        send_pkt(lp_port,m,lp_stats);
-    }
-
-    return (0);
-};
-
-int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) {
-    rte_mbuf_t *m = pcap_node->get_pkt();
-    if (!m) {
-        return (-1);
+        return send_pkt(lp_port, m, lp_stats);
     }
+}
 
-    pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
-    CCorePerPort *lp_port=&m_ports[dir];
-    CVirtualIFPerSideStats *lp_stats = &m_stats[dir];
+int CCoreEthIFStateless::send_node(CGenNode *no) {
+    return send_node_common<false>(no);
+}
 
-    send_pkt(lp_port, m, lp_stats);
+int CCoreEthIFStateless::send_node_service_mode(CGenNode *no) {
+    return send_node_common<true>(no);
+}
 
-    return (0);
+template <bool SERVICE_MODE>
+int CCoreEthIFStateless::send_node_common(CGenNode *no) {
+    CGenNodeStateless * node_sl = (CGenNodeStateless *) no;
+    
+    pkt_dir_t dir                     = (pkt_dir_t)node_sl->get_mbuf_cache_dir();
+    CCorePerPort *lp_port             = &m_ports[dir];
+    CVirtualIFPerSideStats *lp_stats  = &m_stats[dir];
+    
+    rte_mbuf_t *m = generate_node_pkt(node_sl);
+    
+    /* template boolean - this will be removed at compile time */
+    if (SERVICE_MODE) {
+        TrexStatelessCaptureMngr::getInstance().handle_pkt_tx(m, lp_port->m_port->get_port_id());
+    }
+    
+    return send_node_packet(node_sl, m, lp_port, lp_stats);
 }
 
 /**
  * slow path code goes here
  *
  */
-int CCoreEthIFStateless::handle_slow_path_node(CGenNode * no) {
+rte_mbuf_t *
+CCoreEthIFStateless::generate_slow_path_node_pkt(CGenNodeStateless *node_sl) {
 
-    if (no->m_type == CGenNode::PCAP_PKT) {
-        return send_pcap_node((CGenNodePCAP *)no);
+    if (node_sl->m_type == CGenNode::PCAP_PKT) {
+        CGenNodePCAP *pcap_node = (CGenNodePCAP *)node_sl;
+        return pcap_node->get_pkt();
     }
 
-    return (-1);
+    return (NULL);
 }
 
 void CCoreEthIF::apply_client_cfg(const ClientCfgBase *cfg, rte_mbuf_t *m, pkt_dir_t dir, uint8_t *p) {
index b0366fb..598577c 100644 (file)
@@ -989,7 +989,7 @@ TrexStatelessPort::set_service_mode(bool enabled) {
             getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_HW);
         }
         m_is_service_mode_on = enabled;
-        return;
+        break;
         
     case TrexStatelessRxQuery::RC_FAIL_RX_QUEUE_ACTIVE:
         throw TrexException("unable to disable service mode - please remove RX queue");
@@ -1000,6 +1000,10 @@ TrexStatelessPort::set_service_mode(bool enabled) {
     default:
         assert(0);
     }
+    
+    /* update the dp cores */
+    TrexStatelessDpServiceMode *dp_msg = new TrexStatelessDpServiceMode(m_port_id, enabled);
+    send_message_to_all_dp(dp_msg);
 }
 
 
index d8563e9..56184ae 100644 (file)
@@ -28,6 +28,53 @@ limitations under the License.
 #include "mbuf.h"
 
 
+class DPCoreWrapper : public CVirtualIF {
+public:
+    
+    DPCoreWrapper() {
+        m_wrapped = nullptr;
+    }
+    
+    void set_wrapped_object(CVirtualIF *wrapped) {
+        m_wrapped = wrapped;
+    }
+    
+    CVirtualIF *get_wrapped_object() const {
+        return m_wrapped;
+    }
+    
+    virtual int close_file(void) {
+        return m_wrapped->close_file();
+    }
+    
+    virtual int flush_tx_queue(void) {
+        return m_wrapped->flush_tx_queue();
+    }
+    
+    virtual int open_file(std::string file_name) {
+        return m_wrapped->open_file(file_name);
+    }
+    
+    /* move to service mode */
+    virtual int send_node(CGenNode *node) {
+        return m_wrapped->send_node_service_mode(node);
+    }
+    
+    virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t *p) {
+        return m_wrapped->update_mac_addr_from_global_cfg(dir, p);
+    }
+    
+    virtual pkt_dir_t port_id_to_dir(uint8_t port_id) {
+        return m_wrapped->port_id_to_dir(port_id);
+    }
+    
+    virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) {
+        m_wrapped->send_one_pkt(dir, m);
+    }
+    
+private:
+    CVirtualIF *m_wrapped;
+};
 
 
 void CGenNodeStateless::cache_mbuf_array_init(){
@@ -592,6 +639,18 @@ void TrexStatelessDpPerPort::create(CFlowGenListPerThread   *  core){
 }
 
 
+TrexStatelessDpCore::TrexStatelessDpCore() {
+    m_thread_id       = 0;
+    m_core            = NULL;
+    m_duration        = -1;
+    m_is_service_mode = NULL;
+    m_wrapper         = new DPCoreWrapper();
+}
+
+TrexStatelessDpCore::~TrexStatelessDpCore() {
+    delete m_wrapper;
+}
+
 
 void
 TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
@@ -717,6 +776,7 @@ void TrexStatelessDpCore::quit_main_loop(){
  */
 void
 TrexStatelessDpCore::start_scheduler() {
+    
     /* creates a maintenace job using the scheduler */
     CGenNode * node_sync = m_core->create_node() ;
     node_sync->m_type = CGenNode::FLOW_SYNC;
@@ -1255,6 +1315,32 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
     ring->Enqueue((CGenNode *)event_msg);
 }
 
+void
+TrexStatelessDpCore::set_service_mode(uint8_t port_id, bool enabled) {
+    /* ignore the same message */
+    if (enabled == m_is_service_mode) {
+        return;
+    }
+    
+    if (enabled) {
+        /* sanity */
+        assert(m_core->m_node_gen.m_v_if != m_wrapper);
+        
+        /* set the wrapper object and make the VIF point to it */
+        m_wrapper->set_wrapped_object(m_core->m_node_gen.m_v_if);
+        m_core->m_node_gen.m_v_if = m_wrapper;
+        m_is_service_mode = true;
+        
+    } else {
+        /* sanity */
+        assert(m_core->m_node_gen.m_v_if == m_wrapper);
+        
+        /* restore the wrapped object and make the VIF point to it */
+        m_core->m_node_gen.m_v_if = m_wrapper->get_wrapped_object();
+        m_is_service_mode = false;
+    }
+}
+
 
 /**
  * PCAP node
index e880a6e..9312890 100644 (file)
@@ -34,6 +34,7 @@ class CGenNodeStateless;
 class TrexStreamsCompiledObj;
 class TrexStream;
 class CGenNodePCAP;
+class DPCoreWrapper;
 
 class CDpOneStream  {
 public:
@@ -116,6 +117,7 @@ public:
 /* for now */
 #define NUM_PORTS_PER_CORE 2
 
+
 class TrexStatelessDpCore {
 
 public:
@@ -131,12 +133,10 @@ public:
 
     };
 
-    TrexStatelessDpCore() {
-        m_thread_id = 0;
-        m_core = NULL;
-        m_duration = -1;
-    }
-
+    TrexStatelessDpCore();
+    ~TrexStatelessDpCore();
+    
+    
     /**
      * "static constructor"
      * 
@@ -273,6 +273,10 @@ public:
         return get_port_db(port_id)->is_active();
     }
 
+    /**
+     * enabled/disable service mode
+     */
+    void set_service_mode(uint8_t port_id, bool enabled);
 
 private:
 
@@ -335,6 +339,9 @@ private:
     CFlowGenListPerThread   * m_core;
 
     double                 m_duration;
+    
+    DPCoreWrapper          *m_wrapper;
+    bool                   m_is_service_mode;
 };
 
 #endif /* __TREX_STATELESS_DP_CORE_H__ */
index f89ca34..0a3fbfd 100644 (file)
@@ -233,6 +233,24 @@ TrexStatelessDpBarrier::clone() {
     return new_msg;
 }
 
+/*************************
+  service mode message
+ ************************/
+
+bool
+TrexStatelessDpServiceMode::handle(TrexStatelessDpCore *dp_core) {
+    dp_core->set_service_mode(m_port_id, m_enabled);
+    return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpServiceMode::clone() {
+
+    TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpServiceMode(m_port_id, m_enabled);
+
+    return new_msg;
+}
+
 /************************* messages from DP to CP **********************/
 bool
 TrexDpPortEventMsg::handle() {
index cd79d6e..7871a75 100644 (file)
@@ -140,7 +140,7 @@ protected:
 class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
 public:
 
-    TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration);
+    TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration);
 
     ~TrexStatelessDpStart();
 
@@ -369,6 +369,29 @@ private:
 };
 
 
+/**
+ * move a DP core in/out of service mode (slower as it might do
+ * capturing and etc.) 
+ *
+ */
+class TrexStatelessDpServiceMode : public TrexStatelessCpToDpMsgBase {
+public:
+
+    TrexStatelessDpServiceMode(uint8_t port_id, bool enabled) {
+        m_port_id = port_id;
+        m_enabled = enabled;
+    }
+
+    virtual TrexStatelessCpToDpMsgBase * clone();
+
+    virtual bool handle(TrexStatelessDpCore *dp_core);
+
+private:
+
+    uint8_t   m_port_id;
+    bool      m_enabled;
+};
+
 /************************* messages from DP to CP **********************/
 
 /**
index d9813ca..3b0273a 100644 (file)
@@ -46,7 +46,7 @@ TrexStatelessCapture::~TrexStatelessCapture() {
 
 void
 TrexStatelessCapture::handle_pkt(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) {
-
+    
     if (m_state != STATE_ACTIVE) {
         return;
     }
@@ -127,7 +127,7 @@ TrexStatelessCaptureMngr::update_global_filter() {
     for (TrexStatelessCapture *capture : m_captures) {
         new_filter += capture->get_filter();
     }
-    
+  
     m_global_filter = new_filter;
 }
 
@@ -136,7 +136,7 @@ TrexStatelessCaptureMngr::update_global_filter() {
  * lookup a specific capture by ID
  */
 TrexStatelessCapture *
-TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) {
+TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) const {
     
     for (int i = 0; i < m_captures.size(); i++) {
         if (m_captures[i]->get_id() == capture_id) {
@@ -150,7 +150,7 @@ TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) {
 
 
 int
-TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) {
+TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) const {
     for (int i = 0; i < m_captures.size(); i++) {
         if (m_captures[i]->get_id() == capture_id) {
             return i;
@@ -179,11 +179,20 @@ TrexStatelessCaptureMngr::start(const CaptureFilter &filter,
     /* create a new capture*/
     int new_id = m_id_counter++;
     TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter, mode);
+    
+    /**
+     * add the new capture in a safe mode 
+     * (TX might be active) 
+     */
+    std::unique_lock<std::mutex> ulock(m_lock);
     m_captures.push_back(new_capture);
+
     /* update global filter */
     update_global_filter();
     
+    /* done with critical section */
+    ulock.unlock();
+    
     /* result */
     rc.set_rc(new_id, new_capture->get_start_ts());
 }
@@ -196,7 +205,9 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) {
         return;
     }
     
+    std::unique_lock<std::mutex> ulock(m_lock);
     capture->stop();
+    
     rc.set_rc(capture->get_pkt_count());
 }
 
@@ -210,7 +221,11 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre
     }
     
     uint32_t pending = 0;
+    
+    /* take a lock before fetching all the packets */
+    std::unique_lock<std::mutex> ulock(m_lock);
     TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
+    ulock.unlock();
     
     rc.set_rc(pkt_buffer, pending, capture->get_start_ts());
 }
@@ -226,14 +241,21 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r
     }
     
     TrexStatelessCapture *capture =  m_captures[index];
-    m_captures.erase(m_captures.begin() + index);
     
-    /* free memory */
-    delete capture;
+    /* remove from list under lock */
+    std::unique_lock<std::mutex> ulock(m_lock);
     
-    /* update global filter */
+    m_captures.erase(m_captures.begin() + index);
+    
+    /* update global filter under lock (for barrier) */
     update_global_filter();
     
+    /* done with critical section */
+    ulock.unlock();
+    
+    /* free memory */
+    delete capture;
+
     rc.set_rc();
 }
 
@@ -247,24 +269,39 @@ TrexStatelessCaptureMngr::reset() {
     }
 }
 
+//#define STRESS_TEST
 void 
 TrexStatelessCaptureMngr::handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) {
-    std::unique_lock<std::mutex> lock(m_lock);
+    
+    #ifdef STRESS_TEST
+        static int sanity = 0;
+        assert(__sync_fetch_and_add(&sanity, 1) == 0);
+    #endif
     
     for (TrexStatelessCapture *capture : m_captures) {
         capture->handle_pkt(m, port, origin);
     }
+    
+    #ifdef STRESS_TEST
+        assert(__sync_fetch_and_sub(&sanity, 1) == 1);
+    #endif
 }
 
+
 Json::Value
-TrexStatelessCaptureMngr::to_json() const {
+TrexStatelessCaptureMngr::to_json() {
     Json::Value lst = Json::arrayValue;
-
+    
+    std::unique_lock<std::mutex> ulock(m_lock);
+    
     for (TrexStatelessCapture *capture : m_captures) {
         lst.append(capture->to_json());
     }
 
+    ulock.unlock();
+    
     return lst;
 }
 
 TrexStatelessCaptureMngr TrexStatelessCaptureMngr::g_instance;
+
index 6288ac5..8af2510 100644 (file)
@@ -101,6 +101,14 @@ public:
         return output;
     }
 
+    uint64_t get_tx_active_map() const {
+        return m_tx_active;
+    }
+    
+    uint64_t get_rx_active_map() const {
+        return m_rx_active;
+    }
+    
 private:
     
     uint64_t  m_tx_active;
@@ -247,30 +255,56 @@ public:
     }
     
     /**
-     * handle packet on TX side
+     * handle packet on TX side 
+     * always with a lock 
      */
     inline void handle_pkt_tx(const rte_mbuf_t *m, int port) {
+
+        /* fast path */
+        if (likely(!m_global_filter.in_tx(port))) {
+            return;
+        }
         
-        /* fast bail out IF */
-        if (unlikely(m_global_filter.in_tx(port))) {
+        /* TX core always locks */
+        std::unique_lock<std::mutex> ulock(m_lock);
+        
+        /* check again the global filter (because of RX fast path might not lock) */
+        if (m_global_filter.in_tx(port)) {
             handle_pkt_slow_path(m, port, TrexPkt::ORIGIN_TX);
         }
         
+        ulock.unlock();
+        
     }
     
     /** 
-     * handle packet on RX side
+     * handle packet on RX side 
+     * RX side might or might not use a lock - depends if there are 
+     * other TX cores being captured 
      */
     inline void handle_pkt_rx(const rte_mbuf_t *m, int port) {
         
+        /* fast path */
+        if (likely(!m_global_filter.in_rx(port))) {
+            return;
+        }
+        
+        /* create a RAII object lock but do not lock yet */
+        std::unique_lock<std::mutex> ulock(m_lock, std::defer_lock);
+        
+        /* if we are not alone - lock */
+        if (m_global_filter.get_tx_active_map() != 0) {
+            ulock.lock();
+        }
+        
         /* fast bail out IF */
-        if (unlikely(m_global_filter.in_rx(port))) {
+        if (m_global_filter.in_rx(port)) {
             handle_pkt_slow_path(m, port, TrexPkt::ORIGIN_RX);
         }
         
     }
     
-    Json::Value to_json() const;
+    Json::Value to_json();
         
 private:
     
@@ -280,10 +314,10 @@ private:
     }
     
     
-    TrexStatelessCapture * lookup(capture_id_t capture_id);
-    int lookup_index(capture_id_t capture_id);
+    TrexStatelessCapture * lookup(capture_id_t capture_id) const;
+    int lookup_index(capture_id_t capture_id) const;
     
-    void handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) __attribute__ ((noinline));
+    void handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin);
    
     void update_global_filter();