refactor RX core
authorimarom <[email protected]>
Thu, 1 Dec 2016 12:07:27 +0000 (14:07 +0200)
committerimarom <[email protected]>
Thu, 1 Dec 2016 12:07:27 +0000 (14:07 +0200)
Signed-off-by: imarom <[email protected]>
16 files changed:
scripts/automation/trex_control_plane/stl/console/trex_console.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
src/common/basic_utils.cpp
src/common/basic_utils.h
src/flow_stat.cpp
src/main_dpdk.cpp
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h
src/stateless/rx/trex_stateless_rx_port_mngr.cpp
src/stateless/rx/trex_stateless_rx_port_mngr.h

index e4321b8..b33b044 100755 (executable)
@@ -202,7 +202,7 @@ class TRexConsole(TRexGeneralCmd):
             func_name = f.__name__
             if func_name.startswith("do_"):
                 func_name = func_name[3:]
-
+                
             if not inst.stateless_client.is_connected():
                 print(format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold'))
                 return
@@ -313,6 +313,7 @@ class TRexConsole(TRexGeneralCmd):
     def do_shell (self, line):
         self.do_history(line)
 
+    @verify_connected
     def do_push (self, line):
         '''Push a local PCAP file\n'''
         self.stateless_client.push_line(line)
@@ -328,6 +329,7 @@ class TRexConsole(TRexGeneralCmd):
     def help_portattr (self):
         self.do_portattr("-h")
 
+    @verify_connected
     def do_set_rx_sniffer (self, line):
         '''Sets a port sniffer on RX channel as PCAP recorder'''
         self.stateless_client.set_rx_sniffer_line(line)
@@ -335,6 +337,7 @@ class TRexConsole(TRexGeneralCmd):
     def help_sniffer (self):
         self.do_set_rx_sniffer("-h")
 
+    @verify_connected
     def do_resolve (self, line):
         '''Resolve ARP for ports'''
         self.stateless_client.resolve_line(line)
@@ -431,6 +434,7 @@ class TRexConsole(TRexGeneralCmd):
         '''Release ports\n'''
         self.stateless_client.release_line(line)
 
+    @verify_connected
     def do_reacquire (self, line):
         '''reacquire all the ports under your logged user name'''
         self.stateless_client.reacquire_line(line)
index 8429bb8..cc20e08 100755 (executable)
@@ -869,6 +869,15 @@ class STLClient(object):
 
         return rc
 
+    def __get_rx_queue_pkts (self, port_id_list):
+        port_id_list = self.__ports(port_id_list)
+        rc = RC()
+
+        for port_id in port_id_list:
+            rc.add(self.ports[port_id].get_rx_queue_pkts())
+
+        return rc
+
 
     # connect to server
     def __connect(self):
@@ -1833,8 +1842,21 @@ class STLClient(object):
                 + :exc:`STLError`
 
         """
-        self._validate_port_list(src_port)
+        # validate src port
+        validate_type('src_port', src_port, int)
+        if src_port not in self.get_all_ports():
+            raise STLError("src port is not a valid port id")
         
+        if not is_valid_ipv4(dst_ipv4):
+            raise STLError("dst_ipv4 is not a valid IPv4 address: '{0}'".format(dst_ipv4))
+            
+        if (pkt_size < 64) or (pkt_size > 9216):
+            raise STLError("pkt_size should be a value between 64 and 9216: '{0}'".format(pkt_size))
+            
+        validate_type('count', count, int)
+            
+        
+            
         self.logger.pre_cmd("Pinging {0} from port {1} with {2} bytes of data:".format(dst_ipv4,
                                                                                        src_port,
                                                                                        pkt_size))
@@ -2873,11 +2895,7 @@ class STLClient(object):
 
         """
         # by default - resolve all the ports that are configured with IPv4 dest
-        if ports is None:
-            ports = self.get_resolvable_ports()
-            if not ports:
-                raise STLError('No ports configured with destination as IPv4')
-            
+        ports = ports if ports is not None else self.get_resolvable_ports()
         ports = self._validate_port_list(ports)
             
         active_ports = list_intersect(ports, self.get_active_ports())
@@ -3019,9 +3037,14 @@ class STLClient(object):
         ports = ports if ports is not None else self.get_acquired_ports()
         ports = self._validate_port_list(ports)
         
+        rc = self.__get_rx_queue_pkts(ports)
+        if not rc:
+            raise STLError(rc)
+            
+        # decode the data back to the user
         result = {}
-        for port in ports:
-            result[port] = self.ports[port].get_rx_queue_pkts()
+        for port, r in zip(ports, rc.data()):
+            result[port] = r
         
         return result
             
@@ -3688,6 +3711,8 @@ class STLClient(object):
             
         self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit)
 
