self.connected = True
- rc = self.invoke_rpc_method('ping', api_class = None)
- if not rc:
- self.connected = False
- return rc
-
return RC_OK()
from collections import namedtuple, OrderedDict
from .trex_stl_packet_builder_scapy import STLPktBuilder
-from .trex_stl_streams import STLStream, STLTXSingleBurst
+from .trex_stl_streams import STLStream
from .trex_stl_types import *
from .trex_stl_rx_features import ARPResolver, PingResolver
from . import trex_stl_stats
for i in range(len(pkts)):
pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
-
- return pkts
+ return RC_OK(pkts)
@owned
return rx_pkts
# for each packet - examine it
- for pkt in rx_pkts:
+ for pkt in rx_pkts.data():
rc = self.on_pkt_rx(pkt)
if rc is not None:
return rc
void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output);
std::string utl_macaddr_to_str(const uint8_t *macaddr);
-std::string utl_macaddr_to_str(const uint8_t *macaddr);
bool utl_str_to_macaddr(const std::string &s, uint8_t *mac);
std::string utl_generate_random_str(unsigned int &seed, int len);
bool changed = false;
rte_eth_link_get_nowait(m_port_id, &new_link);
- /* if the link got down - update the dest atribute to move to unresolved */
- if (new_link.link_status != m_link.link_status) {
- get_dest().on_link_down();
- changed = true;
- }
-
- /* other changes */
if (new_link.link_speed != m_link.link_speed ||
new_link.link_duplex != m_link.link_duplex ||
- new_link.link_autoneg != m_link.link_autoneg) {
+ new_link.link_autoneg != m_link.link_autoneg ||
+ new_link.link_status != m_link.link_status) {
changed = true;
+
+ /* in case of link status change - notify the dest object */
+ if (new_link.link_status != m_link.link_status) {
+ get_dest().on_link_down();
+ }
}
-
+
m_link = new_link;
return changed;
}
section["ports"][i]["driver"] = driver;
section["ports"][i]["description"] = description;
- section["ports"][i]["pci_addr"] = pci_addr;
- section["ports"][i]["numa"] = numa;
+ section["ports"][i]["pci_addr"] = pci_addr;
+ section["ports"][i]["numa"] = numa;
uint16_t caps = port->get_rx_caps();
section["ports"][i]["rx"]["caps"] = Json::arrayValue;
} else if (type == "all") {
filter_mode = RX_FILTER_MODE_ALL;
} else {
+ /* can't happen - parsed choice */
assert(0);
}
get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
/* RX info */
- port->get_rx_features().to_json(result["result"]["rx_info"]);
-
+ result["result"]["rx_info"] = port->rx_features_to_json();
+
return (TREX_RPC_CMD_OK);
}
void
TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
-
- m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
-
- TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+ static MsgReply<bool> reply;
+
+ reply.reset();
+
+ TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
/* as below, must wait for ACK from RX core before returning ACK */
- msg->wait_for_reply();
+ reply.wait_for_reply();
}
void
TrexStatelessPort::stop_rx_capture() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
send_message_to_rx(msg);
-
- /* update our cached data */
- m_rx_features_info.m_rx_capture_info.disable();
}
void
TrexStatelessPort::start_rx_queue(uint64_t size) {
-
- m_rx_features_info.m_rx_queue_info.enable(size);
-
- TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+ static MsgReply<bool> reply;
+
+ reply.reset();
+
+ TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
/* we cannot return ACK to the user until the RX core has approved
this might cause the user to lose some packets from the queue
*/
- msg->wait_for_reply();
+ reply.wait_for_reply();
}
void
TrexStatelessPort::stop_rx_queue() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
send_message_to_rx(msg);
-
- /* update our cached data */
- m_rx_features_info.m_rx_queue_info.disable();
}
-RXPacketBuffer *
+const RXPacketBuffer *
TrexStatelessPort::get_rx_queue_pkts() {
+ static MsgReply<const RXPacketBuffer *> reply;
+
+ reply.reset();
- if (m_rx_features_info.m_rx_queue_info.is_empty()) {
- return NULL;
- }
+ TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+
+ return reply.wait_for_reply();
+}
+
+Json::Value
+TrexStatelessPort::rx_features_to_json() {
+ static MsgReply<Json::Value> reply;
+
+ reply.reset();
- TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id);
+ TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
- RXPacketBuffer *pkt_buffer = msg->wait_for_reply();
- return pkt_buffer;
+ return reply.wait_for_reply();
}
/************* Trex Port Owner **************/
*/
void stop_rx_queue();
-
/**
- * get the RX features info object
+ * fetch the RX queue packets from the queue
*
*/
- const RXFeaturesInfo &get_rx_features() {
- return m_rx_features_info;
- }
+ const RXPacketBuffer *get_rx_queue_pkts();
/**
- * fetch the RX queue packets from the queue
+ * generate a JSON describing the status
+ * of the RX features
*
*/
- RXPacketBuffer *get_rx_queue_pkts();
-
+ Json::Value rx_features_to_json();
+
/**
* return the port attribute object
*
*
*/
void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
-
+
/**
* when a port stops, perform various actions
*
int m_pending_async_stop_event;
- RXFeaturesInfo m_rx_features_info;
-
};
bool
TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+ rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
- set_reply(true);
+ /* mark as done */
+ m_reply.set_reply(true);
return true;
}
bool
TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
- rx_core->start_queue(m_port_id, m_size, m_shared_counter);
+ rx_core->start_queue(m_port_id, m_size);
/* mark as done */
- set_reply(true);
+ m_reply.set_reply(true);
return true;
}
-bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
- RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
- assert(pkt_buffer);
+bool
+TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
+ const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
/* set the reply */
- set_reply(pkt_buffer);
+ m_reply.set_reply(pkt_buffer);
return true;
}
+
+
+bool
+TrexStatelessRxFeaturesToJson::handle(CRxCoreStateless *rx_core) {
+ Json::Value output = rx_core->get_rx_port_mngr(m_port_id).to_json();
+
+ /* set the reply */
+ m_reply.set_reply(output);
+
+ return true;
+}
+
class CFlowGenListPerThread;
class RXPacketBuffer;
+/**
+ * Generic message reply object
+ *
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class MsgReply {
+
+public:
+
+ MsgReply() {
+ reset();
+ }
+
+ void reset() {
+ m_pending = true;
+ }
+
+ bool is_pending() const {
+ return m_pending;
+ }
+
+ void set_reply(const T &reply) {
+ m_reply = reply;
+
+ /* before marking as done make sure all stores are committed */
+ asm volatile("mfence" ::: "memory");
+ m_pending = false;
+ }
+
+ T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+ int guard = timeout_ms;
+
+ while (is_pending()) {
+ guard -= backoff_ms;
+ if (guard < 0) {
+ throw TrexException("timeout: failed to get reply from core");
+ }
+
+ delay(backoff_ms);
+ }
+
+ return m_reply;
+
+ }
+
+protected:
+ volatile bool m_pending;
+ T m_reply;
+};
+
+
/**
* defines the base class for CP to DP messages
*
};
-/**
- * defines the base class for CP to RX with reply
- *
- * @author imarom (11/27/2016)
- */
-template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase {
-
-public:
-
- TrexStatelessCpToRxMsgReply() {
- m_pending = true;
- }
-
- bool is_pending() const {
- return m_pending;
- }
-
- void set_reply(const T &reply) {
- m_reply = reply;
-
- /* before marking as done - memory fence */
- asm volatile("mfence" ::: "memory");
- m_pending = false;
- }
-
- T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
- int guard = timeout_ms;
-
- while (is_pending()) {
- guard -= backoff_ms;
- if (guard < 0) {
- throw TrexException("timeout: RX core has failed to reply");
- }
-
- delay(backoff_ms);
- }
-
- return m_reply;
-
- }
-
-protected:
- volatile bool m_pending;
- T m_reply;
-};
-
class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
+ TrexStatelessRxStartCapture(uint8_t port_id,
+ const std::string &pcap_filename,
+ uint64_t limit,
+ MsgReply<bool> &reply) : m_reply(reply) {
+
m_port_id = port_id;
- m_limit = rx_capture_info.m_limit;
- m_pcap_filename = rx_capture_info.m_pcap_filename;
- m_shared_counter = &rx_capture_info.m_shared_counter;
+ m_limit = limit;
+ m_pcap_filename = pcap_filename;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- std::string m_pcap_filename;
- uint64_t m_limit;
- uint64_t *m_shared_counter;
+ uint8_t m_port_id;
+ std::string m_pcap_filename;
+ uint64_t m_limit;
+ MsgReply<bool> &m_reply;
};
};
-class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
+ TrexStatelessRxStartQueue(uint8_t port_id,
+ uint64_t size,
+ MsgReply<bool> &reply) : m_reply(reply) {
+
m_port_id = port_id;
- m_size = rx_queue_info.m_size;
- m_shared_counter = &rx_queue_info.m_shared_counter;
+ m_size = size;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
uint8_t m_port_id;
uint64_t m_size;
- uint64_t *m_shared_counter;
+ MsgReply<bool> &m_reply;
};
-class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> {
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxQueueGetPkts(uint8_t port_id) {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
m_port_id = port_id;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ uint8_t m_port_id;
+ MsgReply<const RXPacketBuffer *> &m_reply;
+
};
+/**
+ * a request from RX core to dump to Json the RX features
+ */
+class TrexStatelessRxFeaturesToJson : public TrexStatelessCpToRxMsgBase {
+public:
+
+ TrexStatelessRxFeaturesToJson(uint8_t port_id, MsgReply<Json::Value> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ MsgReply<Json::Value> &m_reply;
+};
#endif /* __TREX_STATELESS_MESSAGING_H__ */
void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
- m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter);
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
+ m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
}
void
}
void
-CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
- m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
+ m_rx_port_mngr[port_id].start_queue(size);
}
void
}
}
+const RXPortManager &
+CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
+ assert(port_id < m_max_ports);
+ return m_rx_port_mngr[port_id];
+
+}
double get_cpu_util();
void update_cpu_util();
- RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
/**
- * start capturing of RX packets on a specific port
- *
- * @author imarom (11/2/2016)
- *
- * @param port_id
- * @param pcap_filename
- * @param limit
+ * start capturing of RX packets on a specific port
+ *
*/
- void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+ void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
void stop_recorder(uint8_t port_id);
/**
* start RX queueing of packets
*
*/
- void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+ void start_queue(uint8_t port_id, uint64_t size);
void stop_queue(uint8_t port_id);
/**
void enable_latency();
void disable_latency();
+ const RXPortManager &get_rx_port_mngr(uint8_t port_id);
+
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
}
void try_rx_queues();
-
+
private:
TrexMonitor m_monitor;
uint32_t m_max_ports;
RX_FILTER_MODE_ALL
} rx_filter_mode_e;
-/**
- * holds RX capture info
- *
- */
-class RXCaptureInfo {
-public:
- RXCaptureInfo() {
- m_is_active = false;
- m_limit = 0;
- m_shared_counter = 0;
- }
-
- void enable(const std::string &pcap_filename, uint64_t limit) {
- m_pcap_filename = pcap_filename;
- m_limit = limit;
- m_is_active = true;
- }
-
- void disable() {
- m_is_active = false;
- m_pcap_filename = "";
- m_limit = 0;
- }
-
- bool is_empty() const {
- return (m_shared_counter == 0);
- }
-
- void to_json(Json::Value &output) const {
- output["is_active"] = m_is_active;
- if (m_is_active) {
- output["pcap_filename"] = m_pcap_filename;
- output["limit"] = Json::UInt64(m_limit);
- output["count"] = Json::UInt64(m_shared_counter);
- }
- }
-
-public:
- bool m_is_active;
- std::string m_pcap_filename;
- uint64_t m_limit;
- uint64_t m_shared_counter;
-};
-
-
-class RXQueueInfo {
-public:
-
- RXQueueInfo() {
- m_is_active = false;
- m_shared_counter = 0;
- }
-
- void enable(uint64_t size) {
- m_size = size;
- m_is_active = true;
- }
-
- void disable() {
- m_is_active = false;
- m_size = 0;
- }
-
- bool is_empty() const {
- return (m_shared_counter == 0);
- }
-
- void to_json(Json::Value &output) const {
- output["is_active"] = m_is_active;
- if (m_is_active) {
- output["size"] = Json::UInt64(m_size);
- output["count"] = Json::UInt64(m_shared_counter);
- }
- }
-
-public:
- bool m_is_active;
- uint64_t m_size;
- uint64_t m_shared_counter;
-};
-
-
-/**
- * holds all the RX features info
- *
- * @author imarom (11/7/2016)
- */
-class RXFeaturesInfo {
-public:
- RXCaptureInfo m_rx_capture_info;
- RXQueueInfo m_rx_queue_info;
-
- void to_json(Json::Value &msg) const {
- m_rx_capture_info.to_json(msg["sniffer"]);
- m_rx_queue_info.to_json(msg["queue"]);
- }
-};
-
#endif /* __TREX_STATELESS_RX_DEFS_H__ */
}
}
+Json::Value
+RXLatency::to_json() const {
+ return Json::objectValue;
+}
+
/**************************************
* RX feature queue
*
*************************************/
-RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+RXPacketBuffer::RXPacketBuffer(uint64_t size) {
m_buffer = nullptr;
m_head = 0;
m_tail = 0;
m_size = (size + 1); // for the empty/full difference 1 slot reserved
- m_shared_counter = shared_counter;
-
- /* reset the counter */
- *m_shared_counter = 0;
/* generate queue */
m_buffer = new RXPacket*[m_size](); // zeroed
-
- m_is_enabled = true;
}
RXPacketBuffer::~RXPacketBuffer() {
delete [] m_buffer;
}
-RXPacketBuffer *
-RXPacketBuffer::freeze_and_clone() {
- /* create a new one - same size and shared counter 0 */
- RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
-
- /* freeze the current */
- m_shared_counter = NULL;
- m_is_enabled = false;
-
- return new_buffer;
-}
-
void
RXPacketBuffer::push(const rte_mbuf_t *m) {
- assert(m_is_enabled);
-
/* if full - pop the oldest */
if (is_full()) {
delete pop();
/* push packet */
m_buffer[m_head] = new RXPacket(m);
m_head = next(m_head);
-
- /* update the shared counter - control plane memory */
- (*m_shared_counter)++;
}
RXPacket *
RXPacketBuffer::pop() {
- assert(m_is_enabled);
assert(!is_empty());
RXPacket *pkt = m_buffer[m_tail];
m_tail = next(m_tail);
- (*m_shared_counter)--;
+
return pkt;
}
+uint64_t
+RXPacketBuffer::get_element_count() const {
+ if (m_head >= m_tail) {
+ return (m_head - m_tail);
+ } else {
+ return ( get_capacity() - (m_tail - m_head - 1) );
+ }
+}
+
Json::Value
RXPacketBuffer::to_json() const {
void
-RXQueue::start(uint64_t size, uint64_t *shared_counter) {
+RXQueue::start(uint64_t size) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+ m_pkt_buffer = new RXPacketBuffer(size);
}
void
}
}
-RXPacketBuffer *
+const RXPacketBuffer *
RXQueue::fetch() {
- if (!m_pkt_buffer) {
+ /* if no buffer or the buffer is empty - give a NULL one */
+ if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) {
return nullptr;
}
RXPacketBuffer *old_buffer = m_pkt_buffer;
/* replace the old one with a new one and freeze the old */
- m_pkt_buffer = old_buffer->freeze_and_clone();
+ m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
return old_buffer;
}
m_pkt_buffer->push(m);
}
+Json::Value
+RXQueue::to_json() const {
+ assert(m_pkt_buffer != NULL);
+
+ Json::Value output = Json::objectValue;
+
+ output["size"] = Json::UInt64(m_pkt_buffer->get_capacity());
+ output["count"] = Json::UInt64(m_pkt_buffer->get_element_count());
+
+ return output;
+}
+
/**************************************
* RX feature recorder
*
RXPacketRecorder::RXPacketRecorder() {
m_writer = NULL;
- m_shared_counter = NULL;
+ m_count = 0;
m_limit = 0;
m_epoch = -1;
+
+ m_pending_flush = false;
}
void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
+RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
if (m_writer == NULL) {
std::stringstream ss;
}
assert(limit > 0);
+
m_limit = limit;
- m_shared_counter = shared_counter;
- (*m_shared_counter) = 0;
+ m_count = 0;
+ m_pending_flush = false;
+ m_pcap_filename = pcap;
}
void
RXPacketRecorder::stop() {
- if (m_writer) {
- delete m_writer;
- m_writer = NULL;
+ if (!m_writer) {
+ return;
}
+
+ delete m_writer;
+ m_writer = NULL;
}
void
RXPacketRecorder::flush_to_disk() {
- if (m_writer) {
+
+ if (m_writer && m_pending_flush) {
m_writer->flush_to_disk();
+ m_pending_flush = false;
}
}
memcpy(m_pkt.raw, p, m->pkt_len);
m_writer->write_packet(&m_pkt);
-
- m_limit--;
- (*m_shared_counter)++;
-
- if (m_limit == 0) {
+ m_count++;
+ m_pending_flush = true;
+
+ if (m_count == m_limit) {
stop();
}
+
}
+Json::Value
+RXPacketRecorder::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ output["pcap_filename"] = m_pcap_filename;
+ output["limit"] = Json::UInt64(m_limit);
+ output["count"] = Json::UInt64(m_count);
+
+ return output;
+}
/**************************************
* Port manager
m_recorder.flush_to_disk();
}
}
+
+Json::Value
+RXPortManager::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ if (is_feature_set(LATENCY)) {
+ output["latency"] = m_latency.to_json();
+ output["latency"]["is_active"] = true;
+ } else {
+ output["latency"]["is_active"] = false;
+ }
+
+ if (is_feature_set(RECORDER)) {
+ output["sniffer"] = m_recorder.to_json();
+ output["sniffer"]["is_active"] = true;
+ } else {
+ output["sniffer"]["is_active"] = false;
+ }
+
+ if (is_feature_set(QUEUE)) {
+ output["queue"] = m_queue.to_json();
+ output["queue"]["is_active"] = true;
+ } else {
+ output["queue"]["is_active"] = false;
+ }
+
+ return output;
+}
void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+
private:
bool is_flow_stat_id(uint32_t id) {
if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
class RXPacketBuffer {
public:
- RXPacketBuffer(uint64_t size, uint64_t *shared_counter);
+ RXPacketBuffer(uint64_t size);
~RXPacketBuffer();
/**
*/
void push(const rte_mbuf_t *m);
- /**
- * freezes the queue and clones a new one
- *
- */
- RXPacketBuffer * freeze_and_clone();
-
/**
* generate a JSON output of the queue
*
return ( next(m_head) == m_tail);
}
+ /**
+ * return the total amount of space possible
+ */
+ uint64_t get_capacity() const {
+ /* one slot is used for diff between full/empty */
+ return (m_size - 1);
+ }
+
+ /**
+ * returns how many elements are in the queue
+ */
+ uint64_t get_element_count() const;
+
private:
int next(int v) const {
return ( (v + 1) % m_size );
int m_tail;
int m_size;
RXPacket **m_buffer;
- uint64_t *m_shared_counter;
- bool m_is_enabled;
};
* start RX queue
*
*/
- void start(uint64_t size, uint64_t *shared_counter);
+ void start(uint64_t size);
/**
* fetch the current buffer
- *
+ * return NULL if no packets
*/
- RXPacketBuffer * fetch();
+ const RXPacketBuffer * fetch();
/**
* stop RX queue
void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+
private:
RXPacketBuffer *m_pkt_buffer;
};
stop();
}
- void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
+ void start(const std::string &pcap, uint64_t limit);
void stop();
void handle_pkt(const rte_mbuf_t *m);
*/
void flush_to_disk();
+ Json::Value to_json() const;
+
private:
CFileWriterBase *m_writer;
+ std::string m_pcap_filename;
CCapPktRaw m_pkt;
dsec_t m_epoch;
uint64_t m_limit;
- uint64_t *m_shared_counter;
+ uint64_t m_count;
+ bool m_pending_flush;
};
}
/* recorder */
- void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
- m_recorder.start(pcap, limit_pkts, shared_counter);
+ void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
+ m_recorder.start(pcap, limit_pkts);
set_feature(RECORDER);
}
}
/* queue */
- void start_queue(uint32_t size, uint64_t *shared_counter) {
- m_queue.start(size, shared_counter);
+ void start_queue(uint32_t size) {
+ m_queue.start(size);
set_feature(QUEUE);
}
unset_feature(QUEUE);
}
- RXPacketBuffer *get_pkt_buffer() {
+ const RXPacketBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
return nullptr;
}
return (!has_features_set());
}
+ /**
+ * write the status to a JSON format
+ */
+ Json::Value to_json() const;
+
private:
void clear_all_features() {
assert(ipv4 != 0);
m_ipv4 = ipv4;
- memcpy(m_mac, mac, 6);
+
+ /* source might be the same as dest (this shadows the datapath memory) */
+ memmove(m_mac, mac, 6);
m_type = DEST_TYPE_IPV4;
}
DestAttr::set_dest(const uint8_t *mac) {
m_ipv4 = 0;
- memcpy(m_mac, mac, 6);
+
+ /* source might be the same as dest (this shadows the datapath memory) */
+ memmove(m_mac, mac, 6);
m_type = DEST_TYPE_MAC;
}