capture - draft commit 43/5243/1
authorimarom <[email protected]>
Wed, 11 Jan 2017 16:19:47 +0000 (18:19 +0200)
committerimarom <[email protected]>
Wed, 11 Jan 2017 16:19:47 +0000 (18:19 +0200)
Signed-off-by: imarom <[email protected]>
16 files changed:
scripts/automation/trex_control_plane/stl/console/trex_console.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/rpc-server/commands/trex_rpc_cmds.h
src/rpc-server/trex_rpc_cmds_table.cpp
src/stateless/cp/trex_stateless.cpp
src/stateless/cp/trex_stateless.h
src/stateless/cp/trex_stateless_port.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
src/stateless/rx/trex_stateless_capture.cpp
src/stateless/rx/trex_stateless_capture.h
src/stateless/rx/trex_stateless_rx_core.cpp
src/stateless/rx/trex_stateless_rx_core.h

index 38a1fca..b0ab70e 100755 (executable)
@@ -348,8 +348,8 @@ class TRexConsole(TRexGeneralCmd):
         
     @verify_connected
     def do_capture (self, line):
-        '''Start PCAP capturing on port'''
-        self.stateless_client.start_capture_line(line)
+        '''Manage PCAP captures'''
+        self.stateless_client.capture_line(line)
 
     def help_capture (self):
         self.do_capture("-h")
index 1b57218..d75c554 100755 (executable)
@@ -26,6 +26,7 @@ import random
 import json
 import traceback
 import os.path
+import argparse
 
 ############################     logger     #############################
 ############################                #############################
@@ -2961,7 +2962,7 @@ class STLClient(object):
             Resolves ports (ARP resolution)
 
             :parameters:
-                ports          - for which ports to apply a unique sniffer (each port gets a unique file)
+                ports          - which ports to resolve
                 retires        - how many times to retry on each port (intervals of 100 milliseconds)
                 verbose        - log for each request the response
             :raises:
@@ -3022,7 +3023,7 @@ class STLClient(object):
             
         self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit))
         
-        rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
+        rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
         self.logger.post_cmd(rc)
 
 
@@ -3032,24 +3033,82 @@ class STLClient(object):
 
 
     @__api_check(True)
-    def stop_capture (self, ports = None):
+    def stop_capture (self, capture_id, output_filename):
         """
-            Removes RX sniffer from port(s)
+            Stops an active capture
+
+            :parameters:
+                capture_id        - an active capture ID to stop
+                output_filename   - output filename to save capture
 
             :raises:
                 + :exe:'STLError'
 
         """
-        ports = ports if ports is not None else self.get_acquired_ports()
-        ports = self._validate_port_list(ports)
 
-        self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports))
-        rc = self.__remove_rx_sniffer(ports)
+        
+        
+        # stopping a capture requires:
+        # 1. stopping
+        # 2. fetching
+        # 3. saving to file
+        
+        # stop
+        
+        self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id))
+        rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id})
         self.logger.post_cmd(rc)
+        if not rc:
+            raise STLError(rc)
+        
+        # pkt count
+        pkt_count = rc.data()['pkt_count']
+            
+        if not output_filename or pkt_count == 0:
+            return
+        
+        self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename))
+
+        # create a PCAP file
+        writer = RawPcapWriter(output_filename, linktype = 1)
+        writer._write_header(None)
+        
+        # fetch
+        while True:
+            rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50})
+            if not rc:
+                self.logger.post_cmd(rc)
+                raise STLError(rc)
+        
+            pkts = rc.data()['pkts']
+            for pkt in pkts:
+                ts = pkt['ts']
+                pkt_bin = base64.b64decode(pkt['binary'])
+                writer._write_packet(pkt_bin, sec = 0, usec = 0)
+                
+            if rc.data()['pending'] == 0:
+                break
+        
+        self.logger.post_cmd(rc)
+        
+
+    # get capture status
+    @__api_check(True)
+    def get_capture_status (self):
+        """
+            returns a list of all active captures
+            each element in the list is an object containing
+            info about the capture
+
+        """
+
+        rc = self._transmit("capture", params = {'command': 'status'})
 
         if not rc:
             raise STLError(rc)
 
+        return rc.data()
+
     
     @__api_check(True)
     def set_rx_queue (self, ports = None, size = 1000):
@@ -3766,23 +3825,71 @@ class STLClient(object):
 
              
     @__console
