self code review
authorimarom <[email protected]>
Thu, 24 Nov 2016 14:58:34 +0000 (16:58 +0200)
committerimarom <[email protected]>
Thu, 24 Nov 2016 14:58:34 +0000 (16:58 +0200)
Signed-off-by: imarom <[email protected]>
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/t-rex-64
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmd_stream.cpp
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_rx_core.h
src/stateless/rx/trex_stateless_rx_port_mngr.cpp
src/stateless/rx/trex_stateless_rx_port_mngr.h

index d23010d..546298c 100755 (executable)
@@ -1880,9 +1880,9 @@ class STLClient(object):
         """
         self._validate_port_list(src_port)
         
-        self.logger.pre_cmd("Pinging {0} bytes from port {1} to IPv4 {2}:".format(pkt_size,
-                                                                                  src_port,
-                                                                                  dst_ipv4))
+        self.logger.pre_cmd("Pinging {0} from port {1} with {2} bytes of data:".format(dst_ipv4,
+                                                                                       src_port,
+                                                                                       pkt_size))
         
         # no async messages
         with self.logger.supress(level = LoggerApi.VERBOSE_REGULAR_SYNC):
index 5515ba0..fc8318a 100755 (executable)
@@ -26,7 +26,7 @@ cd $(dirname $0)
 export LD_LIBRARY_PATH=$PWD
 
 #Add dummy lib in case we don't find it, e.g. there is no OFED installed  
-if ldd _t-rex-64  | grep "libibverbs.so" | grep -q "not found"; then 
+if ldd _$(basename $0)  | grep "libibverbs.so" | grep -q "not found"; then 
 export LD_LIBRARY_PATH=$PWD:$PWD/dumy_libs                           
 fi
             
index 14b3816..11dd99b 100644 (file)
@@ -812,7 +812,7 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value &params, Json::Value &result) {
     TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
 
     try {
-        const RxPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
+        const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
         if (pkt_buffer) {
             result["result"]["pkts"] = pkt_buffer->to_json();
         } else {
index d4e900a..9a57c5f 100644 (file)
@@ -670,6 +670,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
         generate_parse_err(result, "start message can only specify absolute speed rate");
     }
 
+    dsec_t ts = now_sec();
     TrexPortMultiplier mul(type, op, value);
 
     try {
@@ -680,7 +681,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
     }
 
     result["result"]["multiplier"] = port->get_multiplier();
-    result["result"]["ts"]         = now_sec();
+    result["result"]["ts"]         = ts;
     
     return (TREX_RPC_CMD_OK);
 }
index d4bc5c3..75530ea 100644 (file)
@@ -967,7 +967,7 @@ TrexStatelessPort::stop_rx_queue() {
 }
 
 
-RxPacketBuffer *
+RXPacketBuffer *
 TrexStatelessPort::get_rx_queue_pkts() {
 
     if (m_rx_features_info.m_rx_queue_info.is_empty()) {
@@ -975,12 +975,12 @@ TrexStatelessPort::get_rx_queue_pkts() {
     }
 
     /* ask RX core for the pkt queue */
-    TrexStatelessMsgReply<RxPacketBuffer *> msg_reply;
+    TrexStatelessMsgReply<RXPacketBuffer *> msg_reply;
 
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply);
     send_message_to_rx(msg);
 
-    RxPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
+    RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
     return pkt_buffer;
 }
 
index cf6b271..4aa7ff3 100644 (file)
@@ -31,7 +31,7 @@ class TrexStatelessCpToDpMsgBase;
 class TrexStatelessCpToRxMsgBase;
 class TrexStreamsGraphObj;
 class TrexPortMultiplier;
-class RxPacketBuffer;
+class RXPacketBuffer;
 
 
 /**
@@ -406,7 +406,7 @@ public:
      * fetch the RX queue packets from the queue
      * 
      */
-    RxPacketBuffer *get_rx_queue_pkts();
+    RXPacketBuffer *get_rx_queue_pkts();
 
     /**
      * return the port attribute object
index c2182f3..a8fb7ba 100644 (file)
@@ -288,7 +288,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
 
 
 bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
-    RxPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+    RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
     assert(pkt_buffer);
     m_reply.set(pkt_buffer);
 
index 52b1662..ed2ec90 100644 (file)
@@ -32,7 +32,7 @@ class TrexStatelessDpCore;
 class CRxCoreStateless;
 class TrexStreamsCompiledObj;
 class CFlowGenListPerThread;
-class RxPacketBuffer;
+class RXPacketBuffer;
 
 /**
  * defines the base class for CP to DP messages
@@ -524,7 +524,7 @@ private:
 class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
 public:
 
-    TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RxPacketBuffer *> &reply) : m_reply(reply) {
+    TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RXPacketBuffer *> &reply) : m_reply(reply) {
         m_port_id = port_id;
     }
 
@@ -536,7 +536,7 @@ public:
 
 private:
     uint8_t                                    m_port_id;
-    TrexStatelessMsgReply<RxPacketBuffer*>    &m_reply;
+    TrexStatelessMsgReply<RXPacketBuffer*>    &m_reply;
 };
 
 
index 519724d..9df3631 100644 (file)
@@ -111,7 +111,7 @@ class CRxCoreStateless {
     double get_cpu_util();
     void update_cpu_util();
 
-    RxPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+    RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
         return m_rx_port_mngr[port_id].get_pkt_buffer();
     }
 
index 2683dbe..46fec43 100644 (file)
 #include "common/captureFile.h"
 #include "trex_stateless_rx_core.h"
 
-/************************** latency feature ************/
-void RXLatency::handle_pkt(const rte_mbuf_t *m) {
+/**************************************
+ * latency RX feature
+ * 
+ *************************************/
+RXLatency::RXLatency() {
+    m_rcv_all    = false;
+    m_rfc2544    = NULL;
+    m_err_cntrs  = NULL;
+
+    for (int i = 0; i < MAX_FLOW_STATS; i++) {
+        m_rx_pg_stat[i].clear();
+        m_rx_pg_stat_payload[i].clear();
+    }
+}
+
+void
+RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
+    m_rfc2544   = rfc2544;
+    m_err_cntrs = err_cntrs;
+}
+
+void 
+RXLatency::handle_pkt(const rte_mbuf_t *m) {
     CFlowStatParser parser;
 
     if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
@@ -135,6 +156,92 @@ RXLatency::reset_stats() {
     }
 }
 
+/**************************************
+ * RX feature queue 
+ * 
+ *************************************/
+
+RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+    m_buffer           = nullptr;
+    m_head             = 0;
+    m_tail             = 0;
+    m_size             = (size + 1); // for the empty/full difference 1 slot reserved
+    m_shared_counter   = shared_counter;
+
+    /* reset the counter */
+    *m_shared_counter = 0;
+
+    /* generate queue */
+    m_buffer = new RXPacket*[m_size](); // zeroed
+
+    m_is_enabled = true;
+}
+
+RXPacketBuffer::~RXPacketBuffer() {
+    assert(m_buffer);
+
+    while (!is_empty()) {
+        RXPacket *pkt = pop();
+        delete pkt;
+    }
+    delete [] m_buffer;
+}
+
+RXPacketBuffer *
+RXPacketBuffer::freeze_and_clone() {
+    /* create a new one - same size and shared counter 0 */
+    RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
+
+    /* freeze the current */
+    m_shared_counter = NULL;
+    m_is_enabled     = false;
+
+    return new_buffer;
+}
+
+void 
+RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) {
+    assert(m_is_enabled);
+
+    /* if full - pop the oldest */
+    if (is_full()) {
+        delete pop();
+    }
+
+    /* push packet */
+    m_buffer[m_head] = new RXPacket(m);
+    m_head = next(m_head);
+    
+    /* update the shared counter - control plane memory */
+    (*m_shared_counter)++;
+}
+
+RXPacket *
+RXPacketBuffer::pop() {
+    assert(m_is_enabled);
+    assert(!is_empty());
+    
+    RXPacket *pkt = m_buffer[m_tail];
+    m_tail = next(m_tail);
+    (*m_shared_counter)--;
+    return pkt;
+}
+
+Json::Value
+RXPacketBuffer::to_json() const {
+
+    Json::Value output = Json::arrayValue;
+
+    int tmp = m_tail;
+    while (tmp != m_head) {
+        RXPacket *pkt = m_buffer[tmp];
+        output.append(pkt->to_json());
+        tmp = next(tmp);
+    }
+
+    return output;
+}
+
 /****************************** packet recorder ****************************/
 
 RXPacketRecorder::RXPacketRecorder() {
index fd023ea..9df4203 100644 (file)
 
 #include "common/captureFile.h"
 
-/************************* latency ***********************/
 
 class CPortLatencyHWBase;
 class CRFC2544Info;
 class CRxCoreErrCntrs;
 
+/**************************************
+ * RX feature latency
+ * 
+ *************************************/
 class RXLatency {
 public:
 
-    RXLatency() {
-        m_rcv_all    = false;
-        m_rfc2544    = NULL;
-        m_err_cntrs  = NULL;
+    RXLatency();
 
-        for (int i = 0; i < MAX_FLOW_STATS; i++) {
-            m_rx_pg_stat[i].clear();
-            m_rx_pg_stat_payload[i].clear();
-        }
-    }
-
-    void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
-        m_rfc2544 = rfc2544;
-        m_err_cntrs = err_cntrs;
-    }
+    void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs);
 
     void reset_stats();
 
@@ -81,26 +72,24 @@ public:
     CRxCoreErrCntrs      *m_err_cntrs;
 };
 
-/************************ queue ***************************/
-
 /**                
  * describes a single saved RX packet
  * 
  */
-class RxPacket {
+class RXPacket {
 public:
 
-    RxPacket(const rte_mbuf_t *m) {
+    RXPacket(const rte_mbuf_t *m) {
         /* assume single part packet */
         assert(m->nb_segs == 1);
 
         m_size = m->pkt_len;
         const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
 
-        m_raw = (uint8_t *)malloc(m_size);
+        m_raw = new uint8_t[m_size];
         memcpy(m_raw, p, m_size);
         
-        /* save the packet timestamp */
+        /* generate a packet timestamp */
         m_timestamp = now_sec();
     }
 
@@ -112,9 +101,9 @@ public:
         return output;
     }
 
-    ~RxPacket() {
+    ~RXPacket() {
         if (m_raw) {
-            delete m_raw;
+            delete [] m_raw;
         }
     }
 
@@ -125,48 +114,36 @@ private:
     dsec_t     m_timestamp;
 };
 
-/**
- * a simple cyclic buffer to hold RX packets
- * 
- */
-class RxPacketBuffer {
-public:
-
-    RxPacketBuffer(uint64_t size, uint64_t *shared_counter) {
-        m_buffer           = nullptr;
-        m_head             = 0;
-        m_tail             = 0;
-        m_size             = (size + 1); // for the empty/full difference 1 slot reserved
-        m_shared_counter   = shared_counter;
 
-        *m_shared_counter = 0;
-
-        m_buffer = new RxPacket*[m_size](); // zeroed
-
-        m_is_enabled = true;
-    }
+/**************************************
+ * RX feature queue 
+ * 
+ *************************************/
 
-    ~RxPacketBuffer() {
-        assert(m_buffer);
+class RXPacketBuffer {
+public:
 
-        while (!is_empty()) {
-            RxPacket *pkt = pop();
-            delete pkt;
-        }
-        delete [] m_buffer;
-    }
+    RXPacketBuffer(uint64_t size, uint64_t *shared_counter);
+    ~RXPacketBuffer();
 
-    /* freeze the data structure - no more packets can be pushed / poped */
-    RxPacketBuffer * freeze_and_clone() {
-        /* create a new one */
-        RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter);
+    /**
+     * handle a new packet
+     * 
+     */
+    void handle_pkt(const rte_mbuf_t *m);
+    
+    /**
+     * freezes the queue and clones a new one
+     * 
+     */
+    RXPacketBuffer * freeze_and_clone();
 
-        /* freeze the current */
-        m_shared_counter = NULL;
-        m_is_enabled = false;
+    /**
+     * generate a JSON output of the queue
+     * 
+     */
+    Json::Value to_json() const;
 
-        return new_buffer;
-    }
 
     bool is_empty() const {
         return (m_head == m_tail);
@@ -176,72 +153,33 @@ public:
         return ( next(m_head) == m_tail);
     }
 
-    void handle_pkt(const rte_mbuf_t *m) {
-        assert(m_is_enabled);
-
-        /* if full - pop the oldest */
-        if (is_full()) {
-            delete pop();
-        }
-
-        (*m_shared_counter)++;
-
-        m_buffer[m_head] = new RxPacket(m);
-        m_head = next(m_head);
-    }
-
-    /**
-     * generate a JSON output of the queue
-     * 
-     */
-    Json::Value to_json() const {
-
-        Json::Value output = Json::arrayValue;
-
-        int tmp = m_tail;
-        while (tmp != m_head) {
-            RxPacket *pkt = m_buffer[tmp];
-            output.append(pkt->to_json());
-            tmp = next(tmp);
-        }
-
-        return output;
-    }
-
 private:
     int next(int v) const {
         return ( (v + 1) % m_size );
     }
 
     /* pop in case of full queue - internal usage */
-    RxPacket * pop() {
-        assert(m_is_enabled);
-        assert(!is_empty());
-
-        RxPacket *pkt = m_buffer[m_tail];
-        m_tail = next(m_tail);
-        (*m_shared_counter)--;
-        return pkt;
-    }
+    RXPacket * pop();
 
     int             m_head;
     int             m_tail;
     int             m_size;
-    RxPacket      **m_buffer;
+    RXPacket      **m_buffer;
     uint64_t       *m_shared_counter;
     bool            m_is_enabled;
 };
 
-/************************ recoder ***************************/
 
-/**
- * RX packet recorder to PCAP file
+/**************************************
+ * RX feature PCAP recorder 
  * 
- */
+ *************************************/
+
 class RXPacketRecorder {
 public:
     RXPacketRecorder();
     ~RXPacketRecorder();
+    
     void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
     void stop();
     void handle_pkt(const rte_mbuf_t *m);
@@ -251,7 +189,7 @@ private:
     CCapPktRaw        m_pkt;
     dsec_t            m_epoch;
     uint64_t          m_limit;
-    uint64_t          *m_shared_counter;
+    uint64_t         *m_shared_counter;
 };
 
 
@@ -265,9 +203,9 @@ private:
 class RXPortManager {
 public:
     enum features_t {
-        LATENCY = 0x1,
+        LATENCY  = 0x1,
         CAPTURE  = 0x2,
-        QUEUE   = 0x4
+        QUEUE    = 0x4
     };
 
     RXPortManager() {
@@ -328,7 +266,7 @@ public:
         if (m_pkt_buffer) {
             delete m_pkt_buffer;
         }
-        m_pkt_buffer = new RxPacketBuffer(size, shared_counter);
+        m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
         set_feature(QUEUE);
     }
 
@@ -340,7 +278,7 @@ public:
         unset_feature(QUEUE);
     }
 
-    RxPacketBuffer *get_pkt_buffer() {
+    RXPacketBuffer *get_pkt_buffer() {
         if (!is_feature_set(QUEUE)) {
             return NULL;
         }
@@ -348,7 +286,7 @@ public:
         assert(m_pkt_buffer);
 
         /* hold a pointer to the old one */
-        RxPacketBuffer *old_buffer = m_pkt_buffer;
+        RXPacketBuffer *old_buffer = m_pkt_buffer;
 
         /* replace the old one with a new one and freeze the old */
         m_pkt_buffer = old_buffer->freeze_and_clone();
@@ -407,7 +345,7 @@ private:
     uint32_t                     m_features;
     RXPacketRecorder             m_recorder;
     RXLatency                    m_latency;
-    RxPacketBuffer              *m_pkt_buffer;
+    RXPacketBuffer              *m_pkt_buffer;
     CCpuUtlDp                   *m_cpu_dp_u;
     CPortLatencyHWBase          *m_io;
 };