draft - unreviewed 23/5223/1
authorimarom <[email protected]>
Wed, 4 Jan 2017 16:46:45 +0000 (18:46 +0200)
committerimarom <[email protected]>
Wed, 4 Jan 2017 16:46:45 +0000 (18:46 +0200)
Signed-off-by: imarom <[email protected]>
23 files changed:
linux_dpdk/ws_main.py
scripts/automation/trex_control_plane/stl/console/trex_console.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
src/main_dpdk.cpp
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmds.h
src/rpc-server/trex_rpc_cmds_table.cpp
src/stateless/cp/trex_stateless.cpp
src/stateless/cp/trex_stateless.h
src/stateless/cp/trex_stateless_port.cpp
src/stateless/cp/trex_stateless_port.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_capture.cpp
src/stateless/rx/trex_stateless_capture.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h
src/stateless/rx/trex_stateless_rx_defs.h
src/stateless/rx/trex_stateless_rx_port_mngr.cpp
src/stateless/rx/trex_stateless_rx_port_mngr.h

index f3e2560..0d9833d 100755 (executable)
@@ -297,7 +297,9 @@ stateless_src = SrcGroup(dir='src/stateless/',
                                     'dp/trex_stateless_dp_core.cpp',
                                     'messaging/trex_stateless_messaging.cpp',
                                     'rx/trex_stateless_rx_core.cpp',
-                                    'rx/trex_stateless_rx_port_mngr.cpp'
+                                    'rx/trex_stateless_rx_port_mngr.cpp',
+                                    'rx/trex_stateless_capture.cpp',
+                                    'common/trex_stateless_pkt.cpp',
                                     ])
 # JSON package
 json_src = SrcGroup(dir='external_libs/json',
@@ -571,6 +573,7 @@ includes_path =''' ../src/pal/linux_dpdk/
                    ../src/stateless/cp/
                    ../src/stateless/dp/
                    ../src/stateless/rx/
+                   ../src/stateless/common/
                    ../src/stateless/messaging/
 
                    ../external_libs/yaml-cpp/include/
index 7d47128..38a1fca 100755 (executable)
@@ -347,12 +347,12 @@ class TRexConsole(TRexGeneralCmd):
 
         
     @verify_connected
-    def do_set_rx_sniffer (self, line):
-        '''Sets a port sniffer on RX channel as PCAP recorder'''
-        self.stateless_client.set_rx_sniffer_line(line)
+    def do_capture (self, line):
+        '''Start PCAP capturing on port'''
+        self.stateless_client.start_capture_line(line)
 
-    def help_sniffer (self):
-        self.do_set_rx_sniffer("-h")
+    def help_capture (self):
+        self.do_capture("-h")
 
     @verify_connected
     def do_resolve (self, line):
index e163d51..1b57218 100755 (executable)
@@ -601,11 +601,9 @@ class STLClient(object):
                                                                  self.xstats,
                                                                  self.async_client.monitor)
 
-
-
-
     ############# private functions - used by the class itself ###########
 
+    
     # some preprocessing for port argument
     def __ports (self, port_id_list):
 
@@ -832,27 +830,6 @@ class STLClient(object):
         return rc
 
 
-    def __set_rx_sniffer (self, port_id_list, base_filename, limit):
-        port_id_list = self.__ports(port_id_list)
-        rc = RC()
-
-        for port_id in port_id_list:
-            head, tail = os.path.splitext(base_filename)
-            filename = "{0}-{1}{2}".format(head, port_id, tail)
-            rc.add(self.ports[port_id].set_rx_sniffer(filename, limit))
-
-        return rc
-
-
-    def __remove_rx_sniffer (self, port_id_list):
-        port_id_list = self.__ports(port_id_list)
-        rc = RC()
-
-        for port_id in port_id_list:
-            rc.add(self.ports[port_id].remove_rx_sniffer())
-
-        return rc
-
     def __set_rx_queue (self, port_id_list, size):
         port_id_list = self.__ports(port_id_list)
         rc = RC()
@@ -1071,7 +1048,7 @@ class STLClient(object):
 
     ############ functions used by other classes but not users ##############
 
-    def _validate_port_list (self, port_id_list):
+    def _validate_port_list (self, port_id_list, allow_empty = False):
         # listfiy single int
         if isinstance(port_id_list, int):
             port_id_list = [port_id_list]
@@ -1080,7 +1057,7 @@ class STLClient(object):
         if not isinstance(port_id_list, list):
             raise STLTypeError('port_id_list', type(port_id_list), list)
 
-        if not port_id_list:
+        if not port_id_list and not allow_empty:
             raise STLError('No ports provided')
 
         valid_ports = self.get_all_ports()
@@ -2084,9 +2061,9 @@ class STLClient(object):
                 self.set_port_attr(ports,
                                    promiscuous = False,
                                    link_up = True if restart else None)
-                self.set_service_mode(ports, False)
-                self.remove_rx_sniffer(ports)
                 self.remove_rx_queue(ports)
+                self.set_service_mode(ports, False)
+                
                 
         except STLError as e:
             self.logger.post_cmd(False)
@@ -3013,29 +2990,39 @@ class STLClient(object):
             
         
     @__api_check(True)
-    def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000):
+    def start_capture (self, tx_ports, rx_ports, limit = 1000):
         """
-            Sets a RX sniffer for port(s) written to a PCAP file
+            Starts a capture to PCAP on port(s)
 
             :parameters:
-                ports          - for which ports to apply a unique sniffer (each port gets a unique file)
-                base_filename  - filename will be appended with '-<port_number>', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc.
+                tx_ports       - on which ports to capture TX
+                rx_ports       - on which ports to capture RX
                 limit          - limit how many packets will be written
             :raises:
                 + :exe:'STLError'
 
         """
-        ports = ports if ports is not None else self.get_acquired_ports()
-        ports = self._validate_port_list(ports)
-
+        
+        tx_ports = self._validate_port_list(tx_ports, allow_empty = True)
+        rx_ports = self._validate_port_list(rx_ports, allow_empty = True)
+        merge_ports = set(tx_ports + rx_ports)
+        
+        if not merge_ports:
+            raise STLError("start_capture - must get at least one port to capture")
+            
         # check arguments
-        validate_type('base_filename', base_filename, basestring)
         validate_type('limit', limit, (int))
         if limit <= 0:
             raise STLError("'limit' must be a positive value")
 
-        self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports))
-        rc = self.__set_rx_sniffer(ports, base_filename, limit)
+        non_service_ports =  list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports())
+        if non_service_ports:
+            raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode")
+        
+            
+        self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit))
+        
+        rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
         self.logger.post_cmd(rc)
 
 
@@ -3045,7 +3032,7 @@ class STLClient(object):
 
 
     @__api_check(True)
-    def remove_rx_sniffer (self, ports = None):
+    def stop_capture (self, ports = None):
         """
             Removes RX sniffer from port(s)
 
@@ -3779,21 +3766,21 @@ class STLClient(object):
 
              
     @__console
-    def set_rx_sniffer_line (self, line):
-        '''Sets a port sniffer on RX channel in form of a PCAP file'''
+    def start_capture_line (self, line):
+        '''Starts PCAP recorder on port(s)'''
 
         parser = parsing_opts.gen_parser(self,
-                                         "set_rx_sniffer",
-                                         self.set_rx_sniffer_line.__doc__,
-                                         parsing_opts.PORT_LIST_WITH_ALL,
-                                         parsing_opts.OUTPUT_FILENAME,
+                                         "capture",
+                                         self.start_capture_line.__doc__,
+                                         parsing_opts.TX_PORT_LIST,
+                                         parsing_opts.RX_PORT_LIST,
                                          parsing_opts.LIMIT)
 
         opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
         if not opts:
             return opts
 
-        self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit)
+        self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
 
         return RC_OK()
         
index 07587b9..654514c 100644 (file)
@@ -56,7 +56,8 @@ class Port(object):
     def __init__ (self, port_id, user, comm_link, session_id, info):
         self.port_id = port_id
         
-        self.state = self.STATE_IDLE
+        self.state        = self.STATE_IDLE
+        self.service_mode = False
         
         self.handler = None
         self.comm_link = comm_link
@@ -247,14 +248,16 @@ class Port(object):
             raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
 
         self.owner = rc.data()['owner']
-
+        
         self.next_available_id = int(rc.data()['max_stream_id']) + 1
 
         self.status = rc.data()
-
+        
         # replace the attributes in a thread safe manner
         self.set_ts_attr(rc.data()['attr'])
-
+        
+        self.service_mode = rc.data()['service']
+        
         return self.ok()
 
 
@@ -490,33 +493,17 @@ class Port(object):
 
     
     @owned
-    def set_rx_sniffer (self, pcap_filename, limit):
+    def start_capture (self, pcap_filename, mode, limit):
 
-        if not self.is_service_mode_on():
+        if mode != 'tx' and not self.is_service_mode_on():
             return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode')
             
         params = {"handler":        self.handler,
                   "port_id":        self.port_id,
-                  "type":           "capture",
-                  "enabled":        True,
-                  "pcap_filename":  pcap_filename,
+                  "mode":           mode,
                   "limit":          limit}
 
-        rc = self.transmit("set_rx_feature", params)
-        if rc.bad():
-            return self.err(rc.err())
-
-        return self.ok()
-
-      
-    @owned
-    def remove_rx_sniffer (self):
-        params = {"handler":        self.handler,
-                  "port_id":        self.port_id,
-                  "type":           "capture",
-                  "enabled":        False}
-
-        rc = self.transmit("set_rx_feature", params)
+        rc = self.transmit("start_capture", params)
         if rc.bad():
             return self.err(rc.err())
 
@@ -719,23 +706,21 @@ class Port(object):
     
     @owned
     def set_service_mode (self, enabled):
-        rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw')
-        if not rc:
-            return rc
-            
-        if not enabled:
-            rc = self.remove_rx_queue()
-            if not rc:
-                return rc
-                
-            rc = self.remove_rx_sniffer()
-            if not rc:
-                return rc
-                
+        params = {"handler": self.handler,
+                  "port_id": self.port_id,
+                  "enabled": enabled}
+
+        rc = self.transmit("service", params)
+        if rc.bad():
+            return self.err(rc.err())
+
+        self.service_mode = enabled
         return self.ok()
+        
 
     def is_service_mode_on (self):
-        return self.get_rx_filter_mode() == 'all'
+        return self.service_mode
+        
                 
     @writeable
     def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec):
@@ -902,11 +887,6 @@ class Port(object):
         # RX info
         rx_info = self.status['rx_info']
 
-        # RX sniffer
-        sniffer = rx_info['sniffer']
-        info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
-        
-
         # RX queue
         queue = rx_info['queue']
         info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
@@ -928,9 +908,6 @@ class Port(object):
     def get_layer_cfg (self):
         return self.__attr['layer_cfg']
         
-    def get_rx_filter_mode (self):
-        return self.__attr['rx_filter_mode']
-        
     def is_l3_mode (self):
         return self.get_layer_cfg()['ipv4']['state'] != 'none'
         
@@ -1002,7 +979,6 @@ class Port(object):
                 "layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'),
                 "RX Filter Mode": info['rx_filter_mode'],
                 "RX Queueing": info['rx_queue'],
-                "RX sniffer": info['rx_sniffer'],
                 "Grat ARP": info['grat_arp'],
 
                 }
index 3872606..21c9af8 100644 (file)
@@ -682,7 +682,6 @@ class CTRexInfoGenerator(object):
                                         ("-----", []),
                                         ("RX Filter Mode", []),
                                         ("RX Queueing", []),
-                                        ("RX sniffer", []),
                                         ("Grat ARP", []),
                                         ]
                                        )
index f5dab30..265c43f 100755 (executable)
@@ -63,6 +63,9 @@ PKT_SIZE
 
 SERVICE_OFF
 
+TX_PORT_LIST
+RX_PORT_LIST
+
 SRC_IPV4
 DST_IPV4
 
@@ -591,6 +594,24 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
                                          'default': True,
                                          'help': 'Deactivates services on port(s)'}),
                 
+              TX_PORT_LIST: ArgumentPack(['--tx'],
+                                         {'nargs': '+',
+                                          'dest':'tx_port_list',
+                                          'metavar': 'TX',
+                                          'action': 'merge',
+                                          'type': int,
+                                          'help': 'A list of ports to capture on the TX side',
+                                          'default': []}),
+               
+              RX_PORT_LIST: ArgumentPack(['--rx'],
+                                         {'nargs': '+',
+                                          'dest':'rx_port_list',
+                                          'metavar': 'RX',
+                                          'action': 'merge',
+                                          'type': int,
+                                          'help': 'A list of ports to capture on the RX side',
+                                          'default': []}),
+                
               # advanced options
               PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
                                                         ALL_PORTS],
index 9cc0e61..36fe180 100644 (file)
@@ -2169,6 +2169,8 @@ int CCoreEthIF::send_burst(CCorePerPort * lp_port,
                            uint16_t len,
                            CVirtualIFPerSideStats  * lp_stats){
 
+    //assert(m_ring_to_rx->Enqueue((CGenNode *)0x0) == 0);
+    
     uint16_t ret = lp_port->m_port->tx_burst(lp_port->m_tx_queue_id,lp_port->m_table,len);
 #ifdef DELAY_IF_NEEDED
     while ( unlikely( ret<len ) ){
@@ -3617,8 +3619,9 @@ void CGlobalTRex::rx_sl_configure(void) {
     int i;
 
     rx_sl_cfg.m_max_ports = m_max_ports;
+    rx_sl_cfg.m_tx_cores  = get_cores_tx();
     rx_sl_cfg.m_num_crc_fix_bytes = get_ex_drv()->get_num_crc_fix_bytes();
-
+        
     if ( get_vm_one_queue_enable() ) {
         /* vm mode, indirect queues  */
         for (i=0; i < m_max_ports; i++) {
index d4854a7..ec5c315 100644 (file)
@@ -28,6 +28,7 @@ limitations under the License.
 #include <internal_api/trex_platform_api.h>
 
 #include "trex_stateless_rx_core.h"
+#include "trex_stateless_capture.h"
 
 #include <fstream>
 #include <iostream>
@@ -339,24 +340,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
     return (TREX_RPC_CMD_OK);
 }
 
-
-int
-TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
-    const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result);
-
-    rx_filter_mode_e filter_mode;
-    if (type == "hw") {
-        filter_mode = RX_FILTER_MODE_HW;
-    } else if (type == "all") {
-        filter_mode = RX_FILTER_MODE_ALL;
-    } else {
-        /* can't happen - parsed choice */
-        assert(0);
-    }
-
-    return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode);
-}
-
 /**
  * set port commands
  *
@@ -399,11 +382,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
             ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode);
         }
 
-        else if (name == "rx_filter_mode") {
-            const Json::Value &rx = parse_object(attr, name, result);
-            ret = parse_rx_filter_mode(rx, port_id, result);
-        }
-
         /* unknown attribute */
         else {
             generate_execute_err(result, "unknown attribute type: '" + name + "'");
@@ -588,7 +566,8 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
     result["result"]["owner"]         = (port->get_owner().is_free() ? "" : port->get_owner().get_name());
     result["result"]["state"]         = port->get_state_as_string();
     result["result"]["max_stream_id"] = port->get_max_stream_id();
-
+    result["result"]["service"]       = port->is_service_mode_on();
+    
     /* attributes */
     get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
     
@@ -660,6 +639,27 @@ TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
 
 }
 
+
+/**
+ * set service mode on/off
+ * 
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdSetServiceMode::_run(const Json::Value &params, Json::Value &result) {
+    uint8_t port_id = parse_port(params, result);
+    bool enabled = parse_bool(params, "enabled", result);
+    
+    TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+    try {
+        port->set_service_mode(enabled);
+    } catch (TrexException &ex) {
+        generate_execute_err(result, ex.what());
+    }
+    
+    result["result"] = Json::objectValue;
+    return (TREX_RPC_CMD_OK);
+}
 /**
  * set on/off RX software receive mode
  *
@@ -670,12 +670,14 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
     uint8_t port_id = parse_port(params, result);
     TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
 
+    if (!port->is_service_mode_on()) {
+        generate_execute_err(result, "rx_feature - available only under service mode");
+    }
+    
     /* decide which feature is being set */
-    const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result);
+    const std::string type = parse_choice(params, "type", {"queue", "server"}, result);
 
-    if (type == "capture") {
-        parse_capture_msg(params, port, result);
-    } else if (type == "queue") {
+    if (type == "queue") {
         parse_queue_msg(params, port, result);
     } else if (type == "server") {
         parse_server_msg(params, port, result);
@@ -688,38 +690,6 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
    
 }
 
-void 
-TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
-    
-    bool enabled = parse_bool(msg, "enabled", result);
-
-    if (enabled) {
-
-        std::string pcap_filename = parse_string(msg, "pcap_filename", result);
-        uint64_t limit = parse_uint32(msg, "limit", result);
-
-        if (limit == 0) {
-            generate_parse_err(result, "limit cannot be zero");
-        }
-
-        try {
-            port->start_rx_capture(pcap_filename, limit);
-        } catch (const TrexException &ex) {
-            generate_execute_err(result, ex.what());
-        }
-
-    } else {
-
-        try {
-            port->stop_rx_capture();
-        } catch (const TrexException &ex) {
-            generate_execute_err(result, ex.what());
-        }
-
-    }
-
-}
-
 void 
 TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
     bool enabled = parse_bool(msg, "enabled", result);
@@ -762,8 +732,13 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value &params, Json::Value &result) {
 
     TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
 
+    if (!port->is_service_mode_on()) {
+        generate_execute_err(result, "get_rx_queue_pkts - available only under service mode");
+    }
+
+    
     try {
-        const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
+        const TrexPktBuffer *pkt_buffer = port->get_rx_queue_pkts();
         if (pkt_buffer) {
             result["result"]["pkts"] = pkt_buffer->to_json();
             delete pkt_buffer;
@@ -806,6 +781,7 @@ TrexRpcCmdSetL2::_run(const Json::Value &params, Json::Value &result) {
         generate_execute_err(result, ex.what());
     }
     
+    result["result"] = Json::objectValue;
     return (TREX_RPC_CMD_OK);
 }
 
@@ -863,7 +839,50 @@ TrexRpcCmdSetL3::_run(const Json::Value &params, Json::Value &result) {
         }
         
     }
-    
+   
+    result["result"] = Json::objectValue; 
     return (TREX_RPC_CMD_OK);    
     
 }
+   
+/**
+ * starts PCAP capturing
+ * 
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdStartCapture::_run(const Json::Value &params, Json::Value &result) {
+    
+    uint32_t limit             = parse_uint32(params, "limit", result);
+    const Json::Value &tx_json = parse_array(params, "tx", result);
+    const Json::Value &rx_json = parse_array(params, "rx", result);
+    
+    CaptureFilter filter;
+    
+    std::set<uint8_t> ports;
+    
+    /* populate the filter */
+    for (int i = 0; i < tx_json.size(); i++) {
+        uint8_t tx_port = parse_byte(tx_json, i, result);
+        filter.add_tx(tx_port);
+        ports.insert(tx_port);
+    }
+    
+    for (int i = 0; i < rx_json.size(); i++) {
+        uint8_t rx_port = parse_byte(rx_json, i, result);
+        filter.add_rx(rx_port);
+        ports.insert(rx_port);
+    }
+    
+    /* check that all ports are under service mode */
+    for (uint8_t port_id : ports) {
+        TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+        if (!port->is_service_mode_on()) {
+            generate_parse_err(result, "start_capture is available only under service mode");
+        }
+    }
+    
+    get_stateless_obj()->start_capture(filter, limit);
+    
+    result["result"] = Json::objectValue;
+    return (TREX_RPC_CMD_OK);
+}
index 6639be7..1ea63cc 100644 (file)
@@ -94,8 +94,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1,
 TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames,  "get_port_xstats_names",  1, false, APIClass::API_CLASS_TYPE_CORE);
 
 TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true,  APIClass::API_CLASS_TYPE_CORE,
-                             
-    int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result);
 );
 
 
@@ -150,16 +148,19 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_
 
 TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE);
 
-TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE,
-    void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, true, APIClass::API_CLASS_TYPE_CORE,
     void parse_queue_msg(const Json::Value &msg,  TrexStatelessPort *port, Json::Value &result);
     void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
 
 );
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIClass::API_CLASS_TYPE_CORE);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE);
 
 #endif /* __TREX_RPC_CMD_H__ */
 
index 94a3e1b..3d4d5a2 100644 (file)
@@ -75,8 +75,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
     register_command(new TrexRpcCmdSetRxFeature());
     register_command(new TrexRpcCmdGetRxQueuePkts());
     
+    register_command(new TrexRpcCmdSetServiceMode());
     register_command(new TrexRpcCmdSetL2());
     register_command(new TrexRpcCmdSetL3());
+    
+    register_command(new TrexRpcCmdStartCapture());
 }
 
 
index c31ba0a..32babbf 100644 (file)
@@ -18,13 +18,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-#include <trex_stateless.h>
-#include <trex_stateless_port.h>
 
-#include <sched.h>
+//#include <sched.h>
 #include <iostream>
 #include <unistd.h>
 
+#include "trex_stateless.h"
+#include "trex_stateless_port.h"
+#include "trex_stateless_messaging.h"
+
 using namespace std;
 
 /***********************************************************
@@ -140,54 +142,35 @@ TrexStateless::get_dp_core_count() {
     return m_platform_api->get_dp_core_count();
 }
 
-void
-TrexStateless::encode_stats(Json::Value &global) {
-
-    TrexPlatformGlobalStats stats;
-    m_platform_api->get_global_stats(stats);
-
-    global["cpu_util"] = stats.m_stats.m_cpu_util;
-    global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util;
-
-    global["tx_bps"]   = stats.m_stats.m_tx_bps;
-    global["rx_bps"]   = stats.m_stats.m_rx_bps;
-
-    global["tx_pps"]   = stats.m_stats.m_tx_pps;
-    global["rx_pps"]   = stats.m_stats.m_rx_pps;
-
-    global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
-    global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
-
-    global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
-    global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
-
-    global["tx_rx_errors"]    = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
-
-    for (uint8_t i = 0; i < m_port_count; i++) {
-        std::stringstream ss;
-
-        ss << "port " << i;
-        Json::Value &port_section = global[ss.str()];
-
-        m_ports[i]->encode_stats(port_section);
-    }
+capture_id_t
+TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) {
+    static MsgReply<capture_id_t> reply;
+    
+    reply.reset();
+    
+    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply);
+     
+    ring->Enqueue((CGenNode *)msg);
+    
+    capture_id_t new_id = reply.wait_for_reply();
+    
+    return (new_id);
 }
 
-/**
- * generate a snapshot for publish (async publish)
- * 
- */
-void
-TrexStateless::generate_publish_snapshot(std::string &snapshot) {
-    Json::FastWriter writer;
-    Json::Value root;
-
-    root["name"] = "trex-stateless-info";
-    root["type"] = 0;
-
-    /* stateless specific info goes here */
-    root["data"] = Json::nullValue;
-
-    snapshot = writer.write(root);
+capture_id_t
+TrexStateless::stop_capture(capture_id_t capture_id) {
+    static MsgReply<capture_id_t> reply;
+    
+    reply.reset();
+    
+    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+    TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply);
+     
+    ring->Enqueue((CGenNode *)msg);
+    
+    capture_id_t rc = reply.wait_for_reply();
+    
+    return (rc);
 }
 
index 3a1a2c2..33f16ce 100644 (file)
@@ -102,6 +102,7 @@ public:
  * defines the TRex stateless operation mode
  * 
  */
+class CaptureFilter;
 class TrexStateless {
 public:
 
@@ -132,32 +133,21 @@ public:
 
 
     /**
-     * shutdown the server
-     */
-    void shutdown();
-
-    /**
-     * fetch xstats names (keys of dict)
-     * 
-     */
-    void encode_xstats_names(Json::Value &global);
-
-    /**
-     * fetch xstats values
-     * 
+     * starts a capture on a 'filter' of ports 
+     * with a limit of packets 
      */
-    void encode_xstats_values(Json::Value &global);
-
+    capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit);
+    
     /**
-     * fetch all the stats
+     * stops an active capture
      * 
      */
-    void               encode_stats(Json::Value &global);
-
+    capture_id_t stop_capture(capture_id_t capture_id);
+    
     /**
-     * generate a snapshot for publish
+     * shutdown the server
      */
-    void generate_publish_snapshot(std::string &snapshot);
+    void shutdown();
 
     const TrexPlatformApi * get_platform_api() {
         return (m_platform_api);
index 7d331c6..9cf048b 100644 (file)
@@ -162,7 +162,7 @@ private:
  * trex stateless port
  * 
  **************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) {
     std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
 
     m_port_id            = port_id;
@@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() {
     }
 }
 
-void 
-TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
-    static MsgReply<bool> reply;
-    
-    reply.reset();
-    
-    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
-    send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
-    
-    /* as below, must wait for ACK from RX core before returning ACK */
-    reply.wait_for_reply();
-}
-
-void
-TrexStatelessPort::stop_rx_capture() {
-    TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
-    send_message_to_rx(msg);
-}
 
 void 
 TrexStatelessPort::start_rx_queue(uint64_t size) {
@@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) {
        this might cause the user to lose some packets from the queue
      */
     reply.wait_for_reply();
+
+    m_service_mode.set_rx_queue();
 }
 
 void
 TrexStatelessPort::stop_rx_queue() {
     TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
     send_message_to_rx(msg);
+
+    m_service_mode.unset_rx_queue();
 }
 
 
-const RXPacketBuffer *
+const TrexPktBuffer *
 TrexStatelessPort::get_rx_queue_pkts() {
-    static MsgReply<const RXPacketBuffer *> reply;
+    static MsgReply<const TrexPktBuffer *> reply;
     
     reply.reset();
 
index d4ac401..2cc1b9c 100644 (file)
@@ -26,12 +26,14 @@ limitations under the License.
 #include "trex_dp_port_events.h"
 #include "trex_stateless_rx_defs.h"
 #include "trex_stream.h"
+#include "trex_exception.h"
+#include "trex_stateless_capture.h"
 
 class TrexStatelessCpToDpMsgBase;
 class TrexStatelessCpToRxMsgBase;
 class TrexStreamsGraphObj;
 class TrexPortMultiplier;
-class RXPacketBuffer;
+class TrexPktBuffer;
 
 
 /**
@@ -113,6 +115,56 @@ private:
     static const std::string g_unowned_handler;
 };
 
+/**
+ * enforces in/out from service mode
+ * 
+ * @author imarom (1/4/2017)
+ */
+class TrexServiceMode {
+public:
+    TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) {
+        m_is_enabled   = false;
+        m_has_rx_queue = false;
+        m_port_id      = port_id;
+        m_port_attr    = api->getPortAttrObj(port_id);
+    }
+
+    void enable() {
+        m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL);
+        m_is_enabled = true;
+    }
+
+    void disable() {
+        if (m_has_rx_queue) {
+            throw TrexException("unable to disable service mode - please remove RX queue");
+        }
+
+        if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) {
+            throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists");
+        }
+        
+        m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW);
+        m_is_enabled = false;
+    }
+
+    bool is_enabled() const {
+        return m_is_enabled;
+    }
+
+    void set_rx_queue() {
+        m_has_rx_queue = true;
+    }
+
+    void unset_rx_queue() {
+        m_has_rx_queue = false;
+    }
+
+private:
+    bool            m_is_enabled;
+    bool            m_has_rx_queue;
+    TRexPortAttr   *m_port_attr;
+    uint8_t         m_port_id;
+};
 
 class AsyncStopEvent;
 
@@ -150,7 +202,15 @@ public:
         RC_ERR_FAILED_TO_COMPILE_STREAMS
     };
 
-
+    /**
+     * port capture mode
+     */
+    enum capture_mode_e {
+        PORT_CAPTURE_NONE = 0,
+        PORT_CAPTURE_RX,
+        PORT_CAPTURE_ALL
+    };
+    
     TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
 
     ~TrexStatelessPort();
@@ -227,6 +287,20 @@ public:
                      double            duration,
                      bool              is_dual);
 
+    /** 
+     * moves port to / out service mode 
+     */
+    void set_service_mode(bool enabled) {
+        if (enabled) {
+            m_service_mode.enable();
+        } else {
+            m_service_mode.disable();
+        }
+    }
+    bool is_service_mode_on() const {
+        return m_service_mode.is_enabled();
+    }
+    
     /**
      * get the port state
      *
@@ -367,16 +441,16 @@ public:
 
 
     /**
-     * enable RX capture on port
+     * starts capturing packets
      * 
      */
-    void start_rx_capture(const std::string &pcap_filename, uint64_t limit);
+    void start_capture(capture_mode_e mode, uint64_t limit);
 
     /**
-     * disable RX capture if on
+     * stops capturing packets
      * 
      */
-    void stop_rx_capture();
+    void stop_capture();
 
     /**
      * start RX queueing of packets
@@ -398,7 +472,7 @@ public:
      * fetch the RX queue packets from the queue
      * 
      */
-    const RXPacketBuffer *get_rx_queue_pkts();
+    const TrexPktBuffer *get_rx_queue_pkts();
 
     /**
      * configures port for L2 mode
@@ -429,7 +503,9 @@ public:
     }
     
 private:
-
+    void set_service_mode_on();
+    void set_service_mode_off();
+    
     bool is_core_active(int core_id);
 
     const std::vector<uint8_t> get_core_id_list () {
@@ -514,6 +590,9 @@ private:
     TrexPortOwner       m_owner;
 
     int m_pending_async_stop_event;
+    
+    TrexServiceMode m_service_mode;
+
     static const uint32_t MAX_STREAMS = 20000;
 
 };
index aeb1e67..f441c69 100644 (file)
@@ -263,18 +263,20 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
 
 bool
 TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
-    rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
+    capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter);
 
     /* mark as done */
-    m_reply.set_reply(true);
+    m_reply.set_reply(capture_id);
     
     return true;
 }
 
 bool
 TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
-    rx_core->stop_recorder(m_port_id);
-
+    capture_id_t rc = rx_core->stop_capture(m_capture_id);
+    
+    m_reply.set_reply(rc);
+    
     return true;
 }
 
@@ -299,7 +301,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
 
 bool
 TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
-    const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+    const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
     
     /* set the reply */
     m_reply.set_reply(pkt_buffer);
index 72b92d1..5f4978f 100644 (file)
@@ -28,12 +28,13 @@ limitations under the License.
 #include "trex_stateless_rx_defs.h"
 #include "os_time.h"
 #include "utl_ip.h"
+#include "trex_stateless_capture.h"
 
 class TrexStatelessDpCore;
 class CRxCoreStateless;
 class TrexStreamsCompiledObj;
 class CFlowGenListPerThread;
-class RXPacketBuffer;
+class TrexPktBuffer;
 
 /**
  * Generic message reply object
@@ -487,36 +488,35 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
 
 class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
 public:
-    TrexStatelessRxStartCapture(uint8_t port_id,
-                                const std::string &pcap_filename,
+    TrexStatelessRxStartCapture(const CaptureFilter& filter,
                                 uint64_t limit,
-                                MsgReply<bool> &reply) : m_reply(reply) {
+                                MsgReply<capture_id_t> &reply) : m_reply(reply) {
         
-        m_port_id          = port_id;
-        m_limit            = limit;
-        m_pcap_filename    = pcap_filename;
+        m_limit  = limit;
+        m_filter = filter;
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t            m_port_id;
-    std::string        m_pcap_filename;
-    uint64_t           m_limit;
-    MsgReply<bool>    &m_reply;
+    uint8_t                    m_port_id;
+    uint64_t                   m_limit;
+    CaptureFilter              m_filter;
+    MsgReply<capture_id_t>    &m_reply;
 };
 
 
 class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
 public:
-    TrexStatelessRxStopCapture(uint8_t port_id) {
-        m_port_id = port_id;
+    TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply<capture_id_t> &reply) : m_reply(reply) {
+        m_capture_id = capture_id;
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t m_port_id;
+    capture_id_t              m_capture_id;
+    MsgReply<capture_id_t>   &m_reply;
 };
 
 
@@ -556,7 +556,7 @@ private:
 class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
 public:
 
-    TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
+    TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const TrexPktBuffer *> &reply) : m_reply(reply) {
         m_port_id = port_id;
     }
 
@@ -568,7 +568,7 @@ public:
 
 private:
     uint8_t                              m_port_id;
-    MsgReply<const RXPacketBuffer *>    &m_reply;
+    MsgReply<const TrexPktBuffer *>     &m_reply;
     
 };
 
index 83bb2d3..4ed126c 100644 (file)
@@ -53,22 +53,36 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
     m_pkt_buffer->push(m);
 }
 
+void
+TrexStatelessCaptureMngr::update_global_filter() {
+    CaptureFilter new_filter;
+    
+    for (TrexStatelessCapture *capture : m_captures) {
+        new_filter += capture->get_filter();
+    }
+    
+    m_global_filter = new_filter;
+}
+
 capture_id_t
 TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
     
     if (m_captures.size() > MAX_CAPTURE_SIZE) {
-        throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES);
+        return CAPTURE_TOO_MANY_CAPTURES;
     }
     
 
     int new_id = m_id_counter++;
     TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter);
     m_captures.push_back(new_buffer);
+    /* update global filter */
+    update_global_filter();
     
     return new_id;
 }
 
-void
+capture_id_t
 TrexStatelessCaptureMngr::remove(capture_id_t id) {
     
     int index = -1;
@@ -81,12 +95,18 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) {
     
     /* does not exist */
     if (index == -1) {
-        throw TrexException(TrexException::T_CAPTURE_INVALID_ID);
+        return CAPTURE_ID_NOT_FOUND;
     }
     
     TrexStatelessCapture *capture =  m_captures[index];
     m_captures.erase(m_captures.begin() + index);
-    delete capture; 
+    
+    delete capture;
+    
+    /* update global filter */
+    update_global_filter();
+    
+    return id;
 }
 
 void
index f7cd451..4d0b6a7 100644 (file)
@@ -31,16 +31,16 @@ limitations under the License.
 class CaptureFilter {
 public:
     CaptureFilter() {
-        tx_active = 0;
-        rx_active = 0;
+        m_tx_active = 0;
+        m_rx_active = 0;
     }
     
     void add_tx(uint8_t port_id) {
-        tx_active |= (1LL << port_id);
+        m_tx_active |= (1LL << port_id);
     }
 
     void add_rx(uint8_t port_id) {
-        rx_active |= (1LL << port_id);
+        m_rx_active |= (1LL << port_id);
     }
     
     void add(uint8_t port_id) {
@@ -63,21 +63,36 @@ public:
     
     bool in_rx(uint8_t port_id) const {
         uint64_t bit = (1LL << port_id);
-        return ((rx_active & bit) == bit);
+        return ((m_rx_active & bit) == bit);
     }
     
     bool in_tx(uint8_t port_id) const {
         uint64_t bit = (1LL << port_id);
-        return ((tx_active & bit) == bit);
+        return ((m_tx_active & bit) == bit);
+    }
+    
+    bool in_any(uint8_t port_id) const {
+        return ( in_tx(port_id) || in_rx(port_id) );
+    }
+    
+    CaptureFilter& operator +=(const CaptureFilter &other) {
+        m_tx_active |= other.m_tx_active;
+        m_rx_active |= other.m_rx_active;
+        
+        return *this;
     }
     
 private:
     
-    uint64_t  tx_active;
-    uint64_t  rx_active;
+    uint64_t  m_tx_active;
+    uint64_t  m_rx_active;
 };
 
-typedef uint64_t capture_id_t;
+typedef int64_t capture_id_t;
+enum {
+    CAPTURE_ID_NOT_FOUND = -1,
+    CAPTURE_TOO_MANY_CAPTURES = -2,
+};
 
 class TrexStatelessCapture {
 public:
@@ -93,6 +108,10 @@ public:
         return m_id;
     }
     
+    const CaptureFilter & get_filter() const {
+        return m_filter;
+    }
+    
 private:
     TrexPktBuffer   *m_pkt_buffer;
     CaptureFilter    m_filter;
@@ -122,9 +141,11 @@ public:
    
      
     /**
-     * stops capture mode
+     * stops capture mode 
+     * on success, will return the ID of the removed one 
+     * o.w it will be an error 
      */
-    void remove(capture_id_t id);
+    capture_id_t remove(capture_id_t id);
     
     /**
      * removes all captures
@@ -139,8 +160,8 @@ public:
      * 
      * @return bool 
      */
-    bool is_active() const {
-        return (m_captures.size() != 0);
+    bool is_active(uint8_t port) const {
+        return m_global_filter.in_any(port);
     }
     
     /**
@@ -153,7 +174,7 @@ public:
      */
     void handle_pkt_rx(const rte_mbuf_t *m, int port) {
         /* fast path */
-        if (!is_active()) {
+        if (!is_active(port)) {
             return;
         }
         
@@ -169,11 +190,15 @@ private:
     }
     
     void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port);
+    void update_global_filter();
     
     std::vector<TrexStatelessCapture *> m_captures;
     
     capture_id_t m_id_counter;
     
+    /* a union of all the filters curently active */
+    CaptureFilter m_global_filter;
+    
     static const int MAX_CAPTURE_SIZE = 10;
 };
 
index d27485d..f1ba303 100644 (file)
@@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
 void CRxCoreStateless::create(const CRxSlCfg &cfg) {
     m_capture = false;
     m_max_ports = cfg.m_max_ports;
-
+    m_tx_cores  = cfg.m_tx_cores;
+    
     CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
 
     m_ring_from_cp = cp_rx->getRingCpToDp(0);
-    m_ring_to_cp   = cp_rx->getRingDpToCp(0);
     m_state = STATE_IDLE;
 
     for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
@@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
 
 }
 
+void
+CRxCoreStateless::periodic_check_for_dp_messages() {
+
+    for (int i = 0; i < m_tx_cores; i++) {
+        periodic_check_for_dp_messages_core(i);
+    }
+    
+}
+
+void
+CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) {
+
+    CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id);
+    
+    /* fast path */
+    if ( likely ( ring->isEmpty() ) ) {
+        return;
+    }
+
+    while (true) {
+        CGenNode *node = NULL;
+
+        if (ring->Dequeue(node) != 0) {
+            break;
+        }
+        
+        //assert(node);
+    }
+}
+
 void CRxCoreStateless::recalculate_next_state() {
     if (m_state == STATE_QUIT) {
         return;
@@ -175,16 +205,6 @@ void CRxCoreStateless::idle_state_loop() {
     }
 }
 
-/**
- * for each port give a tick (for flushing if needed)
- * 
- */
-void CRxCoreStateless::port_manager_tick() {
-    for (int i = 0; i < m_max_ports; i++) {
-        m_rx_port_mngr[i].tick();
-    }
-}
-
 /**
  * for each port handle the grat ARP mechansim
  * 
@@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() {
     
     /* set the next sync time to */
     dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
-    dsec_t tick_time_sec = now_sec() + 1.0;
     dsec_t grat_arp_sec  = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per;
 
     while (m_state == STATE_WORKING) {
@@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() {
 
         if ( (now - sync_time_sec) > 0 ) {
             periodic_check_for_cp_messages();
+            //periodic_check_for_dp_messages();
             sync_time_sec = now + (1.0 / 1000);
         }
         
-        if ( (now - tick_time_sec) > 0) {
-            port_manager_tick();
-            tick_time_sec = now + 1.0;
-        }
-
         if ( (now - grat_arp_sec) > 0) {
             handle_grat_arp();
             grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per;
@@ -317,16 +332,14 @@ double CRxCoreStateless::get_cpu_util() {
 }
 
 
-void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
-    m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
-    recalculate_next_state();
+capture_id_t
+CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) {
+    return TrexStatelessCaptureMngr::getInstance().add(limit, filter);
 }
 
-void
-CRxCoreStateless::stop_recorder(uint8_t port_id) {
-    m_rx_port_mngr[port_id].stop_recorder();
-    recalculate_next_state();
+capture_id_t
+CRxCoreStateless::stop_capture(capture_id_t capture_id) {
+    return TrexStatelessCaptureMngr::getInstance().remove(capture_id);
 }
 
 void
index 4eed59a..21ed51b 100644 (file)
@@ -27,6 +27,7 @@
 #include "pal/linux/sanb_atomic.h"
 #include "utl_cpuu.h"
 #include "trex_stateless_rx_port_mngr.h"
+#include "trex_stateless_capture.h"
 
 class TrexStatelessCpToRxMsgBase;
 
@@ -127,16 +128,16 @@ class CRxCoreStateless {
     double get_cpu_util();
     void update_cpu_util();
 
-    const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+    const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) {
         return m_rx_port_mngr[port_id].get_pkt_buffer();
     }
 
     /**
-     * start capturing of RX packets on a specific port 
+     * start capturing packets
      *  
      */
-    void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
-    void stop_recorder(uint8_t port_id);
+    capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter);
+    capture_id_t stop_capture(capture_id_t capture_id);
 
     /**
      * start RX queueing of packets
@@ -162,7 +163,12 @@ class CRxCoreStateless {
 
  private:
     void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+
     bool periodic_check_for_cp_messages();
+
+    void periodic_check_for_dp_messages();
+    void periodic_check_for_dp_messages_core(uint32_t core_id);
+    
     void tickle();
     void idle_state_loop();
 
@@ -172,7 +178,6 @@ class CRxCoreStateless {
     void capture_pkt(rte_mbuf_t *m);
     void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
     void handle_work_stage();
-    void port_manager_tick();
     void handle_grat_arp();
 
     int process_all_pending_pkts(bool flush_rx = false);
@@ -186,10 +191,10 @@ class CRxCoreStateless {
  private:
     TrexMonitor      m_monitor;
     uint32_t         m_max_ports;
+    uint32_t         m_tx_cores;
     bool             m_capture;
     state_e          m_state;
     CNodeRing       *m_ring_from_cp;
-    CNodeRing       *m_ring_to_cp;
     CCpuUtlDp        m_cpu_dp_u;
     CCpuUtlCp        m_cpu_cp_u;
 
index aefcc13..367cf4e 100644 (file)
@@ -38,10 +38,12 @@ class CRxSlCfg {
         m_max_ports = 0;
         m_cps = 0.0;
         m_num_crc_fix_bytes = 0;
+        m_tx_cores = 0;
     }
 
  public:
     uint32_t             m_max_ports;
+    uint32_t             m_tx_cores;
     double               m_cps;
     CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
     uint8_t              m_num_crc_fix_bytes;
index e16b3d0..d2e0b4e 100644 (file)
 */
 #include "bp_sim.h"
 #include "trex_stateless_rx_port_mngr.h"
-#include "common/captureFile.h"
 #include "trex_stateless_rx_core.h"
 #include "common/Network/Packet/Arp.h"
 #include "pkt_gen.h"
-
-/**
- * copy MBUF to a flat buffer
- * 
- * @author imarom (12/20/2016)
- * 
- * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) 
- *               bytes
- * @param m - MBUF to copy 
- * 
- * @return uint8_t* 
- */
-void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) {
-    
-    int index = 0;
-    for (const rte_mbuf_t *it = m; it != NULL; it = it->next) {
-        const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *);
-        memcpy(dest + index, src, it->data_len);
-        index += it->data_len;
-    }
-}
-
-/**************************************
- * RX packet
- * 
- *************************************/
-RXPacket::RXPacket(const rte_mbuf_t *m) {
-
-    /* allocate buffer */
-    m_size = m->pkt_len;
-    m_raw = new uint8_t[m_size];
-
-    /* copy data */
-    copy_mbuf(m_raw, m);
-
-    /* generate a packet timestamp */
-    m_timestamp = now_sec();
-}
+#include "trex_stateless_capture.h"
 
 /**************************************
  * latency RX feature
@@ -231,79 +193,12 @@ RXLatency::to_json() const {
  * 
  *************************************/
 
-RXPacketBuffer::RXPacketBuffer(uint64_t size) {
-    m_buffer           = nullptr;
-    m_head             = 0;
-    m_tail             = 0;
-    m_size             = (size + 1); // for the empty/full difference 1 slot reserved
-
-    /* generate queue */
-    m_buffer = new RXPacket*[m_size](); // zeroed
-}
-
-RXPacketBuffer::~RXPacketBuffer() {
-    assert(m_buffer);
-
-    while (!is_empty()) {
-        RXPacket *pkt = pop();
-        delete pkt;
-    }
-    delete [] m_buffer;
-}
-
-void 
-RXPacketBuffer::push(const rte_mbuf_t *m) {
-    /* if full - pop the oldest */
-    if (is_full()) {
-        delete pop();
-    }
-
-    /* push packet */
-    m_buffer[m_head] = new RXPacket(m);
-    m_head = next(m_head);
-}
-
-RXPacket *
-RXPacketBuffer::pop() {
-    assert(!is_empty());
-    
-    RXPacket *pkt = m_buffer[m_tail];
-    m_tail = next(m_tail);
-    
-    return pkt;
-}
-
-uint64_t
-RXPacketBuffer::get_element_count() const {
-    if (m_head >= m_tail) {
-        return (m_head - m_tail);
-    } else {
-        return ( get_capacity() - (m_tail - m_head - 1) );
-    }
-}
-
-Json::Value
-RXPacketBuffer::to_json() const {
-
-    Json::Value output = Json::arrayValue;
-
-    int tmp = m_tail;
-    while (tmp != m_head) {
-        RXPacket *pkt = m_buffer[tmp];
-        output.append(pkt->to_json());
-        tmp = next(tmp);
-    }
-
-    return output;
-}
-
-
 void
 RXQueue::start(uint64_t size) {
     if (m_pkt_buffer) {
         delete m_pkt_buffer;
     }
-    m_pkt_buffer = new RXPacketBuffer(size);
+    m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD);
 }
 
 void
@@ -314,7 +209,7 @@ RXQueue::stop() {
     }
 }
 
-const RXPacketBuffer *
+const TrexPktBuffer *
 RXQueue::fetch() {
 
     /* if no buffer or the buffer is empty - give a NULL one */
@@ -323,10 +218,10 @@ RXQueue::fetch() {
     }
     
     /* hold a pointer to the old one */
-    RXPacketBuffer *old_buffer = m_pkt_buffer;
+    TrexPktBuffer *old_buffer = m_pkt_buffer;
 
     /* replace the old one with a new one and freeze the old */
-    m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
+    m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode());
 
     return old_buffer;
 }
@@ -348,97 +243,6 @@ RXQueue::to_json() const {
     return output;
 }
 
-/**************************************
- * RX feature recorder
- * 
- *************************************/
-
-RXPacketRecorder::RXPacketRecorder() {
-    m_writer = NULL;
-    m_count  = 0;
-    m_limit  = 0;
-    m_epoch  = -1;
-    
-    m_pending_flush = false;
-}
-
-void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
-    m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
-    if (m_writer == NULL) {
-        std::stringstream ss;
-        ss << "unable to create PCAP file: " << pcap;
-        throw TrexException(ss.str());
-    }
-
-    assert(limit > 0);
-    
-    m_limit = limit;
-    m_count = 0;
-    m_pending_flush = false;
-    m_pcap_filename = pcap;
-}
-
-void
-RXPacketRecorder::stop() {
-    if (!m_writer) {
-        return;
-    }
-    
-    delete m_writer;
-    m_writer = NULL;
-}
-
-void
-RXPacketRecorder::flush_to_disk() {
-    
-    if (m_writer && m_pending_flush) {
-        m_writer->flush_to_disk();
-        m_pending_flush = false;
-    }
-}
-
-void
-RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
-    if (!m_writer) {
-        return;
-    }
-
-    dsec_t now = now_sec();
-    if (m_epoch < 0) {
-        m_epoch = now;
-    }
-
-    dsec_t dt = now - m_epoch;
-
-    CPktNsecTimeStamp t_c(dt);
-    m_pkt.time_nsec = t_c.m_time_nsec;
-    m_pkt.time_sec  = t_c.m_time_sec;
-
-    copy_mbuf((uint8_t *)m_pkt.raw, m);
-    m_pkt.pkt_len = m->pkt_len;
-    
-    m_writer->write_packet(&m_pkt);
-    m_count++;
-    m_pending_flush = true;
-    
-    if (m_count == m_limit) {
-        stop();
-    }
-    
-}
-
-Json::Value
-RXPacketRecorder::to_json() const {
-    Json::Value output = Json::objectValue;
-    
-    output["pcap_filename"] = m_pcap_filename;
-    output["limit"]         = Json::UInt64(m_limit);
-    output["count"]         = Json::UInt64(m_count);
-    
-    return output;
-}
-
 
 /**************************************
  * RX feature server (ARP, ICMP) and etc.
@@ -786,10 +590,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
         m_latency.handle_pkt(m);
     }
 
-    if (is_feature_set(RECORDER)) {
-        m_recorder.handle_pkt(m);
-    }
-
     if (is_feature_set(QUEUE)) {
         m_queue.handle_pkt(m);
     }
@@ -797,6 +597,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
     if (is_feature_set(SERVER)) {
         m_server.handle_pkt(m);
     }
+    
+    /* capture */
+    TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id);
 }
 
 int RXPortManager::process_all_pending_pkts(bool flush_rx) {
@@ -834,13 +637,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) {
     return cnt_p;
 }
 
-void
-RXPortManager::tick() {
-    if (is_feature_set(RECORDER)) {
-        m_recorder.flush_to_disk();
-    }
-}
-
 void
 RXPortManager::send_next_grat_arp() {
     if (is_feature_set(GRAT_ARP)) {
@@ -887,13 +683,6 @@ RXPortManager::to_json() const {
         output["latency"]["is_active"] = false;
     }
 
-    if (is_feature_set(RECORDER)) {
-        output["sniffer"] = m_recorder.to_json();
-        output["sniffer"]["is_active"] = true;
-    } else {
-        output["sniffer"]["is_active"] = false;
-    }
-
     if (is_feature_set(QUEUE)) {
         output["queue"] = m_queue.to_json();
         output["queue"]["is_active"] = true;
index 6efdae6..0cc6071 100644 (file)
@@ -25,8 +25,7 @@
 #include <stdint.h>
 #include "common/base64.h"
 
-#include "common/captureFile.h"
-
+#include "trex_stateless_pkt.h"
 
 class CPortLatencyHWBase;
 class CRFC2544Info;
@@ -80,97 +79,12 @@ public:
     CRxCoreErrCntrs      *m_err_cntrs;
 };
 
-/**                
- * describes a single saved RX packet
- * 
- */
-class RXPacket {
-public:
-
-    RXPacket(const rte_mbuf_t *m);
-
-    /* slow path and also RVO - pass by value is ok */
-    Json::Value to_json() {
-        Json::Value output;
-        output["ts"]      = m_timestamp;
-        output["binary"]  = base64_encode(m_raw, m_size);
-        return output;
-    }
-
-    ~RXPacket() {
-        if (m_raw) {
-            delete [] m_raw;
-        }
-    }
-
-private:
-
-    uint8_t   *m_raw;
-    uint16_t   m_size;
-    dsec_t     m_timestamp;
-};
-
 
 /**************************************
  * RX feature queue 
  * 
  *************************************/
 
-class RXPacketBuffer {
-public:
-
-    RXPacketBuffer(uint64_t size);
-    ~RXPacketBuffer();
-
-    /**
-     * push a packet to the buffer
-     * 
-     */
-    void push(const rte_mbuf_t *m);
-    
-    /**
-     * generate a JSON output of the queue
-     * 
-     */
-    Json::Value to_json() const;
-
-
-    bool is_empty() const {
-        return (m_head == m_tail);
-    }
-
-    bool is_full() const {
-        return ( next(m_head) == m_tail);
-    }
-
-    /**
-     * return the total amount of space possible
-     */
-    uint64_t get_capacity() const {
-        /* one slot is used for diff between full/empty */
-        return (m_size - 1);
-    }
-    
-    /**
-     * returns how many elements are in the queue
-     */
-    uint64_t get_element_count() const;
-    
-private:
-    int next(int v) const {
-        return ( (v + 1) % m_size );
-    }
-
-    /* pop in case of full queue - internal usage */
-    RXPacket * pop();
-
-    int             m_head;
-    int             m_tail;
-    int             m_size;
-    RXPacket      **m_buffer;
-};
-
-
 class RXQueue {
 public:
     RXQueue() {
@@ -191,7 +105,7 @@ public:
      * fetch the current buffer
      * return NULL if no packets
      */
-    const RXPacketBuffer * fetch();
+    const TrexPktBuffer * fetch();
     
     /**
      * stop RX queue
@@ -204,42 +118,7 @@ public:
     Json::Value to_json() const;
     
 private:
-    RXPacketBuffer  *m_pkt_buffer;
-};
-
-/**************************************
- * RX feature PCAP recorder 
- * 
- *************************************/
-
-class RXPacketRecorder {
-public:
-    RXPacketRecorder();
-    
-    ~RXPacketRecorder() {
-        stop();
-    }
-    
-    void start(const std::string &pcap, uint64_t limit);
-    void stop();
-    void handle_pkt(const rte_mbuf_t *m);
-
-    /**
-     * flush any cached packets to disk
-     * 
-     */
-    void flush_to_disk();
-    
-    Json::Value to_json() const;
-    
-private:
-    CFileWriterBase  *m_writer;
-    std::string       m_pcap_filename;
-    CCapPktRaw        m_pkt;
-    dsec_t            m_epoch;
-    uint64_t          m_limit;
-    uint64_t          m_count;
-    bool              m_pending_flush;
+    TrexPktBuffer  *m_pkt_buffer;
 };
 
 
@@ -311,7 +190,6 @@ public:
     enum feature_t {
         NO_FEATURES  = 0x0,
         LATENCY      = 0x1,
-        RECORDER     = 0x2,
         QUEUE        = 0x4,
         SERVER       = 0x8,
         GRAT_ARP     = 0x10,
@@ -354,17 +232,6 @@ public:
         unset_feature(LATENCY);
     }
 
-    /* recorder */
-    void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
-        m_recorder.start(pcap, limit_pkts);
-        set_feature(RECORDER);
-    }
-
-    void stop_recorder() {
-        m_recorder.stop();
-        unset_feature(RECORDER);
-    }
-
     /* queue */
     void start_queue(uint32_t size) {
         m_queue.start(size);
@@ -376,7 +243,7 @@ public:
         unset_feature(QUEUE); 
     }
 
-    const RXPacketBuffer *get_pkt_buffer() {
+    const TrexPktBuffer *get_pkt_buffer() {
         if (!is_feature_set(QUEUE)) {
             return nullptr;
         }
@@ -414,13 +281,6 @@ public:
      */
     void handle_pkt(const rte_mbuf_t *m);
 
-    /**
-     * maintenance
-     * 
-     * @author imarom (11/24/2016)
-     */
-    void tick();
-    
     /**
      * send next grat arp (if on)
      * 
@@ -482,7 +342,6 @@ private:
     uint32_t                     m_features;
     uint8_t                      m_port_id;
     RXLatency                    m_latency;
-    RXPacketRecorder             m_recorder;
     RXQueue                      m_queue;
     RXServer                     m_server;
     RXGratARP                    m_grat_arp;