-    def start_capture_line (self, line):
-        '''Starts PCAP recorder on port(s)'''
+    def capture_line (self, line):
+        '''Manage PCAP recorders'''
 
-        parser = parsing_opts.gen_parser(self,
-                                         "capture",
-                                         self.start_capture_line.__doc__,
-                                         parsing_opts.TX_PORT_LIST,
-                                         parsing_opts.RX_PORT_LIST,
-                                         parsing_opts.LIMIT)
+        # default
+        if not line:
+            line = "show"
+
+        parser = parsing_opts.gen_parser(self, "capture", self.capture_line.__doc__)
+        subparsers = parser.add_subparsers(title = "commands", dest="commands")
+
+        # start
+        start_parser = subparsers.add_parser('start', help = "starts a new capture")
+        start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
+                                  parsing_opts.RX_PORT_LIST,
+                                  parsing_opts.LIMIT)
+
+        # stop
+        stop_parser = subparsers.add_parser('stop', help = "stops an active capture")
+        stop_parser.add_arg_list(parsing_opts.CAPTURE_ID,
+                                 parsing_opts.OUTPUT_FILENAME)
+
+        # show
+        show_parser = subparsers.add_parser('show', help = "show all active captures")
+
+        opts = parser.parse_args(line.split())
 
-        opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
         if not opts:
             return opts
 
-        self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+        # start
+        if opts.commands == 'start':
+            if not opts.tx_port_list and not opts.rx_port_list:
+                start_parser.formatted_error('please provide either --tx or --rx')
+                return
+
+            self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+
+        # stop
+        elif opts.commands == 'stop':
+            self.stop_capture(opts.capture_id, opts.output_filename)
+
+        # show
+        else:
+            data = self.get_capture_status()
+
+            stats_table = text_tables.TRexTextTable()
+            stats_table.set_cols_align(["c"] * 6)
+            stats_table.set_cols_width([15] * 6)
+
+            for elem in data:
+                row = [elem['id'],
+                       elem['state'],
+                       '[{0}/{1}]'.format(elem['count'], elem['limit']),
+                       format_num(elem['bytes'], suffix = 'B'),
+                       bitfield_to_str(elem['filter']['tx']),
+                       bitfield_to_str(elem['filter']['rx'])]
+                
+                stats_table.add_rows([row], header=False)
+
+            stats_table.header(['ID', 'Status', 'Count', 'Bytes', 'TX Ports', 'RX Ports'])
+            text_tables.print_table_with_header(stats_table, "Captures")
+            
 
         return RC_OK()
+
         
 
     @__console
index cbbacb2..c386451 100644 (file)
@@ -107,4 +107,18 @@ def list_remove_dup (l):
             
     return tmp
             
-        
+def bitfield_to_list (bf):
+    rc = []
+    bitpos = 0
+
+    while bf > 0:
+        if bf & 0x1:
+            rc.append(bitpos)
+        bitpos += 1
+        bf = bf >> 1
+
+    return rc
+
+def bitfield_to_str (bf):
+    lst = bitfield_to_list(bf)
+    return "-" if not lst else ', '.join([str(x) for x in lst])
index 265c43f..cb594ef 100755 (executable)
@@ -69,6 +69,8 @@ RX_PORT_LIST
 SRC_IPV4
 DST_IPV4
 
+CAPTURE_ID
+
 GLOBAL_STATS
 PORT_STATS
 PORT_STATUS
@@ -81,12 +83,14 @@ EXTENDED_INC_ZERO_STATS
 
 STREAMS_MASK
 CORE_MASK_GROUP
+CAPTURE_PORTS_GROUP
 
 # ALL_STREAMS
 # STREAM_LIST_WITH_ALL
 
 # list of ArgumentGroup types
 MUTEX