+        return RC_OK()
+        
 
     @__console
     def resolve_line (self, line):
@@ -3703,9 +3728,19 @@ class STLClient(object):
         if not opts:
             return opts
 
+        ports = list_intersect(opts.ports, self.get_resolvable_ports())
+        if not ports:
+            if not opts.ports:
+                msg = 'resolve - no ports with IPv4 destination'
+            else:
+                msg = 'pause - none of ports {0} are configured with IPv4 destination'.format(opts.ports)
+                
+            self.logger.log(msg)
+            return RC_ERR(msg)
                      
-        self.resolve(ports = opts.ports, retries = opts.retries)
+        self.resolve(ports = ports, retries = opts.retries)
 
+        return RC_OK()
         
     
     @__console
index 42b7b89..9b95546 100644 (file)
@@ -825,37 +825,32 @@ class Port(object):
         else:
             info['is_virtual'] = 'N/A'
 
-        if 'speed' in attr:
-            info['speed'] = self.get_formatted_speed()
-        else:
-            info['speed'] = 'N/A'
-                                                  
+        # speed
+        info['speed'] = self.get_formatted_speed()
         
-        if 'rx_filter_mode' in attr:
-            info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
-        else:
-            info['rx_filter_mode'] = 'N/A'
+        # RX filter mode
+        info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
 
         # src MAC and IPv4
-        info['src_mac'] = attr.get('src_mac', 'N/A')
-            
-        info['src_ipv4'] = attr.get('src_ipv4', 'N/A')
+        info['src_mac']   = attr['src_mac']
+        info['src_ipv4']  = attr['src_ipv4']
+        
         if info['src_ipv4'] is None:
             info['src_ipv4'] = 'Not Configured'
 
         # dest
         dest = attr['dest']
         if dest['type'] == 'mac':
-            info['dest'] = dest['mac']
-            info['arp']  = '-'
+            info['dest']  = dest['mac']
+            info['arp']   = '-'
             
         elif dest['type'] == 'ipv4':
-            info['dest'] = dest['ipv4']
-            info['arp']  = dest['arp']
+            info['dest']  = dest['ipv4']
+            info['arp']   = dest['arp']
             
         elif dest['type'] == 'ipv4_u':
-            info['dest'] = dest['ipv4']
-            info['arp']  = 'unresolved'
+            info['dest']  = dest['ipv4']
+            info['arp']   = 'unresolved'
             
             
         # RX info
@@ -900,7 +895,7 @@ class Port(object):
             assert(0)
     
         
-    # return True if the port is resolved
+    # port is considered resolved if it's dest is either MAC or resolved IPv4
     def is_resolved (self):
         return (self.get_dst_addr()['mac'] is not None)
     
index 8a667dc..7ae22e8 100755 (executable)
@@ -245,8 +245,8 @@ def check_pkt_size (pkt_size):
     except ValueError:
         raise argparse.ArgumentTypeError("invalid packet size type: '{0}'".format(pkt_size))
         
-    if (pkt_size < 64) or (pkt_size > 9000):
-        raise argparse.ArgumentTypeError("invalid packet size: '{0}' - valid range is 64 to 9000".format(pkt_size))
+    if (pkt_size < 64) or (pkt_size > 9216):
+        raise argparse.ArgumentTypeError("invalid packet size: '{0}' - valid range is 64 to 9216".format(pkt_size))
     
     return pkt_size
     
