RX features - self code review
authorimarom <[email protected]>
Mon, 28 Nov 2016 09:25:32 +0000 (11:25 +0200)
committerimarom <[email protected]>
Mon, 28 Nov 2016 09:25:32 +0000 (11:25 +0200)
Signed-off-by: imarom <[email protected]>
12 files changed:
src/common/captureFile.h
src/common/erf.cpp
src/common/erf.h
src/common/pcap.cpp
src/common/pcap.h
src/stateless/cp/trex_stateless_port.cpp
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h
src/stateless/rx/trex_stateless_rx_port_mngr.cpp
src/stateless/rx/trex_stateless_rx_port_mngr.h

index fefa62b..d87e57b 100755 (executable)
@@ -227,9 +227,10 @@ class CFileWriterBase {
 
 public:
 
-       virtual ~CFileWriterBase(){};
-       virtual bool Create(char * name) = 0;
+    virtual ~CFileWriterBase(){};
+    virtual bool Create(char * name) = 0;
     virtual bool write_packet(CCapPktRaw * lpPacket)=0;
+    virtual void flush_to_disk() = 0;
 
 };
 
index 76945b0..f872a28 100755 (executable)
@@ -280,7 +280,11 @@ bool CErfFileWriter::write_packet(CCapPktRaw * lpPacket){
        return true;
 }
 
-
+void CErfFileWriter::flush_to_disk() {
+    if (m_fd) {
+        fflush(m_fd);
+    }
+}
 
 bool CPcapFileWriter::Create(char *file_name){
     m_fd=CAP_FOPEN_64(file_name,"wb");
index e1b83e4..bec9487 100755 (executable)
@@ -224,6 +224,13 @@ public:
     virtual bool Create(char *file_name);
     void Delete();
     virtual bool write_packet(CCapPktRaw * lpPacket);
+    
+    /**
+     * flush all packets to disk
+     * 
+     */
+    void flush_to_disk();
+    
 private:
     FILE *m_fd;
     int m_cnt;
index 8e9bf0a..f5eb3c4 100755 (executable)
@@ -228,6 +228,12 @@ bool LibPCapWriter::Create(char * name)
     return init();
 }
 
+void LibPCapWriter::flush_to_disk() {
+    if (m_is_open) {
+        fflush(m_file_handler);
+    }
+}
+
 /**
  * 
  * Write the libpcap header.
index c9139e4..265ea17 100755 (executable)
@@ -143,6 +143,13 @@ public:
        */
        void Close();
 
+    /**
+     * flush all packets to disk
+     * 
+     * @author imarom (11/24/2016)
+     */
+    void flush_to_disk();
+    
 private:
 
        bool init();
index 75530ea..ca18575 100644 (file)
@@ -939,14 +939,19 @@ TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t l
 
     m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
 
-    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
-    send_message_to_rx(msg);
+    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+    send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
+    
+    /* as below, must wait for ACK from RX core before returning ACK */
+    msg->wait_for_reply();
 }
 
 void
 TrexStatelessPort::stop_rx_capture() {
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
     send_message_to_rx(msg);
+    
+    /* update our cached data */
     m_rx_features_info.m_rx_capture_info.disable();
 }
 
@@ -955,14 +960,21 @@ TrexStatelessPort::start_rx_queue(uint64_t size) {
 
     m_rx_features_info.m_rx_queue_info.enable(size);
 
-    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
-    send_message_to_rx(msg);
+    TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+    
+    /* we cannot return ACK to the user until the RX core has approved
+       this might cause the user to lose some packets from the queue
+     */
+    msg->wait_for_reply();
 }
 
 void
 TrexStatelessPort::stop_rx_queue() {
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
     send_message_to_rx(msg);
+    
+    /* update our cached data */
     m_rx_features_info.m_rx_queue_info.disable();
 }
 
@@ -974,13 +986,10 @@ TrexStatelessPort::get_rx_queue_pkts() {
         return NULL;
     }
 
