#include "stateless/cp/trex_stateless.h"
#include "stateless/dp/trex_stream_node.h"
#include "stateless/messaging/trex_stateless_messaging.h"
+#include "stateless/rx/trex_stateless_rx_core.h"
#include "publisher/trex_publisher.h"
#include "../linux_dpdk/version.h"
extern "C" {
}
static inline int get_is_rx_thread_enabled() {
- return (CGlobalInfo::m_options.is_rx_enabled() ?1:0);
+ return ((CGlobalInfo::m_options.is_rx_enabled() || CGlobalInfo::m_options.is_stateless()) ?1:0);
}
struct port_cfg_t;
virtual int wait_for_stable_link()=0;
virtual void wait_after_link_up(){};
virtual bool flow_control_disable_supported(){return true;}
- virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) {return -1;}
+ bool hw_rx_stat_supported(){return false;}
+ virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes
+ , int min, int max) {return -1;}
+ virtual int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) {return 0;}
virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) { return -1;}
virtual int get_stat_counters_num() {return 0;}
virtual int get_rx_stat_capabilities() {return 0;}
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
-
+ int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes
+ , int min, int max);
+ int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) {return 0;}
+ int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
virtual int wait_for_stable_link();
void wait_after_link_up();
};
}
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
- int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max);
+ int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats);
+ int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes, int min, int max);
int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd);
int get_stat_counters_num() {return MAX_FLOW_STATS;}
int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
virtual int wait_for_stable_link();
// disabling flow control on 40G using DPDK API causes the interface to malfunction
bool flow_control_disable_supported(){return false;}
+ bool hw_rx_stat_supported(){return true;}
private:
void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx);
virtual int configure_rx_filter_rules_statfull(CPhyEthIF * _if);
}
if ( (po->is_latency_enabled()) || (po->preview.getOnlyLatency()) ){
- parse_err("Latecny check is not supported with interactive mode ");
+ parse_err("Latency check is not supported with interactive mode ");
}
if ( po->preview.getSingleCore() ){
- parse_err("single core is not supported with interactive mode ");
+ parse_err("Single core is not supported with interactive mode ");
}
}
int queues_prob_init();
int ixgbe_start();
int ixgbe_rx_queue_flush();
- int ixgbe_configure_mg();
+ void ixgbe_configure_mg();
+ void rx_sl_configure();
bool is_all_links_are_up(bool dump=false);
int reset_counters();
CFlowGenList m_fl;
bool m_fl_was_init;
volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ;
- CLatencyManager m_mg;
+ CLatencyManager m_mg; // statefull RX core
+ CRxCoreStateless m_rx_sl; // stateless RX core
CTrexGlobalIoMode m_io_modes;
private:
}
-int CGlobalTRex::ixgbe_configure_mg(void){
+void CGlobalTRex::ixgbe_configure_mg(void) {
int i;
CLatencyManagerCfg mg_cfg;
mg_cfg.m_max_ports = m_max_ports;
m_mg.Create(&mg_cfg);
m_mg.set_mask(CGlobalInfo::m_options.m_latency_mask);
-
- return (0);
}
+// init m_rx_sl object for stateless rx core
+void CGlobalTRex::rx_sl_configure(void) {
+ CRxSlCfg rx_sl_cfg;
+
+ rx_sl_cfg.m_max_ports = m_max_ports;
+
+ if ( get_vm_one_queue_enable() ) {
+#if 0
+ ???
+ /* vm mode, indirect queues */
+ for (i=0; i < m_max_ports; i++) {
+ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
+ uint8_t thread_id = (i >> 1);
+ CNodeRing * r = rx_dp->getRingCpToDp(thread_id); ///??? should be rx to dp?
+ m_latency_vm_vports[i].Create((uint8_t)i,r,&m_mg);
+ rx_sl_cfg.m_ports[i] =&m_latency_vm_vports[i];
+ }
+#endif
+ } else {
+ for (int i = 0; i < m_max_ports; i++) {
+ CPhyEthIF * _if = &m_ports[i];
+ m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1);
+ rx_sl_cfg.m_ports[i] = &m_latency_vports[i];
+ }
+ }
+
+ m_rx_sl.create(rx_sl_cfg);
+}
int CGlobalTRex::ixgbe_start(void){
int i;
ixgbe_rx_queue_flush();
-
- ixgbe_configure_mg();
+ if (! get_is_stateless()) {
+ ixgbe_configure_mg();
+ } else {
+ rx_sl_configure();
+ }
/* core 0 - control
int CGlobalTRex::run_in_rx_core(void){
- if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_mg.start(0);
+ if (get_is_stateless()) {
+ m_rx_sl.start();
+ } else {
+ if ( CGlobalInfo::m_options.is_rx_enabled() ){
+ m_mg.start(0);
+ }
}
- // ??? start stateless rx
+
return (0);
}
static CGlobalTRex g_trex;
-// The HW counters start from some random values. The driver give us the diffs from previous,
-// each time we do get_rx_stats. We need to make one first call, at system startup,
-// and ignore the returned diffs
int CPhyEthIF::reset_hw_flow_stats() {
- uint32_t diff_stats[MAX_FLOW_STATS];
-
- if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, 0, MAX_FLOW_STATS - 1) < 0) {
- return -1;
+ if (get_ex_drv()->hw_rx_stat_supported()) {
+ if (get_ex_drv()->reset_rx_stats(this, m_stats.m_fdir_prev_pkts) < 0) {
+ return -1;
+ }
+ } else {
+ g_trex.m_rx_sl.reset_rx_stats(get_port_id());
}
-
return 0;
}
// get/reset flow director counters
// return 0 if OK. -1 if operation not supported.
-// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated values.
+// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated absolute values.
// min, max - minimum, maximum counters range to get
// reset - If true, need to reset counter value after reading
int CPhyEthIF::get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) {
- uint32_t diff_stats[MAX_FLOW_STATS];
+ uint32_t diff_pkts[MAX_FLOW_STATS];
+ uint32_t diff_bytes[MAX_FLOW_STATS];
- if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, min, max) < 0) {
+ if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts
+ , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) {
return -1;
}
if ( reset ) {
// return value so far, and reset
if (rx_stats != NULL) {
- rx_stats[i - min] = m_stats.m_rx_per_flow[i] + diff_stats[i];
+ rx_stats[i - min] = m_stats.m_rx_per_flow[i] + diff_pkts[i];
}
if (tx_stats != NULL) {
tx_stats[i - min] = g_trex.clear_flow_tx_stats(m_port_id, i);
}
m_stats.m_rx_per_flow[i] = 0;
} else {
- m_stats.m_rx_per_flow[i] += diff_stats[i];
+ m_stats.m_rx_per_flow[i] += diff_pkts[i];
if (rx_stats != NULL) {
rx_stats[i - min] = m_stats.m_rx_per_flow[i];
}
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
physical_thread_id_t phy_id =rte_lcore_id();
-
if ( lpsock->thread_phy_is_rx(phy_id) ) {
g_trex.run_in_rx_core();
}else{
&& (CGlobalInfo::m_options.m_latency_prev > 0)) {
uint32_t pkts = CGlobalInfo::m_options.m_latency_prev *
CGlobalInfo::m_options.m_latency_rate;
- printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
+ printf("Starting pre latency check for %d sec\n",CGlobalInfo::m_options.m_latency_prev);
g_trex.m_mg.start(pkts);
delay(CGlobalInfo::m_options.m_latency_prev* 1000);
printf("Finished \n");
}
rule_id = 0;
- // filter for byte 18 of packet (lsb of IP ID) should equal ff
+ // filter for byte 18 of packet (msb of IP ID) should equal ff
_if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16)) , 0x00ff0000);
_if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16) + 8) , 0x04); /* MASK */
// + bytes 12 + 13 (ether type) should indicate IP.
void CTRexExtendedDriverBase1G::clear_extended_stats(CPhyEthIF * _if){
}
-
+int CTRexExtendedDriverBase1G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts
+ ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
+ uint32_t port_id = _if->get_port_id();
+ return g_trex.m_rx_sl.get_rx_stats(port_id, pkts, prev_pkts, bytes, prev_bytes, min, max);
+}
void CTRexExtendedDriverBase10G::clear_extended_stats(CPhyEthIF * _if){
_if->pci_reg_read(IXGBE_RXNFGPC);
}
}
+int CTRexExtendedDriverBase40G::reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) {
+ uint32_t diff_stats[MAX_FLOW_STATS];
+
+ // The HW counters start from some random values. The driver give us the diffs from previous,
+ // each time we do get_rx_stats. We need to make one first call, at system startup,
+ // and ignore the returned diffs
+ return get_rx_stats(_if, diff_stats, stats, NULL, NULL, 0, MAX_FLOW_STATS - 1);
+}
+
// instead of adding this to rte_ethdev.h
extern "C" int rte_eth_fdir_stats_get(uint8_t port_id, uint32_t *stats, uint32_t start, uint32_t len);
// get rx stats on _if, between min and max
-// prev_stats should be the previous values read from the hardware.
+// prev_pkts should be the previous values read from the hardware.
// Getting changed to be equal to current HW values.
-// stats return the diff between prev_stats and current hw values
-int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) {
+// pkts return the diff between prev_pkts and current hw values
+// bytes and prev_bytes are not used. X710 fdir filters do not support byte count.
+int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts
+ ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
uint32_t hw_stats[MAX_FLOW_STATS];
uint32_t port_id = _if->get_port_id();
uint32_t start = (port_id % m_if_per_card) * MAX_FLOW_STATS + min;
rte_eth_fdir_stats_get(port_id, hw_stats, start, len);
for (int i = loop_start; i < loop_start + len; i++) {
- if (hw_stats[i - min] >= prev_stats[i]) {
- stats[i] = (uint64_t)(hw_stats[i - min] - prev_stats[i]);
+ if (hw_stats[i - min] >= prev_pkts[i]) {
+ pkts[i] = (uint64_t)(hw_stats[i - min] - prev_pkts[i]);
} else {
// Wrap around
- stats[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_stats[i]);
+ pkts[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_pkts[i]);
}
- prev_stats[i] = hw_stats[i - min];
+ prev_pkts[i] = hw_stats[i - min];
}
return 0;
/* hardware */
g_trex.m_ports[interface_id].macaddr_get(&rte_mac_addr);
assert(ETHER_ADDR_LEN == 6);
- printf("interface %d speed: %d mac:", interface_id, info.speed);
- for (int i = 0; i < 6; i++) {
- info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i];
- printf("%x:", rte_mac_addr.addr_bytes[i]);
- }
- printf("\n");
/* software */
uint8_t sw_macaddr[12];
--- /dev/null
+#include <stdio.h>
+#include "latency.h"
+#include "flow_stat_parser.h"
+#include "stateless/rx/trex_stateless_rx_core.h"
+
+
+void CRxCoreStateless::create(const CRxSlCfg &cfg) {
+ m_max_ports = cfg.m_max_ports;
+
+ for (int i = 0; i < m_max_ports; i++) {
+ CLatencyManagerPerPort * lp = &m_ports[i];
+ // CCPortLatency * lpo = &m_ports[swap_port(i)].m_port;
+
+ lp->m_io = cfg.m_ports[i];
+ /* lp->m_port.Create(this,
+ i,
+ m_pkt_gen.get_payload_offset(),
+ m_pkt_gen.get_l4_offset(),
+ m_pkt_gen.get_pkt_size(),lpo );???*/
+ }
+
+}
+
+void CRxCoreStateless::start() {
+ static int count = 0;
+ static int i = 0;
+ while (1) {
+ count += try_rx();
+ i++;
+ if (i == 100000000) {
+ i = 0;
+ //??? remove
+ printf("counter:%d port0:[%u], port1:[%u]\n", count, m_ports[0].m_port.m_rx_pg_pkts[0], m_ports[1].m_port.m_rx_pg_pkts[1]);
+ }
+ }
+}
+
+// ??? temp try
+int CRxCoreStateless::try_rx() {
+ rte_mbuf_t * rx_pkts[64];
+ int i, total_pkts = 0;
+ for (i = 0; i < m_max_ports; i++) {
+ CLatencyManagerPerPort * lp = &m_ports[i];
+ rte_mbuf_t * m;
+ //m_cpu_dp_u.start_work();
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
+ total_pkts += cnt_p;
+ if (cnt_p) {
+ int j;
+ for (j = 0; j < cnt_p; j++) {
+ Cxl710Parser parser;
+ m = rx_pkts[j];
+ if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
+ uint16_t ip_id;
+ if (parser.get_ip_id(ip_id) == 0) {
+ if (is_flow_stat_id(ip_id)) {
+ uint16_t hw_id = get_hw_id(ip_id);
+ m_ports[i].m_port.m_rx_pg_bytes[hw_id] += m->pkt_len;
+ m_ports[i].m_port.m_rx_pg_pkts[hw_id]++;
+ }
+ }
+ }
+ rte_pktmbuf_free(m);
+ }
+ /* commit only if there was work to do ! */
+ //m_cpu_dp_u.commit(); //??? what's this?
+ }/* if work */
+ }// all ports
+ return total_pkts;
+}
+
+bool CRxCoreStateless::is_flow_stat_id(uint16_t id) {
+ if ((id & 0xff00) == IP_ID_RESERVE_BASE) return true;
+ return false;
+}
+
+uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
+ return (0x00ff & id);
+}
+
+void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
+ for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
+ m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] = 0;
+ m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] = 0;
+ }
+}
+
+int CRxCoreStateless::get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts
+ , uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] - prev_pkts[hw_id];
+ prev_pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id];
+ bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] - prev_bytes[hw_id];
+ prev_bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id];
+ }
+
+ return 0;
+}