index dfd3b18..fded49e 100755 (executable)
@@ -273,18 +273,18 @@ void utl_set_coredump_size(long size, bool map_huge_pages) {
     fclose(fp);
 }
 
-uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
+bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
     
     uint32_t tmp;
     
     int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp);
     if (!rc) {
-        return (0);
+        return false;
     }
     
     ipv4_num = PAL_NTOHL(tmp);
     
-    return (1);
+    return true;
 }
    
 std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) {
index c30457d..36f9db8 100755 (executable)
@@ -101,7 +101,7 @@ std::string utl_generate_random_str(unsigned int &seed, int len);
  */
 void utl_set_coredump_size(long size, bool map_huge_pages = false);
 
-uint32_t       utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
+bool           utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
 std::string    utl_uint32_to_ipv4(uint32_t ipv4_addr);
    
 #endif
index dae2979..ba125df 100644 (file)
@@ -802,11 +802,15 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
 #endif
 
     if (m_num_started_streams == 0) {
+        
         send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+        assert(m_rx_core->is_working());
+        
         //also good time to zero global counters
         memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
         memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
 
+        #if 0
         // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
         // start transmitting packets only after it is working, otherwise, packets will get lost.
         if (m_rx_core) { // in simulation, m_rx_core will be NULL
@@ -819,6 +823,8 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
                 }
             }
         }
+        #endif
+        
     } else {
         // make sure rx core is working. If not, we got really confused somehow.
         if (m_rx_core)
@@ -966,13 +972,21 @@ int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) {
 extern bool rx_should_stop;
 void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
     TrexStatelessCpToRxMsgBase *msg;
-
+    
     if (is_start) {
-        msg = new TrexStatelessRxEnableLatency();
+        static MsgReply<bool> reply;
+        reply.reset();
+        
+        msg = new TrexStatelessRxEnableLatency(reply);
+        m_ring_to_rx->Enqueue((CGenNode *)msg);
+        
+        /* hold until message was ack'ed - otherwise we might lose packets */
+        reply.wait_for_reply();
+        
     } else {
         msg = new TrexStatelessRxDisableLatency();
+        m_ring_to_rx->Enqueue((CGenNode *)msg);
     }
-    m_ring_to_rx->Enqueue((CGenNode *)msg);
 }
 
 // return false if no counters changed since last run. true otherwise
index 2783182..c02d676 100644 (file)
@@ -5744,7 +5744,7 @@ CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() {
 
 // in 1G we need to wait if links became ready to soon
 void CTRexExtendedDriverBase1G::wait_after_link_up(){
-    //wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
+    wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
 }
 
 int CTRexExtendedDriverBase1G::wait_for_stable_link(){
index cb7d514..107b7b4 100644 (file)
@@ -776,31 +776,31 @@ TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessP
 
 void 
 TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
-       bool enabled = parse_bool(msg, "enabled", result);
+    bool enabled = parse_bool(msg, "enabled", result);
 
-       if (enabled) {
+    if (enabled) {
 
-               uint64_t size = parse_uint32(msg, "size", result);
+        uint64_t size = parse_uint32(msg, "size", result);
 
-               if (size == 0) {
-                       generate_parse_err(result, "queue size cannot be zero");
-               }
+        if (size == 0) {
+            generate_parse_err(result, "queue size cannot be zero");
+        }
 
-               try {
-                       port->start_rx_queue(size);
-               } catch (const TrexException &ex) {
-                       generate_execute_err(result, ex.what());
-               }
+        try {
+            port->start_rx_queue(size);
+        } catch (const TrexException &ex) {
+            generate_execute_err(result, ex.what());
+        }
 
-       } else {
+    } else {
 
-               try {
-                       port->stop_rx_queue();
-               } catch (const TrexException &ex) {
-                       generate_execute_err(result, ex.what());
-               }
+        try {
+            port->stop_rx_queue();
+        } catch (const TrexException &ex) {
+            generate_execute_err(result, ex.what());
+        }
 
-       }
+    }
 
 }
 
index f2829b8..74ab17f 100644 (file)
@@ -377,21 +377,21 @@ public:
      */
     void stop_rx_capture();
 
-       /**
-        * start RX queueing of packets
-        
-        * @author imarom (11/7/2016)
-        
-        * @param limit 
-        */
-       void start_rx_queue(uint64_t limit);
-
-       /**
-        * stop RX queueing
-        
-        * @author imarom (11/7/2016)
-        */
-       void stop_rx_queue();
+    /**
+     * start RX queueing of packets
+     * 
+     * @author imarom (11/7/2016)
+     * 
+     * @param limit 
+     */
+    void start_rx_queue(uint64_t limit);
+
+    /**
+     * stop RX queueing
+     * 
+     * @author imarom (11/7/2016)
+     */
+    void stop_rx_queue();
 
     /**
      * fetch the RX queue packets from the queue
index 53d5a87..17acb21 100644 (file)
@@ -243,6 +243,8 @@ TrexDpPortEventMsg::handle() {
 /************************* messages from CP to RX **********************/
 bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) {
     rx_core->enable_latency();
+    m_reply.set_reply(true);
+    
     return true;
 }
 