-    /* ask RX core for the pkt queue */
-    TrexStatelessMsgReply<RXPacketBuffer *> msg_reply;
-
-    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply);
-    send_message_to_rx(msg);
+    TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id);
+    send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
 
-    RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
+    RXPacketBuffer *pkt_buffer = msg->wait_for_reply();
     return pkt_buffer;
 }
 
index a8fb7ba..cad4fe7 100644 (file)
@@ -259,14 +259,16 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
 
 bool
 TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
-    rx_core->start_capture(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+    rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
 
+    set_reply(true);
+    
     return true;
 }
 
 bool
 TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
-    rx_core->stop_capture(m_port_id);
+    rx_core->stop_recorder(m_port_id);
 
     return true;
 }
@@ -274,7 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
 bool
 TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
     rx_core->start_queue(m_port_id, m_size, m_shared_counter);
-
+    
+    /* mark as done */
+    set_reply(true);
+    
     return true;
 }
 
@@ -290,7 +295,9 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
 bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
     RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
     assert(pkt_buffer);
-    m_reply.set(pkt_buffer);
+    
+    /* set the reply */
+    set_reply(pkt_buffer);
 
     return true;
 }
index ed2ec90..b1d9117 100644 (file)
@@ -408,6 +408,53 @@ public:
 
 };
 
+/**
+ * defines the base class for CP to RX with reply
+ * 
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase {
+
+public:
+    
+    TrexStatelessCpToRxMsgReply() {
+        m_pending = true;
+    }
+
+    bool is_pending() const {
+        return m_pending;
+    }
+
+    void set_reply(const T &reply) {
+        m_reply = reply;
+
+        /* before marking as done - memory fence */
+        asm volatile("mfence" ::: "memory");
+        m_pending = false;
+    }
+
+    T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+        int guard = timeout_ms;
+
+        while (is_pending()) {
+            guard -= backoff_ms;
+            if (guard < 0) {
+                throw TrexException("timeout: RX core has failed to reply");
+            }
+
+            delay(backoff_ms);
+        }
+        
+        return m_reply;
+
+    }
+    
+protected:
+    volatile bool  m_pending;
+    T              m_reply;
+};
+
+
 class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
     bool handle (CRxCoreStateless *rx_core);
 };
@@ -421,7 +468,8 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
 };
 
 
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> {
 public:
     TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
         m_port_id          = port_id;
@@ -452,7 +500,8 @@ private:
     uint8_t           m_port_id;
 };
 
-class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
+
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> {
 public:
     TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
         m_port_id           = port_id;
@@ -468,6 +517,7 @@ private:
     uint64_t          *m_shared_counter;
 };
 
+
 class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase {
 public:
     TrexStatelessRxStopQueue(uint8_t port_id) {
@@ -481,50 +531,11 @@ private:
 };
 
 
-template<typename T> class TrexStatelessMsgReply {
-public:
-    TrexStatelessMsgReply() {
-        m_pending = true;
-    }
-
-    bool is_pending() const {
-        return m_pending;
-    }
-
-    void set(T reply) {
-        m_reply = reply;
-
-        /* before marking as done - memory fence */
-        asm volatile("mfence" ::: "memory");
-        m_pending = false;
-    }
-
-    T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
-        int guard = timeout_ms;
-
-        while (is_pending()) {
-            guard -= backoff_ms;
-            if (guard < 0) {
-                throw TrexException("timeout: RX core has failed to reply");
-            }
-
-            delay(backoff_ms);
-            
-        }
-        return m_reply;
-
-    }
-private:
-    bool  m_pending;
-    T     m_reply;
-};
-
-
 
-class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> {
 public:
 
-    TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RXPacketBuffer *> &reply) : m_reply(reply) {
+    TrexStatelessRxQueueGetPkts(uint8_t port_id) {
         m_port_id = port_id;
     }
 
@@ -535,8 +546,7 @@ public:
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t                                    m_port_id;
-    TrexStatelessMsgReply<RXPacketBuffer*>    &m_reply;
+    uint8_t  m_port_id;
 };
 
 
