reply to messages
authorimarom <[email protected]>
Wed, 30 Nov 2016 11:00:54 +0000 (13:00 +0200)
committerimarom <[email protected]>
Wed, 30 Nov 2016 11:35:40 +0000 (13:35 +0200)
Signed-off-by: imarom <[email protected]>
16 files changed:
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py
src/common/basic_utils.h
src/main_dpdk.cpp
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h
src/stateless/rx/trex_stateless_rx_defs.h
src/stateless/rx/trex_stateless_rx_port_mngr.cpp
src/stateless/rx/trex_stateless_rx_port_mngr.h
src/trex_port_attr.cpp

index 4ebfa0b..93a930e 100644 (file)
@@ -263,11 +263,6 @@ class JsonRpcClient(object):
 
         self.connected = True
 
-        rc = self.invoke_rpc_method('ping', api_class = None)
-        if not rc:
-            self.connected = False
-            return rc
-        
         return RC_OK()
 
 
index d009253..42b7b89 100644 (file)
@@ -2,7 +2,7 @@
 from collections import namedtuple, OrderedDict
 
 from .trex_stl_packet_builder_scapy import STLPktBuilder
-from .trex_stl_streams import STLStream, STLTXSingleBurst
+from .trex_stl_streams import STLStream
 from .trex_stl_types import *
 from .trex_stl_rx_features import ARPResolver, PingResolver
 from . import trex_stl_stats
@@ -577,8 +577,7 @@ class Port(object):
         for i in range(len(pkts)):
             pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
             
-            
-        return pkts
+        return RC_OK(pkts)
         
         
     @owned
index f9f6a49..5c2fd9a 100644 (file)
@@ -114,7 +114,7 @@ class Resolver(object):
                 return rx_pkts
                 
             # for each packet - examine it
-            for pkt in rx_pkts:
+            for pkt in rx_pkts.data():
                 rc = self.on_pkt_rx(pkt)
                 if rc is not None:
                     return rc
index ce2da69..c30457d 100755 (executable)
@@ -87,7 +87,6 @@ bool utl_is_file_exists (const std::string& name) ;
 void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output);
 std::string utl_macaddr_to_str(const uint8_t *macaddr);
 
-std::string utl_macaddr_to_str(const uint8_t *macaddr);
 bool utl_str_to_macaddr(const std::string &s, uint8_t *mac);
 
 std::string utl_generate_random_str(unsigned int &seed, int len);
index 1341def..66a3611 100644 (file)
@@ -1725,19 +1725,18 @@ bool DpdkTRexPortAttr::update_link_status_nowait(){
     bool changed = false;
     rte_eth_link_get_nowait(m_port_id, &new_link);
     
-    /* if the link got down - update the dest atribute to move to unresolved */
-    if (new_link.link_status != m_link.link_status) {
-        get_dest().on_link_down();
-        changed = true;
-    }
-    
-    /* other changes */
     if (new_link.link_speed != m_link.link_speed ||
                 new_link.link_duplex != m_link.link_duplex ||
-                    new_link.link_autoneg != m_link.link_autoneg) {
+                    new_link.link_autoneg != m_link.link_autoneg ||
+                        new_link.link_status != m_link.link_status) {
         changed = true;
+        
+        /* in case of link status change - notify the dest object */
+        if (new_link.link_status != m_link.link_status) {
+            get_dest().on_link_down();
+        }
     }
-    
+     
     m_link = new_link;
     return changed;
 }
index ccdcbd8..849c9be 100644 (file)
@@ -310,8 +310,8 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
         section["ports"][i]["driver"]       = driver;
         section["ports"][i]["description"]  = description;
 
-        section["ports"][i]["pci_addr"]         = pci_addr;
-        section["ports"][i]["numa"]             = numa;
+        section["ports"][i]["pci_addr"]     = pci_addr;
+        section["ports"][i]["numa"]         = numa;
 
         uint16_t caps = port->get_rx_caps();
         section["ports"][i]["rx"]["caps"]      = Json::arrayValue;
@@ -350,6 +350,7 @@ TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port
     } else if (type == "all") {
         filter_mode = RX_FILTER_MODE_ALL;
     } else {
+        /* can't happen - parsed choice */
         assert(0);
     }
 
@@ -647,8 +648,8 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
     get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
     
     /* RX info */