+NON_MUTEX
 
 '''
 
@@ -392,7 +396,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
                                             {'help': 'Output PCAP filename',
                                              'dest': 'output_filename',
                                              'default': None,
-                                             'required': True,
                                              'type': str}),
 
 
@@ -612,6 +615,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
                                           'help': 'A list of ports to capture on the RX side',
                                           'default': []}),
                 
+              CAPTURE_ID: ArgumentPack(['-i', '--id'],
+                                  {'help': "capture ID to remove",
+                                   'dest': "capture_id",
+                                   'type': int,
+                                   'required': True}),
+
               # advanced options
               PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
                                                         ALL_PORTS],
@@ -636,6 +645,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
                                                       CORE_MASK],
                                               {'required': False}),
 
+              CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}),
               }
 
 class _MergeAction(argparse._AppendAction):
@@ -654,12 +664,30 @@ class _MergeAction(argparse._AppendAction):
 
 class CCmdArgParser(argparse.ArgumentParser):
 
-    def __init__(self, stateless_client, *args, **kwargs):
+    def __init__(self, stateless_client = None, x = None, *args, **kwargs):
         super(CCmdArgParser, self).__init__(*args, **kwargs)
         self.stateless_client = stateless_client
         self.cmd_name = kwargs.get('prog')
         self.register('action', 'merge', _MergeAction)
 
+
+    def add_arg_list (self, *args):
+        populate_parser(self, *args)
+
+    def add_subparsers(self, *args, **kwargs):
+        sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs)
+
+        add_parser = sub.add_parser
+        stateless_client = self.stateless_client
+
+        def add_parser_hook (self, *args, **kwargs):
+            parser = add_parser(self, *args, **kwargs)
+            parser.stateless_client = stateless_client
+            return parser
+
+        sub.add_parser = add_parser_hook
+        return sub
+
     # hook this to the logger
     def _print_message(self, message, file=None):
         self.stateless_client.logger.log(message)
@@ -730,13 +758,15 @@ class CCmdArgParser(argparse.ArgumentParser):
             # recover from system exit scenarios, such as "help", or bad arguments.
             return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action"))
 
+    def formatted_error (self, msg):
+        self.print_usage()
+        self.stateless_client.logger.log(msg)
+
 
 def get_flags (opt):
     return OPTIONS_DB[opt].name_or_flags
 
-def gen_parser(stateless_client, op_name, description, *args):
-    parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
-                           description=description)
+def populate_parser (parser, *args):
     for param in args:
         try:
 
@@ -752,6 +782,12 @@ def gen_parser(stateless_client, op_name, description, *args):
                     for sub_argument in argument.args:
                         group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
                                            **OPTIONS_DB[sub_argument].options)
+
+                elif argument.type == NON_MUTEX:
+                    group = parser.add_argument_group(**argument.options)
+                    for sub_argument in argument.args:
+                        group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
+                                           **OPTIONS_DB[sub_argument].options)
                 else:
                     # ignore invalid objects
                     continue
@@ -764,6 +800,12 @@ def gen_parser(stateless_client, op_name, description, *args):
         except KeyError as e:
             cause = e.args[0]
             raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param))
+
+def gen_parser(stateless_client, op_name, description, *args):
+    parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
+                           description=description)
+
+    populate_parser(parser, *args)
     return parser
 
 
index ec5c315..80f69fa 100644 (file)
@@ -29,6 +29,7 @@ limitations under the License.
 
 #include "trex_stateless_rx_core.h"
 #include "trex_stateless_capture.h"
+#include "trex_stateless_messaging.h"
 
 #include <fstream>
 #include <iostream>
@@ -844,13 +845,38 @@ TrexRpcCmdSetL3::_run(const Json::Value &params, Json::Value &result) {
     return (TREX_RPC_CMD_OK);    
     
 }
-   
+
+
 /**
- * starts PCAP capturing
+ * capture command tree
  * 
  */
 trex_rpc_cmd_rc_e
-TrexRpcCmdStartCapture::_run(const Json::Value &params, Json::Value &result) {
+TrexRpcCmdCapture::_run(const Json::Value &params, Json::Value &result) {
+    const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result);
+    
+    if (cmd == "start") {
+        parse_cmd_start(params, result);
+    } else if (cmd == "stop") {
+        parse_cmd_stop(params, result);
+    } else if (cmd == "fetch") {
+        parse_cmd_fetch(params, result);
+    } else if (cmd == "status") {
+        parse_cmd_status(params, result);
+    } else {
+        /* can't happen */
+        assert(0);
+    }
+    
+    return TREX_RPC_CMD_OK;
+}
+
+/**
+ * starts PCAP capturing
+ * 
+ */
+void
+TrexRpcCmdCapture::parse_cmd_start(const Json::Value &params, Json::Value &result) {
     
     uint32_t limit             = parse_uint32(params, "limit", result);
     const Json::Value &tx_json = parse_array(params, "tx", result);
@@ -881,8 +907,90 @@ TrexRpcCmdStartCapture::_run(const Json::Value &params, Json::Value &result) {
         }
     }
     
-    get_stateless_obj()->start_capture(filter, limit);
+    static MsgReply<TrexCaptureRCStart> reply;
+    reply.reset();
+    
+    TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply);
+    get_stateless_obj()->send_msg_to_rx(start_msg);
+    
+    TrexCaptureRCStart rc = reply.wait_for_reply();
+    if (!rc) {
+        generate_execute_err(result, rc.get_err());
+    }
     
     result["result"] = Json::objectValue;
-    return (TREX_RPC_CMD_OK);
 }
+
+/**
+ * stops PCAP capturing
+ * 
+ */
+void
+TrexRpcCmdCapture::parse_cmd_stop(const Json::Value &params, Json::Value &result) {
+    
+    uint32_t capture_id = parse_uint32(params, "capture_id", result);
+    
+    static MsgReply<TrexCaptureRCStop> reply;
+    reply.reset();
+    
+    TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply);
+    get_stateless_obj()->send_msg_to_rx(stop_msg);
+    
+    TrexCaptureRCStop rc = reply.wait_for_reply();
+    if (!rc) {
+        generate_execute_err(result, rc.get_err());
+    }
+    
+    result["result"]["pkt_count"] = rc.get_pkt_count();
+}
+
+/**
+ * gets the status of all captures in the system
+ * 
+ */
+void
+TrexRpcCmdCapture::parse_cmd_status(const Json::Value &params, Json::Value &result) {
+    
+    /* generate a status command */
+    
+    static MsgReply<TrexCaptureRCStatus> reply;
+    reply.reset();
+    
+    TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply);
+    get_stateless_obj()->send_msg_to_rx(status_msg);
+    
+    TrexCaptureRCStatus rc = reply.wait_for_reply();
+    if (!rc) {
+        generate_execute_err(result, rc.get_err());
+    }
+    
+    result["result"] = rc.get_status();
+}
+
+/**
+ * fetch packets from a capture
+ * 
+ */
+void
+TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value &params, Json::Value &result) {
+    
+    uint32_t capture_id = parse_uint32(params, "capture_id", result);
+    uint32_t pkt_limit  = parse_uint32(params, "pkt_limit", result);
+    
+    /* generate a fetch command */
+    
+    static MsgReply<TrexCaptureRCFetch> reply;
+    reply.reset();
+    
+    TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply);
+    get_stateless_obj()->send_msg_to_rx(fetch_msg);
+    
+    TrexCaptureRCFetch rc = reply.wait_for_reply();
+    if (!rc) {
+        generate_execute_err(result, rc.get_err());
+    }
+    
+    result["result"]["pkts"]    = rc.get_pkt_buffer()->to_json();
+    result["result"]["pending"] = rc.get_pending();
+}
+
index 1ea63cc..bf78ff8 100644 (file)
@@ -160,7 +160,13 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIC
 
 TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE);
 
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture,  "capture", 1, false, APIClass::API_CLASS_TYPE_CORE, 
+    void parse_cmd_start(const Json::Value &msg, Json::Value &result);
+    void parse_cmd_stop(const Json::Value &msg, Json::Value &result);
+    void parse_cmd_status(const Json::Value &msg, Json::Value &result);
+    void parse_cmd_fetch(const Json::Value &msg, Json::Value &result);
+);
+
 
 #endif /* __TREX_RPC_CMD_H__ */
 
index 3d4d5a2..2af9f4f 100644 (file)
@@ -79,7 +79,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
     register_command(new TrexRpcCmdSetL2());
     register_command(new TrexRpcCmdSetL3());
     
-    register_command(new TrexRpcCmdStartCapture());
+    register_command(new TrexRpcCmdCapture());
 }
 
 
index 32babbf..6ab9b41 100644 (file)
@@ -19,7 +19,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-//#include <sched.h>
 #include <iostream>
 #include <unistd.h>
 
@@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() {
     return m_platform_api->get_dp_core_count();
 }
 
-capture_id_t
-TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) {
-    static MsgReply<capture_id_t> reply;
-    
-    reply.reset();
-    
-    CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
-    TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply);
-     
-    ring->Enqueue((CGenNode *)msg);
-    
-    capture_id_t new_id = reply.wait_for_reply();
-    
-    return (new_id);
-}
+void
+TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const {
 
-capture_id_t
-TrexStateless::stop_capture(capture_id_t capture_id) {
-    static MsgReply<capture_id_t> reply;
-    
-    reply.reset();
-    
     CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
-    TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply);
-     
     ring->Enqueue((CGenNode *)msg);
-    
-    capture_id_t rc = reply.wait_for_reply();
-    
-    return (rc);
 }
 
+
index 33f16ce..87d227f 100644 (file)
@@ -102,7 +102,6 @@ public:
  * defines the TRex stateless operation mode
  * 
  */
-class CaptureFilter;
 class TrexStateless {
 public:
 
@@ -133,16 +132,9 @@ public:
 
 
     /**
-     * starts a capture on a 'filter' of ports 
-     * with a limit of packets 
+     * send a message to the RX core
      */
-    capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit);
-    
-    /**
-     * stops an active capture
-     * 
-     */
-    capture_id_t stop_capture(capture_id_t capture_id);
+    void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const;
     
     /**
      * shutdown the server
index 2cc1b9c..0ef8ae6 100644 (file)
@@ -140,7 +140,7 @@ public:
         }
 
         if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) {
-            throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists");
+            throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists");
         }
         
         m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW);
@@ -439,19 +439,6 @@ public:
 
     void get_pci_info(std::string &pci_addr, int &numa_node);
 
-
-    /**
-     * starts capturing packets
-     * 
-     */
-    void start_capture(capture_mode_e mode, uint64_t limit);
-
-    /**
-     * stops capturing packets
-     * 
-     */
-    void stop_capture();
-
     /**
      * start RX queueing of packets
      * 
index f441c69..b9bb1d1 100644 (file)
@@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
 
 
 bool
-TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
-    capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter);
+TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) {
+    
+    TrexCaptureRCStart start_rc;
+    
+    TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc);
+    
+    /* mark as done */
+    m_reply.set_reply(start_rc);
+    
+    return true;
+}
+
+bool
+TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) {
+    
+    TrexCaptureRCStop stop_rc;
+    
+    TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc);
+    
+    /* mark as done */
+    m_reply.set_reply(stop_rc);
+    
+    return true;
+}
 
+bool
+TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) {
+    
+    TrexCaptureRCFetch fetch_rc;
+    
+    TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc);
+    
     /* mark as done */
-    m_reply.set_reply(capture_id);
+    m_reply.set_reply(fetch_rc);
     
     return true;
 }
 
 bool
-TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
-    capture_id_t rc = rx_core->stop_capture(m_capture_id);
+TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) {
+    
+    TrexCaptureRCStatus status_rc;
     
-    m_reply.set_reply(rc);
+    status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); 
+    
+    /* mark as done */
+    m_reply.set_reply(status_rc);
     
     return true;
 }
 
+
 bool
 TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
     rx_core->start_queue(m_port_id, m_size);
index 5f4978f..4027d07 100644 (file)
@@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
 };
 
 
+class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase {
+public:
+    virtual bool handle (CRxCoreStateless *rx_core) = 0;
+};
 
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture {
 public:
-    TrexStatelessRxStartCapture(const CaptureFilter& filter,
+    TrexStatelessRxCaptureStart(const CaptureFilter& filter,
                                 uint64_t limit,
-                                MsgReply<capture_id_t> &reply) : m_reply(reply) {
+                                MsgReply<TrexCaptureRCStart> &reply) : m_reply(reply) {
         
         m_limit  = limit;
         m_filter = filter;
@@ -499,24 +503,52 @@ public:
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    uint8_t                    m_port_id;
-    uint64_t                   m_limit;
-    CaptureFilter              m_filter;
-    MsgReply<capture_id_t>    &m_reply;
+    uint8_t                          m_port_id;
+    uint64_t                         m_limit;
+    CaptureFilter                    m_filter;
+    MsgReply<TrexCaptureRCStart>    &m_reply;
+};
+
+
+class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture {
+public:
+    TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply<TrexCaptureRCStop> &reply) : m_reply(reply) {
+        m_capture_id = capture_id;
+    }
+
+    virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+    capture_id_t                   m_capture_id;
+    MsgReply<TrexCaptureRCStop>   &m_reply;
 };
 
 
-class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture {
 public:
-    TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply<capture_id_t> &reply) : m_reply(reply) {
+    TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply<TrexCaptureRCFetch> &reply) : m_reply(reply) {
         m_capture_id = capture_id;
+        m_pkt_limit  = pkt_limit;
+    }
+
+    virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+    capture_id_t                   m_capture_id;
+    uint32_t                       m_pkt_limit;
+    MsgReply<TrexCaptureRCFetch>  &m_reply;
+};
+
+
+class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture {
+public:
+    TrexStatelessRxCaptureStatus(MsgReply<TrexCaptureRCStatus> &reply) : m_reply(reply) {
     }
 
     virtual bool handle(CRxCoreStateless *rx_core);
 
 private:
-    capture_id_t              m_capture_id;
-    MsgReply<capture_id_t>   &m_reply;
+    MsgReply<TrexCaptureRCStatus>   &m_reply;
 };
 
 
index 4ed126c..85be7ae 100644 (file)
@@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons
     m_id         = id;
     m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL);
     m_filter     = filter;
+    m_state      = STATE_ACTIVE;
 }
 
 TrexStatelessCapture::~TrexStatelessCapture() {
@@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() {
 
 void
 TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
+
+    if (m_state != STATE_ACTIVE) {
+        delete pkt;
+        return;
+    }
     
     /* if not in filter - back off */
     if (!m_filter.in_filter(pkt)) {
+        delete pkt;
         return;
     }
     
@@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
 
 void
 TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
+
+    if (m_state != STATE_ACTIVE) {
+        return;
+    }
+    
     if (!m_filter.in_rx(port)) {
         return;
     }
@@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
     m_pkt_buffer->push(m);
 }
 
+
+Json::Value
+TrexStatelessCapture::to_json() const {
+    Json::Value output = Json::objectValue;
+
+    output["id"]     = Json::UInt64(m_id);
+    output["filter"] = m_filter.to_json();
+    output["count"]  = m_pkt_buffer->get_element_count();
+    output["bytes"]  = m_pkt_buffer->get_bytes();
+    output["limit"]  = m_pkt_buffer->get_capacity();
+    
+    switch (m_state) {
+    case STATE_ACTIVE:
+        output["state"]  = "ACTIVE";
+        break;
+        
+    case STATE_STOPPED:
+        output["state"]  = "STOPPED";
+        break;
+        
+    default:
+        assert(0);
+        
+    }
+    
+    return output;
+}
+
+TrexPktBuffer *
+TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) {
+    
+    /* if the total sum of packets is within the limit range - take it */
+    if (m_pkt_buffer->get_element_count() <= pkt_limit) {
+        TrexPktBuffer *current = m_pkt_buffer;
+        m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode());
+        pending = 0;
+        return current;
+    }
+    
+    /* harder part - partial fetch */
+    TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit);
+    for (int i = 0; i < pkt_limit; i++) {
+        const TrexPkt *pkt = m_pkt_buffer->pop();
+        partial->push(pkt);
+    }
+    
+    pending = m_pkt_buffer->get_element_count();
+    return partial;
+}
+
 void
 TrexStatelessCaptureMngr::update_global_filter() {
     CaptureFilter new_filter;
@@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() {
     m_global_filter = new_filter;
 }
 
-capture_id_t
-TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
+TrexStatelessCapture *
+TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) {
     
+    for (int i = 0; i < m_captures.size(); i++) {
+        if (m_captures[i]->get_id() == capture_id) {
+            return m_captures[i];
+        }
+    }
+    
+    /* does not exist */
+    return nullptr;
+}
+
+void
+TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) {
+
     if (m_captures.size() > MAX_CAPTURE_SIZE) {
-        return CAPTURE_TOO_MANY_CAPTURES;
+        rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED);
+        return;
     }
     
 
@@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
     /* update global filter */
     update_global_filter();
     
-    return new_id;
+    /* result */
+    rc.set_new_id(new_id);
+}
+
+void
+TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) {
+    TrexStatelessCapture *capture = lookup(capture_id);
+    if (!capture) {
+        rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+        return;
+    }
+    
+    capture->stop();
+    rc.set_count(capture->get_pkt_count());
 }
 
-capture_id_t
-TrexStatelessCaptureMngr::remove(capture_id_t id) {
+void
+TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) {
+    TrexStatelessCapture *capture = lookup(capture_id);
+    if (!capture) {
+        rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+        return;
+    }
+    if (capture->is_active()) {
+        rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE);
+        return;
+    }
     
+    uint32_t pending = 0;
+    TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
+    
+    rc.set_pkt_buffer(pkt_buffer, pending);
+}
+
+void
+TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) {
+
     int index = -1;
     for (int i = 0; i < m_captures.size(); i++) {
-        if (m_captures[i]->get_id() == id) {
+        if (m_captures[i]->get_id() == capture_id) {
             index = i;
             break;
         }
@@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) {
     
     /* does not exist */
     if (index == -1) {
-        return CAPTURE_ID_NOT_FOUND;
+        rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+        return;
     }
     
     TrexStatelessCapture *capture =  m_captures[index];
     m_captures.erase(m_captures.begin() + index);
     
+    /* free memory */
     delete capture;
     
     /* update global filter */
     update_global_filter();
-    
-    return id;
 }
 
 void
 TrexStatelessCaptureMngr::reset() {
+    TrexCaptureRCRemove dummy;
+    
     while (m_captures.size() > 0) {
-        remove(m_captures[0]->get_id());
+        remove(m_captures[0]->get_id(), dummy);
     }
 }
 
@@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port)
     }
 }
 
+Json::Value
+TrexStatelessCaptureMngr::to_json() const {
+    Json::Value lst = Json::arrayValue;
+
+    for (TrexStatelessCapture *capture : m_captures) {
+        lst.append(capture->to_json());
+    }
+
+    return lst;
+}
+
index 4d0b6a7..6cd25a9 100644 (file)
@@ -22,7 +22,146 @@ limitations under the License.
 #define __TREX_STATELESS_CAPTURE_H__
 
 #include <stdint.h>
+#include <assert.h>
+
 #include "trex_stateless_pkt.h"
+#include "trex_stateless_capture_msg.h"
+
+typedef int64_t capture_id_t;
+
+class TrexCaptureRC {
+public:
+
+    TrexCaptureRC() {
+        m_rc = RC_INVALID;
+        m_pkt_buffer = NULL;
+    }
+
+    enum rc_e {
+        RC_INVALID = 0,
+        RC_OK = 1,
+        RC_CAPTURE_NOT_FOUND,
+        RC_CAPTURE_LIMIT_REACHED,
+        RC_CAPTURE_FETCH_UNDER_ACTIVE
+    };
+
+    bool operator !() const {
+        return (m_rc != RC_OK);
+    }
+    
+    std::string get_err() const {
+        assert(m_rc != RC_INVALID);
+        
+        switch (m_rc) {
+        case RC_OK:
+            return "";
+        case RC_CAPTURE_LIMIT_REACHED:
+            return "capture limit has reached";
+        case RC_CAPTURE_NOT_FOUND:
+            return "capture ID not found";
+        case RC_CAPTURE_FETCH_UNDER_ACTIVE:
+            return "fetch command cannot be executed on an active capture";
+        default:
+            assert(0);
+        }
+    }
+    
+    void set_err(rc_e rc) {
+        m_rc = rc;
+    }
+    
+    Json::Value get_json() const {
+        return m_json_rc;
+    }
+    
+public:
+    rc_e              m_rc;
+    capture_id_t      m_capture_id;
+    TrexPktBuffer    *m_pkt_buffer;
+    Json::Value       m_json_rc;
+};
+
+class TrexCaptureRCStart : public TrexCaptureRC {
+public:
+
+    void set_new_id(capture_id_t new_id) {
+        m_capture_id = new_id;
+        m_rc = RC_OK;
+    }
+    
+    capture_id_t get_new_id() const {
+        return m_capture_id;
+    }
+    
+private:
+    capture_id_t  m_capture_id;
+};
+
+
+class TrexCaptureRCStop : public TrexCaptureRC {
+public:
+    void set_count(uint32_t pkt_count) {
+        m_pkt_count = pkt_count;
+        m_rc = RC_OK;
+    }
+    
+    uint32_t get_pkt_count() const {
+        return m_pkt_count;
+    }
+    
+private:
+    uint32_t m_pkt_count;
+};
+
+class TrexCaptureRCFetch : public TrexCaptureRC {
+public:
+
+    TrexCaptureRCFetch() {
+        m_pkt_buffer = nullptr;
+        m_pending    = 0;
+    }
+    
+    void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) {
+        m_pkt_buffer = pkt_buffer;
+        m_pending = pending;
+        m_rc = RC_OK;
+    }
+    
+    const TrexPktBuffer *get_pkt_buffer() const {
+        return m_pkt_buffer;
+    }
+    
+    uint32_t get_pending() const {
+        return m_pending;
+    }
+    
+private:
+    const TrexPktBuffer *m_pkt_buffer;
+    uint32_t             m_pending;
+};
+
+class TrexCaptureRCRemove : public TrexCaptureRC {
+public:
+    void set_ok() {
+        m_rc = RC_OK;
+    }
+};
+
+class TrexCaptureRCStatus : public TrexCaptureRC {
+public:
+    
+    void set_status(const Json::Value &json) {
+        m_json = json;
+        m_rc   = RC_OK;
+    }
+    
+    const Json::Value & get_status() const {
+        return m_json;
+    }
+    
+private:
+    Json::Value m_json;
+};
 
 /**
  * capture filter 
@@ -82,20 +221,27 @@ public:
         return *this;
     }
     
+    Json::Value to_json() const {
+        Json::Value output = Json::objectValue;
+        output["tx"] = Json::UInt64(m_tx_active);
+        output["rx"] = Json::UInt64(m_rx_active);
+
+        return output;
+    }
+
 private:
     
     uint64_t  m_tx_active;
     uint64_t  m_rx_active;
 };
 
-typedef int64_t capture_id_t;
-enum {
-    CAPTURE_ID_NOT_FOUND = -1,
-    CAPTURE_TOO_MANY_CAPTURES = -2,
-};
 
 class TrexStatelessCapture {
 public:
+    enum state_e {
+        STATE_ACTIVE,
+        STATE_STOPPED,
+    };
     
     TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter);
     
@@ -112,7 +258,24 @@ public:
         return m_filter;
     }
     
+    Json::Value to_json() const;
+
+    void stop() {
+        m_state = STATE_STOPPED;
+    }
+    
+    TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending);
+    
+    bool is_active() const {
+        return m_state == STATE_ACTIVE;
+    }
+    
+    uint32_t get_pkt_count() const {
+        return m_pkt_buffer->get_element_count();
+    }
+    
 private:
+    state_e          m_state;
     TrexPktBuffer   *m_pkt_buffer;
     CaptureFilter    m_filter;
     uint64_t         m_id;
@@ -134,18 +297,28 @@ public:
     }
     
     /**
-     * adds a capture buffer
-     * returns ID 
+     * starts a new capture
      */
-    capture_id_t add(uint64_t limit, const CaptureFilter &filter);
+    void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc);
    
-     
     /**
-     * stops capture mode 
-     * on success, will return the ID of the removed one 
-     * o.w it will be an error 
+     * stops an existing capture
+     * 
      */
-    capture_id_t remove(capture_id_t id);
+    void stop(capture_id_t capture_id, TrexCaptureRCStop &rc);
+    
+    /**
+     * fetch packets from an existing capture
+     * 
+     */
+    void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc);
+
+    /** 
+     * removes an existing capture 
+     * all packets captured will be detroyed 
+     */
+    void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc);
+    
     
     /**
      * removes all captures
@@ -153,6 +326,7 @@ public:
      */
     void reset();
     
+    
     /**
      * return true if any filter is active
      * 
@@ -182,6 +356,8 @@ public:
         handle_pkt_rx_slow_path(m, port);
     }
     
+    Json::Value to_json() const;
+        
 private:
     
     TrexStatelessCaptureMngr() {
@@ -189,6 +365,9 @@ private:
         m_id_counter = 1;
     }
     
+    
+    TrexStatelessCapture * lookup(capture_id_t capture_id);
+    
     void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port);
     void update_global_filter();
     
index f1ba303..00c1808 100644 (file)
@@ -270,10 +270,6 @@ void CRxCoreStateless::start() {
     m_monitor.disable();  
 }
 
-void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
-
-}
-
 int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
 
     int total_pkts = 0;
@@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() {
 }
 
 
-capture_id_t
-CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) {
-    return TrexStatelessCaptureMngr::getInstance().add(limit, filter);
-}
-
-capture_id_t
-CRxCoreStateless::stop_capture(capture_id_t capture_id) {
-    return TrexStatelessCaptureMngr::getInstance().remove(capture_id);
-}
-
 void
 CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
     m_rx_port_mngr[port_id].start_queue(size);
index 21ed51b..954a5f0 100644 (file)
@@ -131,14 +131,7 @@ class CRxCoreStateless {
     const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) {
         return m_rx_port_mngr[port_id].get_pkt_buffer();
     }
-
-    /**
-     * start capturing packets
-     *  
-     */
-    capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter);
-    capture_id_t stop_capture(capture_id_t capture_id);
-
+    
     /**
      * start RX queueing of packets
      * 
@@ -175,7 +168,6 @@ class CRxCoreStateless {
     void recalculate_next_state();
     bool are_any_features_active();
 
-    void capture_pkt(rte_mbuf_t *m);
     void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
     void handle_work_stage();
     void handle_grat_arp();