index 2a67836..a1ff9c6 100644 (file)
@@ -82,7 +82,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
 
     m_cpu_cp_u.Create(&m_cpu_dp_u);
 
-    /* init per port manager */
+    /* create per port manager */
     for (int i = 0; i < m_max_ports; i++) {
         m_rx_port_mngr[i].create(cfg.m_ports[i],
                                  m_rfc2544,
@@ -171,9 +171,20 @@ void CRxCoreStateless::idle_state_loop() {
     }
 }
 
+/**
+ * for each port give a tick (for flushing if needed)
+ * 
+ */
+void CRxCoreStateless::port_manager_tick() {
+    for (int i = 0; i < m_max_ports; i++) {
+        m_rx_port_mngr[i].tick();
+    }
+}
+
 void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
     int i = 0;
-
+    int j = 0;
+    
     while (m_state == STATE_WORKING) {
 
         if (do_try_rx_queue) {
@@ -182,12 +193,19 @@ void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
 
         process_all_pending_pkts();
 
+        /* TODO: with scheduler, this should be solved better */
         i++;
         if (i == 100000) { // approx 10msec
             i = 0;
             periodic_check_for_cp_messages(); // m_state might change in here
+            
+            j++;
+            if (j == 100) { // approx 1 sec
+                j = 0;
+                port_manager_tick();
+            }
         }
-
+        
         rte_pause();
     }
 }
@@ -356,13 +374,13 @@ double CRxCoreStateless::get_cpu_util() {
 
 
 void
-CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
-    m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter);
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
+    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter);
 }
 
 void
-CRxCoreStateless::stop_capture(uint8_t port_id) {
-    m_rx_port_mngr[port_id].stop_capture();
+CRxCoreStateless::stop_recorder(uint8_t port_id) {
+    m_rx_port_mngr[port_id].stop_recorder();
 }
 
 void
index 9df3631..8e50a46 100644 (file)
@@ -124,8 +124,8 @@ class CRxCoreStateless {
      * @param pcap_filename 
      * @param limit 
      */
-    void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
-    void stop_capture(uint8_t port_id);
+    void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+    void stop_recorder(uint8_t port_id);
 
     /**
      * start RX queueing of packets
@@ -153,7 +153,8 @@ class CRxCoreStateless {
     void capture_pkt(rte_mbuf_t *m);
     void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
     void handle_work_stage(bool do_try_rx_queue);
-
+    void port_manager_tick();
+    
     int process_all_pending_pkts(bool flush_rx = false);
 
     void flush_all_pending_pkts() {
index 46fec43..78f4ac5 100644 (file)
@@ -200,7 +200,7 @@ RXPacketBuffer::freeze_and_clone() {
 }
 
 void 
-RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) {
+RXPacketBuffer::push(const rte_mbuf_t *m) {
     assert(m_is_enabled);
 
     /* if full - pop the oldest */
@@ -242,7 +242,48 @@ RXPacketBuffer::to_json() const {
     return output;
 }
 
-/****************************** packet recorder ****************************/
+
+void
+RXQueue::start(uint64_t size, uint64_t *shared_counter) {
+    if (m_pkt_buffer) {
+        delete m_pkt_buffer;
+    }
+    m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+}
+
+void
+RXQueue::stop() {
+    if (m_pkt_buffer) {
+        delete m_pkt_buffer;
+        m_pkt_buffer = NULL;
+    }
+}
+
+RXPacketBuffer *
+RXQueue::fetch() {
+
+    if (!m_pkt_buffer) {
+        return nullptr;
+    }
+    
+    /* hold a pointer to the old one */
+    RXPacketBuffer *old_buffer = m_pkt_buffer;
+
+    /* replace the old one with a new one and freeze the old */
+    m_pkt_buffer = old_buffer->freeze_and_clone();
+
+    return old_buffer;
+}
+
+void
+RXQueue::handle_pkt(const rte_mbuf_t *m) {
+    m_pkt_buffer->push(m);
+}
+
+/**************************************
+ * RX feature recorder
+ * 
+ *************************************/
 
 RXPacketRecorder::RXPacketRecorder() {
     m_writer = NULL;
@@ -251,10 +292,6 @@ RXPacketRecorder::RXPacketRecorder() {
     m_epoch  = -1;
 }
 
-RXPacketRecorder::~RXPacketRecorder() {
-    stop();
-}
-
 void
 RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
     m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
@@ -278,6 +315,13 @@ RXPacketRecorder::stop() {
     }
 }
 
+void
+RXPacketRecorder::flush_to_disk() {
+    if (m_writer) {
+        m_writer->flush_to_disk();
+    }
+}
+
 void
 RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
     if (!m_writer) {
@@ -310,6 +354,30 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
 }
 
 
+/**************************************
+ * Port manager 
+ * 
+ *************************************/
+
+RXPortManager::RXPortManager() {
+    clear_all_features();
+    m_io          = NULL;
+    m_cpu_dp_u    = NULL;
+}
+
+
+void
+RXPortManager::create(CPortLatencyHWBase *io,
+                      CRFC2544Info *rfc2544,
+                      CRxCoreErrCntrs *err_cntrs,
+                      CCpuUtlDp *cpu_util) {
+    m_io = io;
+    m_cpu_dp_u = cpu_util;
+    
+    /* init features */
+    m_latency.create(rfc2544, err_cntrs);
+}
+    
 void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
 
     /* handle features */
@@ -318,12 +386,12 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
         m_latency.handle_pkt(m);
     }
 
-    if (is_feature_set(CAPTURE)) {
+    if (is_feature_set(RECORDER)) {
         m_recorder.handle_pkt(m);
     }
 
     if (is_feature_set(QUEUE)) {
-        m_pkt_buffer->handle_pkt(m);
+        m_queue.handle_pkt(m);
     }
 }
 
@@ -358,3 +426,9 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) {
     return cnt_p;
 }
 