-    port->get_rx_features().to_json(result["result"]["rx_info"]);
-
+    result["result"]["rx_info"] = port->rx_features_to_json();
+    
     return (TREX_RPC_CMD_OK);
 }
 
index ca18575..7edf1a3 100644 (file)
@@ -936,61 +936,67 @@ TrexStatelessPort::remove_and_delete_all_streams() {
 
 void 
 TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
-
-    m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
-
-    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+    static MsgReply<bool> reply;
+    
+    reply.reset();
+    
+    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
     send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
     
     /* as below, must wait for ACK from RX core before returning ACK */
-    msg->wait_for_reply();
+    reply.wait_for_reply();
 }
 
 void
 TrexStatelessPort::stop_rx_capture() {
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
     send_message_to_rx(msg);
-    
-    /* update our cached data */
-    m_rx_features_info.m_rx_capture_info.disable();
 }
 
 void 
 TrexStatelessPort::start_rx_queue(uint64_t size) {
-
-    m_rx_features_info.m_rx_queue_info.enable(size);
-
-    TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+    static MsgReply<bool> reply;
+    
+    reply.reset();
+    
+    TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
     send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
     
     /* we cannot return ACK to the user until the RX core has approved
        this might cause the user to lose some packets from the queue
      */
-    msg->wait_for_reply();
+    reply.wait_for_reply();
 }
 
 void
 TrexStatelessPort::stop_rx_queue() {
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
     send_message_to_rx(msg);
-    
-    /* update our cached data */
-    m_rx_features_info.m_rx_queue_info.disable();
 }
 
 
-RXPacketBuffer *
+const RXPacketBuffer *
 TrexStatelessPort::get_rx_queue_pkts() {
+    static MsgReply<const RXPacketBuffer *> reply;
+    
+    reply.reset();
 
-    if (m_rx_features_info.m_rx_queue_info.is_empty()) {
-        return NULL;
-    }
+    TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
+    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+
+    return reply.wait_for_reply();
+}
+
+Json::Value
+TrexStatelessPort::rx_features_to_json() {
+    static MsgReply<Json::Value> reply;
+    
+    reply.reset();
 
-    TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id);
+    TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
     send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
 
-    RXPacketBuffer *pkt_buffer = msg->wait_for_reply();
-    return pkt_buffer;
+    return reply.wait_for_reply();
 }
 
 /************* Trex Port Owner **************/
index 4aa7ff3..f2829b8 100644 (file)
@@ -393,21 +393,19 @@ public:
         */
        void stop_rx_queue();
 
-
     /**
-     * get the RX features info object
+     * fetch the RX queue packets from the queue
      * 
      */
-    const RXFeaturesInfo &get_rx_features() {
-        return m_rx_features_info;
-    }
+    const RXPacketBuffer *get_rx_queue_pkts();
 
     /**
-     * fetch the RX queue packets from the queue
+     * generate a JSON describing the status 
+     * of the RX features 
      * 
      */
-    RXPacketBuffer *get_rx_queue_pkts();
-
+    Json::Value rx_features_to_json();
+    
     /**
      * return the port attribute object
      * 
@@ -447,7 +445,7 @@ private:
      *
      */
     void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
-
+    
     /**
      * when a port stops, perform various actions
      *
@@ -503,8 +501,6 @@ private:
 
     int m_pending_async_stop_event;
 
-    RXFeaturesInfo      m_rx_features_info;
-
 };
 
 
index cad4fe7..53d5a87 100644 (file)
@@ -259,9 +259,10 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
 
 bool
 TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
-    rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+    rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
 
-    set_reply(true);
+    /* mark as done */
+    m_reply.set_reply(true);
     
     return true;
 }
@@ -275,10 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
 
 bool
 TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
-    rx_core->start_queue(m_port_id, m_size, m_shared_counter);
+    rx_core->start_queue(m_port_id, m_size);
     
     /* mark as done */
-    set_reply(true);
+    m_reply.set_reply(true);
     
     return true;
 }
@@ -292,12 +293,24 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
 
 
 
-bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
-    RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
-    assert(pkt_buffer);
+bool
+TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
+    const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
     
     /* set the reply */
-    set_reply(pkt_buffer);
+    m_reply.set_reply(pkt_buffer);
 
     return true;
 }
