few tweaks 63/5263/1
authorimarom <[email protected]>
Sun, 22 Jan 2017 13:36:20 +0000 (15:36 +0200)
committerimarom <[email protected]>
Sun, 22 Jan 2017 13:36:20 +0000 (15:36 +0200)
Signed-off-by: imarom <[email protected]>
scripts/automation/trex_control_plane/stl/console/trex_capture.py
scripts/automation/trex_control_plane/stl/console/trex_console.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py
src/rpc-server/commands/trex_rpc_cmd_general.cpp
src/stateless/rx/trex_stateless_capture.cpp
src/stateless/rx/trex_stateless_capture.h

index 67b6c08..dfd7f0a 100644 (file)
@@ -22,8 +22,6 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
     def __init__ (self, logger, is_brief):
         self.logger      = logger
         self.is_brief    = is_brief
-        self.pkt_count   = 0
-        self.byte_count  = 0
 
         self.RX_ARROW = u'\u25c0\u2500\u2500'
         self.TX_ARROW = u'\u25b6\u2500\u2500'
@@ -45,11 +43,13 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
         return RC_OK()
         
     def handle_pkts (self, pkts):
+        byte_count = 0
+        
         for pkt in pkts:
-            self.__handle_pkt(pkt)
+            byte_count += self.__handle_pkt(pkt)
         
         self.logger.prompt_redraw()
-        return True
+        return RC_OK(byte_count)
         
         
     def get_scapy_name (self, pkt_scapy):
@@ -72,11 +72,8 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
     def __handle_pkt (self, pkt):
         pkt_bin = base64.b64decode(pkt['binary'])
 
-        self.pkt_count  += 1
-        self.byte_count += len(pkt_bin)
-
         pkt_scapy = Ether(pkt_bin)
-        self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
+        self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
         self.logger.log(format_text('    Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold'))
 
         
@@ -86,6 +83,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
             pkt_scapy.show(label_lvl = '    ')
             self.logger.log('')
 
+        return len(pkt_bin)
 
 #
 class CaptureMonitorWriterPipe(CaptureMonitorWriter):
@@ -148,18 +146,22 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter):
         if not rc:
             return rc
         
+        byte_count = 0
+        
         for pkt in pkts:
             pkt_bin = base64.b64decode(pkt['binary'])
-            ts = pkt['ts'] - self.start_ts
-            sec = int(ts)
-            usec = int( (ts - sec) * 1e6 )
+            ts      = pkt['ts']
+            sec     = int(ts)
+            usec    = int( (ts - sec) * 1e6 )
                 
             try:
                 self.writer._write_packet(pkt_bin, sec = sec, usec = usec)
             except IOError:
                 return RC_ERR("*** failed to write packet to pipe ***")
-                
-        return RC_OK()
+             
+            byte_count += len(pkt_bin)
+               
+        return RC_OK(byte_count)
         
         
 class CaptureMonitor(object):
@@ -170,7 +172,7 @@ class CaptureMonitor(object):
         self.capture_id  = None
         self.logger      = client.logger
         self.writer      = None
-                
+        
     def is_active (self):
         return self.active
         
@@ -179,7 +181,7 @@ class CaptureMonitor(object):
         return self.capture_id
         
         
-    def start (self,  tx_port_list, rx_port_list, rate_pps, mon_type):
+    def start (self, tx_port_list, rx_port_list, rate_pps, mon_type):
         try:
             self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type)
         except Exception as e:
@@ -221,12 +223,21 @@ class CaptureMonitor(object):
             self.t.start()
         except Exception as e:
             self.active = False
-            self.client.stop_capture(self.capture_id)
+            self.stop()
             raise e
         
-        
+    # entry point stop 
     def stop (self):
+
+        if self.active:
+            self.stop_logged()
+        else:
+            self.__stop()
+        
+    # wraps stop with a logging
+    def stop_logged (self):
         self.logger.pre_cmd("Stopping capture monitor")
+        
         try:
             self.__stop()
         except Exception as e:
@@ -235,6 +246,7 @@ class CaptureMonitor(object):
         
         self.logger.post_cmd(RC_OK())
             
+    # internal stop
     def __stop (self):
 
         # shutdown thread
@@ -247,15 +259,28 @@ class CaptureMonitor(object):
             self.writer.deinit()
             self.writer = None
             