+void
+RXPortManager::tick() {
+    if (is_feature_set(RECORDER)) {
+        m_recorder.flush_to_disk();
+    }
+}
index 9df4203..564b15d 100644 (file)
@@ -127,10 +127,10 @@ public:
     ~RXPacketBuffer();
 
     /**
-     * handle a new packet
+     * push a packet to the buffer
      * 
      */
-    void handle_pkt(const rte_mbuf_t *m);
+    void push(const rte_mbuf_t *m);
     
     /**
      * freezes the queue and clones a new one
@@ -170,6 +170,40 @@ private:
 };
 
 
+class RXQueue {
+public:
+    RXQueue() {
+        m_pkt_buffer = nullptr;
+    }
+    ~RXQueue() {
+        stop();
+    }
+    
+    /**
+     * start RX queue
+     * 
+     */
+    void start(uint64_t size, uint64_t *shared_counter);
+    
+    /**
+     * fetch the current buffer
+     * 
+     */
+    RXPacketBuffer * fetch();
+    
+    /**
+     * stop RX queue
+     * 
+     */
+    void stop();
+    
+    void handle_pkt(const rte_mbuf_t *m);
+    
+private:
+    RXPacketBuffer  *m_pkt_buffer;
+};
+
 /**************************************
  * RX feature PCAP recorder 
  * 
@@ -178,12 +212,21 @@ private:
 class RXPacketRecorder {
 public:
     RXPacketRecorder();
-    ~RXPacketRecorder();
+    
+    ~RXPacketRecorder() {
+        stop();
+    }
     
     void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
     void stop();
     void handle_pkt(const rte_mbuf_t *m);
 
+    /**
+     * flush any cached packets to disk
+     * 
+     */
+    void flush_to_disk();
+    
 private:
     CFileWriterBase  *m_writer;
     CCapPktRaw        m_pkt;