+
+
+bool
+TrexStatelessRxFeaturesToJson::handle(CRxCoreStateless *rx_core) {
+    Json::Value output = rx_core->get_rx_port_mngr(m_port_id).to_json();
+    
+    /* set the reply */
+    m_reply.set_reply(output);
+
+    return true;
+}
+
index b1d9117..303548a 100644 (file)
@@ -34,6 +34,57 @@ class TrexStreamsCompiledObj;
 class CFlowGenListPerThread;
 class RXPacketBuffer;
 
+/**
+ * Generic message reply object
+ * 
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class MsgReply {
+
+public:
+    
+    MsgReply() {
+        reset();
+    }
+
+    void reset() {
+        m_pending = true;
+    }
+    
+    bool is_pending() const {
+        return m_pending;
+    }
+
+    void set_reply(const T &reply) {
+        m_reply = reply;
+
+        /* before marking as done make sure all stores are committed */
+        asm volatile("mfence" ::: "memory");
+        m_pending = false;
+    }
+
+    T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+        int guard = timeout_ms;
+
+        while (is_pending()) {
+            guard -= backoff_ms;
+            if (guard < 0) {
+                throw TrexException("timeout: failed to get reply from core");
+            }
+
+            delay(backoff_ms);
+        }
+        
+        return m_reply;
+
+    }
+    
+protected:
+    volatile bool  m_pending;
+    T              m_reply;
+};
+
+
 /**
  * defines the base class for CP to DP messages
  *
@@ -408,52 +459,6 @@ public:
 
 };
 
-/**
- * defines the base class for CP to RX with reply
- * 
- * @author imarom (11/27/2016)
- */
-template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase {
-
-public:
-    
-    TrexStatelessCpToRxMsgReply() {
-        m_pending = true;
-    }
-
-    bool is_pending() const {
-        return m_pending;
-    }
-
-    void set_reply(const T &reply) {
-        m_reply = reply;
-
-        /* before marking as done - memory fence */
-        asm volatile("mfence" ::: "memory");
-        m_pending = false;
-    }
-
-    T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
-        int guard = timeout_ms;
-
-        while (is_pending()) {
-            guard -= backoff_ms;
-            if (guard < 0) {
-                throw TrexException("timeout: RX core has failed to reply");
-            }
-
-            delay(backoff_ms);
-        }
-        
-        return m_reply;
-
-    }
-    
-protected:
-    volatile bool  m_pending;
-    T              m_reply;
-};
-
 
 class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
     bool handle (CRxCoreStateless *rx_core);
@@ -469,22 +474,25 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
 
 
 
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
 public:
-    TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
+    TrexStatelessRxStartCapture(uint8_t port_id,
+                                const std::string &pcap_filename,
+                                uint64_t limit,
+                                MsgReply<bool> &reply) : m_reply(reply) {
+        
         m_port_id          = port_id;
-        m_limit            = rx_capture_info.m_limit;
-        m_pcap_filename    = rx_capture_info.m_pcap_filename;
-        m_shared_counter   = &rx_capture_info.m_shared_counter;
+        m_limit            = limit;
+        m_pcap_filename    = pcap_filename;
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t           m_port_id;
-    std::string       m_pcap_filename;
-    uint64_t          m_limit;
-    uint64_t          *m_shared_counter;
+    uint8_t            m_port_id;
+    std::string        m_pcap_filename;
+    uint64_t           m_limit;
+    MsgReply<bool>    &m_reply;
 };
 
 
@@ -501,12 +509,14 @@ private:
 };
 
 
-class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
 public:
-    TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
+    TrexStatelessRxStartQueue(uint8_t port_id,
+                              uint64_t size,
+                              MsgReply<bool> &reply) : m_reply(reply) {
+        
         m_port_id           = port_id;
-        m_size              = rx_queue_info.m_size;
-        m_shared_counter    = &rx_queue_info.m_shared_counter;
+        m_size              = size;
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
@@ -514,7 +524,7 @@ public:
 private:
     uint8_t           m_port_id;
     uint64_t          m_size;
-    uint64_t          *m_shared_counter;
+    MsgReply<bool>   &m_reply;
 };
 
 
@@ -532,10 +542,10 @@ private:
 
 
 
-class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> {
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
 public:
 
-    TrexStatelessRxQueueGetPkts(uint8_t port_id) {
+    TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
         m_port_id = port_id;
     }
 
@@ -546,8 +556,30 @@ public:
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t  m_port_id;
+    uint8_t                              m_port_id;
+    MsgReply<const RXPacketBuffer *>    &m_reply;
+    
 };
 