index 303548a..79a6bf0 100644 (file)
@@ -63,7 +63,7 @@ public:
         m_pending = false;
     }
 
-    T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+    T wait_for_reply(int timeout_ms = 500, int backoff_ms = 1) {
         int guard = timeout_ms;
 
         while (is_pending()) {
@@ -461,7 +461,14 @@ public:
 
 
 class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
+public:
+    TrexStatelessRxEnableLatency(MsgReply<bool> &reply) : m_reply(reply) {
+    }
+    
     bool handle (CRxCoreStateless *rx_core);
+    
+private:
+    MsgReply<bool>    &m_reply;
 };
 
 class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase {
@@ -505,7 +512,7 @@ public:
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t           m_port_id;
+    uint8_t m_port_id;
 };
 
 
@@ -515,8 +522,8 @@ public:
                               uint64_t size,
                               MsgReply<bool> &reply) : m_reply(reply) {
         
-        m_port_id           = port_id;
-        m_size              = size;
+        m_port_id = port_id;
+        m_size    = size;
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
@@ -537,7 +544,7 @@ public:
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t           m_port_id;
+    uint8_t m_port_id;
 };
 
 
index f518fcd..dc63716 100644 (file)
@@ -121,6 +121,7 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
         handle_cp_msg(msg);
     }
 
+    /* a message might result in a change of state */
     recalculate_next_state();
     return true;
 
@@ -218,9 +219,7 @@ void CRxCoreStateless::start() {
 
         switch (m_state) {
         case STATE_IDLE:
-            set_working_msg_ack(false);
             idle_state_loop();
-            set_working_msg_ack(true);
             break;
 
         case STATE_WORKING:
@@ -311,23 +310,11 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
 int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
                                    , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
 
-    RXLatency &latency = m_rx_port_mngr[port_id].get_latency();
-
-    for (int hw_id = min; hw_id <= max; hw_id++) {
-        if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
-            rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id];
-        } else {
-            rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id];
-        }
-        if (reset) {
-            if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
-                latency.m_rx_pg_stat_payload[hw_id].clear();
-            } else {
-                latency.m_rx_pg_stat[hw_id].clear();
-            }
-        }
-    }
-    return 0;
+    /* for now only latency stats */
+    m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type);
+    
+    return (0);
+    
 }
 
 int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
@@ -354,13 +341,6 @@ int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
     return 0;
 }
 
-void CRxCoreStateless::set_working_msg_ack(bool val) {
-    sanb_smp_memory_barrier();
-    m_ack_start_work_msg = val;
-    sanb_smp_memory_barrier();
-}
-
-
 void CRxCoreStateless::update_cpu_util(){
     m_cpu_cp_u.Update();
 }