-        # cleanup capture ID
-        if self.capture_id is not None:
-            try:
-                with self.logger.supress():
-                    self.client.stop_capture(self.capture_id)
-                    self.capture_id = None
-            except STLError as e:
-                self.logger.post_cmd(RC_ERR(""))
-                raise e
+        # cleanup capture ID if possible
+        if self.capture_id is None:
+            return
+
+        capture_id = self.capture_id
+        self.capture_id = None
+        
+        # if we are disconnected - we cannot cleanup the capture
+        if not self.client.is_connected():
+            return
+            
+        try:
+            captures = [x['id'] for x in self.client.get_capture_status()]
+            if capture_id not in captures:
+                return
+                
+            with self.logger.supress():
+                self.client.stop_capture(capture_id)
+            
+        except STLError as e:
+            self.logger.post_cmd(RC_ERR(""))
+            raise e
             
                 
     def get_mon_row (self):
@@ -311,6 +336,7 @@ class CaptureMonitor(object):
         self.byte_count = 0
         
         while self.active:
+            
             # sleep
             if not self.__sleep():
                 break
@@ -325,6 +351,8 @@ class CaptureMonitor(object):
                 break
                 
             try:
+                if not self.client.is_connected():
+                    return RC_ERR('*** client has been disconnected, aborting monitoring ***')
                 rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
                 if not rc:
                     return rc
@@ -340,6 +368,9 @@ class CaptureMonitor(object):
             rc = self.writer.handle_pkts(pkts)
             if not rc:
                 return rc
+            
+            self.pkt_count  += len(pkts)
+            self.byte_count += rc.data()
                 
         # graceful shutdown
         return RC_OK()
@@ -446,8 +477,11 @@ class CaptureManager(object):
             self.record_start_parser.formatted_error('please provide either --tx or --rx')
             return
 
-        self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
-            
+        rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+        
+        self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold'))
+        self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold'))
+
         
     def parse_record_stop (self, opts):
         captures = self.c.get_capture_status()
index bf54304..270ef31 100755 (executable)
@@ -707,20 +707,21 @@ class TRexConsole(TRexGeneralCmd):
              
     # a custorm cmdloop wrapper
     def start(self):
-        while True:
-            try:
-                self.cmdloop()
-                break
-            except KeyboardInterrupt as e:
-                if not readline.get_line_buffer():
-                    raise KeyboardInterrupt
-                else:
-                    print("")
-                    self.intro = None
-                    continue
-
-            finally:
-                self.cap_mngr.stop()
+        try:
+            while True:
+                try:
+                    self.cmdloop()
+                    break
+                except KeyboardInterrupt as e:
+                    if not readline.get_line_buffer():
+                        raise KeyboardInterrupt
+                    else:
+                        print("")
+                        self.intro = None
+                        continue
+    
+        finally:
+            self.cap_mngr.stop()
 
         if self.terminal:
             self.terminal.kill()
index 5435619..c82d77f 100755 (executable)
@@ -3058,13 +3058,19 @@ class STLClient(object):
                 self.logger.post_cmd(rc)
                 raise STLError(rc)
         
-            pkts = rc.data()['pkts']
+            pkts      = rc.data()['pkts']
+            pending   = rc.data()['pending']
+            start_ts  = rc.data()['start_ts']
+            
             for pkt in pkts:
-                ts = pkt['ts']
+                ts = pkt['ts'] - start_ts
+                ts_sec  = int(ts)
+                ts_usec = int( (ts - ts_sec) * 1e6 )
+                
                 pkt_bin = base64.b64decode(pkt['binary'])
-                writer._write_packet(pkt_bin, sec = 0, usec = 0)
+                writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec)
                 
-            pending = rc.data()['pending']
+            
             
         
         self.logger.post_cmd(rc)
index 72c9317..ff07b59 100644 (file)
@@ -184,7 +184,7 @@ class JsonRpcClient(object):
                 break
             except zmq.Again:
                 tries += 1
-                if tries > 5:
+                if tries > 0:
                     self.disconnect()
                     return RC_ERR("*** [RPC] - Failed to send message to server")
 
@@ -200,7 +200,7 @@ class JsonRpcClient(object):
                 break
             except zmq.Again:
                 tries += 1