+/**
+ * a request from RX core to dump to Json the RX features
+ */
+class TrexStatelessRxFeaturesToJson : public TrexStatelessCpToRxMsgBase {
+public:
+    
+    TrexStatelessRxFeaturesToJson(uint8_t port_id, MsgReply<Json::Value> &reply) : m_reply(reply) {
+        m_port_id = port_id;
+    }
+     
+    /**
+     * virtual function to handle a message
+     *
+     */
+    virtual bool handle(CRxCoreStateless *rx_core);
+    
+private:
+    uint8_t                  m_port_id;
+    MsgReply<Json::Value>   &m_reply;
+};
 
 #endif /* __TREX_STATELESS_MESSAGING_H__ */
index a1ff9c6..b24fcb8 100644 (file)
@@ -374,8 +374,8 @@ double CRxCoreStateless::get_cpu_util() {
 
 
 void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
-    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter);
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
+    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
 }
 
 void
@@ -384,8 +384,8 @@ CRxCoreStateless::stop_recorder(uint8_t port_id) {
 }
 
 void
-CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
-    m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
+    m_rx_port_mngr[port_id].start_queue(size);
 }
 
 void
@@ -407,3 +407,9 @@ CRxCoreStateless::disable_latency() {
     }
 }
 
+const RXPortManager &
+CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
+    assert(port_id < m_max_ports);
+    return m_rx_port_mngr[port_id];
+    
+}
index 8e50a46..b27a7ca 100644 (file)
@@ -111,27 +111,22 @@ class CRxCoreStateless {
     double get_cpu_util();
     void update_cpu_util();
 
-    RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+    const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
         return m_rx_port_mngr[port_id].get_pkt_buffer();
     }
 
     /**
-     * start capturing of RX packets on a specific port
-     * 
-     * @author imarom (11/2/2016)
-     * 
-     * @param port_id 
-     * @param pcap_filename 
-     * @param limit 
+     * start capturing of RX packets on a specific port 
+     *  
      */
-    void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+    void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
     void stop_recorder(uint8_t port_id);
 
     /**
      * start RX queueing of packets
      * 
      */
-    void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+    void start_queue(uint8_t port_id, uint64_t size);
     void stop_queue(uint8_t port_id);
 
     /**
@@ -141,6 +136,8 @@ class CRxCoreStateless {
     void enable_latency();
     void disable_latency();
 
+    const RXPortManager &get_rx_port_mngr(uint8_t port_id);
+    
  private:
     void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
     bool periodic_check_for_cp_messages();
@@ -162,7 +159,7 @@ class CRxCoreStateless {
     }
 
     void try_rx_queues();
-
+    
  private:
     TrexMonitor      m_monitor;
     uint32_t         m_max_ports;
index 7b1e0f3..3e5d2c3 100644 (file)
@@ -55,103 +55,5 @@ typedef enum rx_filter_mode_ {
     RX_FILTER_MODE_ALL
 } rx_filter_mode_e;
 
-/**
- * holds RX capture info
- * 
- */
-class RXCaptureInfo {
-public:
-    RXCaptureInfo() {
-        m_is_active = false;
-        m_limit = 0;
-        m_shared_counter = 0;
-    }
-
-    void enable(const std::string &pcap_filename, uint64_t limit) {
-        m_pcap_filename = pcap_filename;
-        m_limit = limit;
-        m_is_active = true; 
-    }
-
-    void disable() {
-        m_is_active = false;
-        m_pcap_filename = "";
-        m_limit = 0;
-    }
-
-    bool is_empty() const {
-        return (m_shared_counter == 0);
-    }
-
-    void to_json(Json::Value &output) const {
-        output["is_active"] = m_is_active;
-        if (m_is_active) {
-            output["pcap_filename"] = m_pcap_filename;
-            output["limit"]         = Json::UInt64(m_limit);
-            output["count"]         = Json::UInt64(m_shared_counter);
-        }
-    }
-
-public:
-    bool             m_is_active;
-    std::string      m_pcap_filename;
-    uint64_t         m_limit;
-    uint64_t         m_shared_counter;
-};
-
-
-class RXQueueInfo {
-public:
-
-    RXQueueInfo() {
-        m_is_active = false;
-        m_shared_counter = 0;
-    }
-
-    void enable(uint64_t size) {
-        m_size = size;
-        m_is_active = true;
-    }
-
-    void disable() {
-        m_is_active = false;
-        m_size = 0;
-    }
-
-    bool is_empty() const {
-        return (m_shared_counter == 0);
-    }
-    
-    void to_json(Json::Value &output) const {
-        output["is_active"] = m_is_active;
-        if (m_is_active) {
-            output["size"]          = Json::UInt64(m_size);
-            output["count"]         = Json::UInt64(m_shared_counter);
-        }
-    }
-
-public:
-    bool        m_is_active;
-    uint64_t    m_size;
-    uint64_t    m_shared_counter;
-};
-
-
-/**
- * holds all the RX features info
- * 
- * @author imarom (11/7/2016)
- */
-class RXFeaturesInfo {
-public:
-    RXCaptureInfo m_rx_capture_info;
-    RXQueueInfo   m_rx_queue_info;
-
-    void to_json(Json::Value &msg) const {
-        m_rx_capture_info.to_json(msg["sniffer"]);
-        m_rx_queue_info.to_json(msg["queue"]);
-    }
-};
-
 #endif /* __TREX_STATELESS_RX_DEFS_H__ */
 