@@ -373,21 +353,25 @@ double CRxCoreStateless::get_cpu_util() {
 void
 CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
     m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
+    recalculate_next_state();
 }
 
 void
 CRxCoreStateless::stop_recorder(uint8_t port_id) {
     m_rx_port_mngr[port_id].stop_recorder();
+    recalculate_next_state();
 }
 
 void
 CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
     m_rx_port_mngr[port_id].start_queue(size);
+    recalculate_next_state();
 }
 
 void
 CRxCoreStateless::stop_queue(uint8_t port_id) {
     m_rx_port_mngr[port_id].stop_queue();
+    recalculate_next_state();
 }
 
 void
@@ -395,6 +379,8 @@ CRxCoreStateless::enable_latency() {
     for (int i = 0; i < m_max_ports; i++) {
         m_rx_port_mngr[i].enable_latency();
     }
+    
+    recalculate_next_state();
 }
 
 void
@@ -402,6 +388,8 @@ CRxCoreStateless::disable_latency() {
     for (int i = 0; i < m_max_ports; i++) {
         m_rx_port_mngr[i].disable_latency();
     }
+    
+    recalculate_next_state();
 }
 
 const RXPortManager &
index b27a7ca..7481ae2 100644 (file)
 
 class TrexStatelessCpToRxMsgBase;
 
-
+/**
+ * RFC 2544 implementation
+ * 
+ */
 class CRFC2544Info {
  public:
     void create();
@@ -88,7 +91,15 @@ class CRxCoreErrCntrs {
     uint64_t m_old_flow;
 };
 
+/**
+ * stateless RX core
+ * 
+ */
 class CRxCoreStateless {
+    
+    /**
+     * core states
+     */
     enum state_e {
         STATE_IDLE,
         STATE_WORKING,
@@ -106,8 +117,7 @@ class CRxCoreStateless {
 
 
     void quit() {m_state = STATE_QUIT;}
-    bool is_working() const {return (m_ack_start_work_msg == true);}
-    void set_working_msg_ack(bool val);
+    bool is_working() const {return (m_state == STATE_WORKING);}
     double get_cpu_util();
     void update_cpu_util();
 
index 00032e8..bd8650a 100644 (file)
@@ -156,6 +156,31 @@ RXLatency::reset_stats() {
     }
 }
 
+
+void
+RXLatency::get_stats(rx_per_flow_t *rx_stats,
+                     int min,
+                     int max,
+                     bool reset,
+                     TrexPlatformApi::driver_stat_cap_e type) {
+    
+    for (int hw_id = min; hw_id <= max; hw_id++) {
+        if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+            rx_stats[hw_id - min] = m_rx_pg_stat_payload[hw_id];
+        } else {
+            rx_stats[hw_id - min] = m_rx_pg_stat[hw_id];
+        }
+        if (reset) {
+            if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+                m_rx_pg_stat_payload[hw_id].clear();
+            } else {
+                m_rx_pg_stat[hw_id].clear();
+            }
+        }
+    }
+}
+
+
 Json::Value
 RXLatency::to_json() const {
     return Json::objectValue;
index bc34b5a..6af90f8 100644 (file)
@@ -43,12 +43,18 @@ public:
 
     void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs);
 
-    void reset_stats();
-
     void handle_pkt(const rte_mbuf_t *m);
 
     Json::Value to_json() const;
     
+    void get_stats(rx_per_flow_t *rx_stats,
+                   int min,
+                   int max,
+                   bool reset,
+                   TrexPlatformApi::driver_stat_cap_e type);
+    
+    void reset_stats();
+    
 private:
     bool is_flow_stat_id(uint32_t id) {
         if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
@@ -276,6 +282,15 @@ public:
         m_latency.reset_stats();
     }
 
+    void get_latency_stats(rx_per_flow_t *rx_stats,
+                           int min,
+                           int max,
+                           bool reset,
+                           TrexPlatformApi::driver_stat_cap_e type) {
+        
+        return m_latency.get_stats(rx_stats, min, max, reset, type);
+    }
+    
     RXLatency & get_latency() {
         return m_latency;
     }