-                if tries > 5:
+                if tries > 0:
                     self.disconnect()
                     return RC_ERR("*** [RPC] - Failed to get server response from {0}".format(self.transport))
 
index be261fb..55249fc 100644 (file)
@@ -994,8 +994,9 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value &params, Json::Value &resul
     
     const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer();
     
-    result["result"]["pending"] = rc.get_pending();
-    result["result"]["pkts"]    = pkt_buffer->to_json();
+    result["result"]["pending"]     = rc.get_pending();
+    result["result"]["start_ts"]    = rc.get_start_ts();
+    result["result"]["pkts"]        = pkt_buffer->to_json();
     
     /* delete the buffer */
     delete pkt_buffer;
index 5d43ced..f0d4e80 100644 (file)
@@ -26,6 +26,8 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons
     m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL);
     m_filter     = filter;
     m_state      = STATE_ACTIVE;
+    m_start_ts   = now_sec();
+    m_pkt_index  = 0;
 }
 
 TrexStatelessCapture::~TrexStatelessCapture() {
@@ -35,7 +37,7 @@ TrexStatelessCapture::~TrexStatelessCapture() {
 }
 
 void
-TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
+TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) {
 
     if (m_state != STATE_ACTIVE) {
         delete pkt;
@@ -48,6 +50,12 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
         return;
     }
     
+    if (pkt->get_ts() < m_start_ts) {
+        delete pkt;
+        return;
+    }
+    
+    pkt->set_index(++m_pkt_index);
     m_pkt_buffer->push(pkt);
 }
 
@@ -62,7 +70,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
         return;
     }
     
-    m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX);
+    m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index);
 }
 
 
@@ -110,7 +118,8 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) {
         partial->push(pkt);
     }
     
-    pending = m_pkt_buffer->get_element_count();
+    pending  = m_pkt_buffer->get_element_count();
+    
     return partial;
 }
 
@@ -181,7 +190,7 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre
     uint32_t pending = 0;
     TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
     
-    rc.set_pkt_buffer(pkt_buffer, pending);
+    rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts());
 }
 
 void
@@ -223,7 +232,7 @@ TrexStatelessCaptureMngr::reset() {
 }
 
 void 
-TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) {
+TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) {
     for (TrexStatelessCapture *capture : m_captures) {
         capture->handle_pkt_tx(pkt);
     }
index 4a9efea..bc1b88c 100644 (file)
@@ -121,10 +121,11 @@ public:
         m_pending    = 0;
     }
     
-    void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) {
-        m_pkt_buffer = pkt_buffer;
-        m_pending = pending;
-        m_rc = RC_OK;
+    void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) {
+        m_pkt_buffer  = pkt_buffer;
+        m_pending     = pending;
+        m_start_ts    = start_ts;
+        m_rc          = RC_OK;
     }
     
     const TrexPktBuffer *get_pkt_buffer() const {
@@ -135,9 +136,14 @@ public:
         return m_pending;
     }
     
+    dsec_t get_start_ts() const {
+        return m_start_ts;
+    }
+    
 private:
     const TrexPktBuffer *m_pkt_buffer;
     uint32_t             m_pending;
+    dsec_t               m_start_ts;
 };
 
 class TrexCaptureRCRemove : public TrexCaptureRC {
@@ -245,7 +251,7 @@ public:
     
     TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter);
     
-    void handle_pkt_tx(const TrexPkt *pkt);
+    void handle_pkt_tx(TrexPkt *pkt);
     void handle_pkt_rx(const rte_mbuf_t *m, int port);
     
     ~TrexStatelessCapture();
@@ -274,11 +280,17 @@ public:
         return m_pkt_buffer->get_element_count();
     }
     
+    dsec_t get_start_ts() const {
+        return m_start_ts;
+    }
+    
 private:
     state_e          m_state;
     TrexPktBuffer   *m_pkt_buffer;
+    dsec_t           m_start_ts;
     CaptureFilter    m_filter;
     uint64_t         m_id;
+    uint64_t         m_pkt_index;
 };
 
 class TrexStatelessCaptureMngr {
@@ -341,7 +353,7 @@ public:
     /**
      *  handle packet from TX
      */
-    void handle_pkt_tx(const TrexPkt *pkt);
+    void handle_pkt_tx(TrexPkt *pkt);
     
     /** 
      * handle packet from RX