index 78f4ac5..00032e8 100644 (file)
@@ -156,25 +156,24 @@ RXLatency::reset_stats() {
     }
 }
 
+Json::Value
+RXLatency::to_json() const {
+    return Json::objectValue;
+}
+
 /**************************************
  * RX feature queue 
  * 
  *************************************/
 
-RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+RXPacketBuffer::RXPacketBuffer(uint64_t size) {
     m_buffer           = nullptr;
     m_head             = 0;
     m_tail             = 0;
     m_size             = (size + 1); // for the empty/full difference 1 slot reserved
-    m_shared_counter   = shared_counter;
-
-    /* reset the counter */
-    *m_shared_counter = 0;
 
     /* generate queue */
     m_buffer = new RXPacket*[m_size](); // zeroed
-
-    m_is_enabled = true;
 }
 
 RXPacketBuffer::~RXPacketBuffer() {
@@ -187,22 +186,8 @@ RXPacketBuffer::~RXPacketBuffer() {
     delete [] m_buffer;
 }
 
-RXPacketBuffer *
-RXPacketBuffer::freeze_and_clone() {
-    /* create a new one - same size and shared counter 0 */
-    RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
-
-    /* freeze the current */
-    m_shared_counter = NULL;
-    m_is_enabled     = false;
-
-    return new_buffer;
-}
-
 void 
 RXPacketBuffer::push(const rte_mbuf_t *m) {
-    assert(m_is_enabled);
-
     /* if full - pop the oldest */
     if (is_full()) {
         delete pop();
@@ -211,22 +196,27 @@ RXPacketBuffer::push(const rte_mbuf_t *m) {
     /* push packet */
     m_buffer[m_head] = new RXPacket(m);
     m_head = next(m_head);
-    
-    /* update the shared counter - control plane memory */
-    (*m_shared_counter)++;
 }
 
 RXPacket *
 RXPacketBuffer::pop() {
-    assert(m_is_enabled);
     assert(!is_empty());
     
     RXPacket *pkt = m_buffer[m_tail];
     m_tail = next(m_tail);
-    (*m_shared_counter)--;
+    
     return pkt;
 }
 
+uint64_t
+RXPacketBuffer::get_element_count() const {
+    if (m_head >= m_tail) {
+        return (m_head - m_tail);
+    } else {
+        return ( get_capacity() - (m_tail - m_head - 1) );
+    }
+}
+
 Json::Value
 RXPacketBuffer::to_json() const {
 
@@ -244,11 +234,11 @@ RXPacketBuffer::to_json() const {
 
 
 void
-RXQueue::start(uint64_t size, uint64_t *shared_counter) {
+RXQueue::start(uint64_t size) {
     if (m_pkt_buffer) {
         delete m_pkt_buffer;
     }
-    m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+    m_pkt_buffer = new RXPacketBuffer(size);
 }
 
 void
@@ -259,10 +249,11 @@ RXQueue::stop() {
     }
 }
 
-RXPacketBuffer *
+const RXPacketBuffer *
 RXQueue::fetch() {
 
-    if (!m_pkt_buffer) {
+    /* if no buffer or the buffer is empty - give a NULL one */
+    if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) {
         return nullptr;
     }
     
@@ -270,7 +261,7 @@ RXQueue::fetch() {
     RXPacketBuffer *old_buffer = m_pkt_buffer;
 
     /* replace the old one with a new one and freeze the old */
-    m_pkt_buffer = old_buffer->freeze_and_clone();
+    m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
 
     return old_buffer;
 }
@@ -280,6 +271,18 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) {
     m_pkt_buffer->push(m);
 }
 
+Json::Value
+RXQueue::to_json() const {
+    assert(m_pkt_buffer != NULL);
+    
+    Json::Value output = Json::objectValue;
+    
+    output["size"]    = Json::UInt64(m_pkt_buffer->get_capacity());
+    output["count"]   = Json::UInt64(m_pkt_buffer->get_element_count());
+    
+    return output;
+}
+
 /**************************************
  * RX feature recorder
  * 
@@ -287,13 +290,15 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) {
 
 RXPacketRecorder::RXPacketRecorder() {
     m_writer = NULL;
-    m_shared_counter = NULL;
+    m_count  = 0;
     m_limit  = 0;
     m_epoch  = -1;
+    
+    m_pending_flush = false;
 }
 
 void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
+RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
     m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
     if (m_writer == NULL) {
         std::stringstream ss;
@@ -302,23 +307,29 @@ RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *share
     }
 
     assert(limit > 0);
+    
     m_limit = limit;
-    m_shared_counter = shared_counter;
-    (*m_shared_counter) = 0;
+    m_count = 0;
+    m_pending_flush = false;
+    m_pcap_filename = pcap;
 }
 
 void
 RXPacketRecorder::stop() {
-    if (m_writer) {
-        delete m_writer;
-        m_writer = NULL;
+    if (!m_writer) {
+        return;
     }
+    
+    delete m_writer;
+    m_writer = NULL;
 }
 
 void
 RXPacketRecorder::flush_to_disk() {
-    if (m_writer) {
+    
+    if (m_writer && m_pending_flush) {
         m_writer->flush_to_disk();
+        m_pending_flush = false;
     }
 }
 
@@ -344,15 +355,25 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
     memcpy(m_pkt.raw, p, m->pkt_len);
     
     m_writer->write_packet(&m_pkt);
-
-    m_limit--;
-    (*m_shared_counter)++;
-
-    if (m_limit == 0) {
+    m_count++;
+    m_pending_flush = true;
+    
+    if (m_count == m_limit) {
         stop();
     }
+    
 }
 
+Json::Value
+RXPacketRecorder::to_json() const {
+    Json::Value output = Json::objectValue;
+    
+    output["pcap_filename"] = m_pcap_filename;
+    output["limit"]         = Json::UInt64(m_limit);
+    output["count"]         = Json::UInt64(m_count);
+    
+    return output;
+}
 
 /**************************************
  * Port manager 
@@ -432,3 +453,31 @@ RXPortManager::tick() {
         m_recorder.flush_to_disk();
     }
 }
+
+Json::Value
+RXPortManager::to_json() const {
+    Json::Value output = Json::objectValue;
+    
+    if (is_feature_set(LATENCY)) {
+        output["latency"] = m_latency.to_json();
+        output["latency"]["is_active"] = true;
+    } else {
+        output["latency"]["is_active"] = false;
+    }
+
+    if (is_feature_set(RECORDER)) {
+        output["sniffer"] = m_recorder.to_json();
+        output["sniffer"]["is_active"] = true;
+    } else {
+        output["sniffer"]["is_active"] = false;
+    }
+
+    if (is_feature_set(QUEUE)) {
+        output["queue"] = m_queue.to_json();
+        output["queue"]["is_active"] = true;
+    } else {
+        output["queue"]["is_active"] = false;
+    }
+    return output;
+}
index 564b15d..bc34b5a 100644 (file)
@@ -47,6 +47,8 @@ public:
 
     void handle_pkt(const rte_mbuf_t *m);
 
+    Json::Value to_json() const;
+    
 private:
     bool is_flow_stat_id(uint32_t id) {
         if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
@@ -123,7 +125,7 @@ private:
 class RXPacketBuffer {
 public:
 
-    RXPacketBuffer(uint64_t size, uint64_t *shared_counter);
+    RXPacketBuffer(uint64_t size);
     ~RXPacketBuffer();
 
     /**
@@ -132,12 +134,6 @@ public:
      */
     void push(const rte_mbuf_t *m);
     
-    /**
-     * freezes the queue and clones a new one
-     * 
-     */
-    RXPacketBuffer * freeze_and_clone();
-
     /**
      * generate a JSON output of the queue
      * 
@@ -153,6 +149,19 @@ public:
         return ( next(m_head) == m_tail);
     }
 
+    /**
+     * return the total amount of space possible
+     */
+    uint64_t get_capacity() const {
+        /* one slot is used for diff between full/empty */
+        return (m_size - 1);
+    }
+    
+    /**
+     * returns how many elements are in the queue
+     */
+    uint64_t get_element_count() const;
+    
 private:
     int next(int v) const {
         return ( (v + 1) % m_size );
@@ -165,8 +174,6 @@ private:
     int             m_tail;
     int             m_size;
     RXPacket      **m_buffer;
-    uint64_t       *m_shared_counter;
-    bool            m_is_enabled;
 };
 
 
@@ -184,13 +191,13 @@ public:
      * start RX queue
      * 
      */
-    void start(uint64_t size, uint64_t *shared_counter);
+    void start(uint64_t size);
     
     /**
      * fetch the current buffer
-     * 
+     * return NULL if no packets
      */
-    RXPacketBuffer * fetch();
+    const RXPacketBuffer * fetch();
     
     /**
      * stop RX queue
@@ -200,6 +207,8 @@ public:
     
     void handle_pkt(const rte_mbuf_t *m);
     
+    Json::Value to_json() const;
+    
 private:
     RXPacketBuffer  *m_pkt_buffer;
 };
@@ -217,7 +226,7 @@ public:
         stop();
     }
     
-    void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
+    void start(const std::string &pcap, uint64_t limit);
     void stop();
     void handle_pkt(const rte_mbuf_t *m);
 
@@ -227,12 +236,16 @@ public:
      */
     void flush_to_disk();
     
+    Json::Value to_json() const;
+    
 private:
     CFileWriterBase  *m_writer;
+    std::string       m_pcap_filename;
     CCapPktRaw        m_pkt;
     dsec_t            m_epoch;
     uint64_t          m_limit;
-    uint64_t         *m_shared_counter;
+    uint64_t          m_count;
+    bool              m_pending_flush;
 };
 
 
@@ -277,8 +290,8 @@ public:
     }
 
     /* recorder */
-    void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
-        m_recorder.start(pcap, limit_pkts, shared_counter);
+    void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
+        m_recorder.start(pcap, limit_pkts);
         set_feature(RECORDER);
     }
 
@@ -288,8 +301,8 @@ public:
     }
 
     /* queue */
-    void start_queue(uint32_t size, uint64_t *shared_counter) {
-        m_queue.start(size, shared_counter);
+    void start_queue(uint32_t size) {
+        m_queue.start(size);
         set_feature(QUEUE);
     }
 
@@ -298,7 +311,7 @@ public:
         unset_feature(QUEUE); 
     }
 
-    RXPacketBuffer *get_pkt_buffer() {
+    const RXPacketBuffer *get_pkt_buffer() {
         if (!is_feature_set(QUEUE)) {
             return nullptr;
         }
@@ -346,6 +359,11 @@ public:
         return (!has_features_set());
     }
 
+    /**
+     * write the status to a JSON format
+     */
+    Json::Value to_json() const;
+    
 private:
     
     void clear_all_features() {
index b215a24..53ae19d 100644 (file)
@@ -48,7 +48,9 @@ DestAttr::set_dest(uint32_t ipv4, const uint8_t *mac) {
     assert(ipv4 != 0);
 
     m_ipv4 = ipv4;
-    memcpy(m_mac, mac, 6);
+    
+    /* source might be the same as dest (this shadows the datapath memory) */
+    memmove(m_mac, mac, 6);
     m_type = DEST_TYPE_IPV4;
 }
 
@@ -60,7 +62,9 @@ void
 DestAttr::set_dest(const uint8_t *mac) {
 
     m_ipv4 = 0;
-    memcpy(m_mac, mac, 6);
+    
+    /* source might be the same as dest (this shadows the datapath memory) */
+    memmove(m_mac, mac, 6);
     m_type = DEST_TYPE_MAC;
 }