@@ -202,27 +245,19 @@ private:
  */
 class RXPortManager {
 public:
-    enum features_t {
-        LATENCY  = 0x1,
-        CAPTURE  = 0x2,
-        QUEUE    = 0x4
+    enum feature_t {
+        NO_FEATURES  = 0x0,
+        LATENCY      = 0x1,
+        RECORDER     = 0x2,
+        QUEUE        = 0x4
     };
 
-    RXPortManager() {
-        m_features    = 0;
-        m_pkt_buffer  = NULL;
-        m_io          = NULL;
-        m_cpu_dp_u    = NULL;
-    }
+    RXPortManager();
 
     void create(CPortLatencyHWBase *io,
                 CRFC2544Info *rfc2544,
                 CRxCoreErrCntrs *err_cntrs,
-                CCpuUtlDp *cpu_util) {
-        m_io = io;
-        m_cpu_dp_u = cpu_util;
-        m_latency.create(rfc2544, err_cntrs);
-    }
+                CCpuUtlDp *cpu_util);
 
     void clear_stats() {
         m_latency.reset_stats();
@@ -232,6 +267,7 @@ public:
         return m_latency;
     }
 
+    /* latency */
     void enable_latency() {
         set_feature(LATENCY);
     }
@@ -240,60 +276,37 @@ public:
         unset_feature(LATENCY);
     }
 
-    /**
-     * capturing of RX packets
-     * 
-     * @author imarom (11/2/2016)
-     * 
-     * @param pcap 
-     * @param limit_pkts 
-     */
-    void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
+    /* recorder */
+    void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
         m_recorder.start(pcap, limit_pkts, shared_counter);
-        set_feature(CAPTURE);
+        set_feature(RECORDER);
     }
 
-    void stop_capture() {
+    void stop_recorder() {
         m_recorder.stop();
-        unset_feature(CAPTURE);
+        unset_feature(RECORDER);
     }
 
-    /**
-     * queueing packets
-     * 
-     */
+    /* queue */
     void start_queue(uint32_t size, uint64_t *shared_counter) {
-        if (m_pkt_buffer) {
-            delete m_pkt_buffer;
-        }
-        m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+        m_queue.start(size, shared_counter);
         set_feature(QUEUE);
     }
 
     void stop_queue() {
-        if (m_pkt_buffer) {
-            delete m_pkt_buffer;
-            m_pkt_buffer = NULL;
-        }
-        unset_feature(QUEUE);
+        m_queue.stop();
+        unset_feature(QUEUE); 
     }
 
     RXPacketBuffer *get_pkt_buffer() {
         if (!is_feature_set(QUEUE)) {
-            return NULL;
+            return nullptr;
         }
-
-        assert(m_pkt_buffer);
-
-        /* hold a pointer to the old one */
-        RXPacketBuffer *old_buffer = m_pkt_buffer;
-
-        /* replace the old one with a new one and freeze the old */
-        m_pkt_buffer = old_buffer->freeze_and_clone();
-
-        return old_buffer;
+        
+        return m_queue.fetch();
     }
 
+    
 
     /**
      * fetch and process all packets
@@ -317,9 +330,15 @@ public:
      */
     void handle_pkt(const rte_mbuf_t *m);
 
-
+    /**
+     * maintenance
+     * 
+     * @author imarom (11/24/2016)
+     */
+    void tick();
+    
     bool has_features_set() {
-        return (m_features != 0);
+        return (m_features != NO_FEATURES);
     }
 
 
@@ -329,23 +348,29 @@ public:
 
 private:
     
+    void clear_all_features() {
+        m_features = NO_FEATURES;
+    }
 
-    void set_feature(features_t feature) {
+    void set_feature(feature_t feature) {
         m_features |= feature;
     }
 
-    void unset_feature(features_t feature) {
+    void unset_feature(feature_t feature) {
         m_features &= (~feature);
     }
 
-    bool is_feature_set(features_t feature) {
+    bool is_feature_set(feature_t feature) const {
         return ( (m_features & feature) == feature );
     }
 
     uint32_t                     m_features;
-    RXPacketRecorder             m_recorder;
+    
     RXLatency                    m_latency;
-    RXPacketBuffer              *m_pkt_buffer;
+    RXPacketRecorder             m_recorder;
+    RXQueue                      m_queue;
+    
+    
     CCpuUtlDp                   *m_cpu_dp_u;
     CPortLatencyHWBase          *m_io;
 };