fine tune 62/5262/1
authorimarom <[email protected]>
Thu, 19 Jan 2017 11:30:48 +0000 (13:30 +0200)
committerimarom <[email protected]>
Thu, 19 Jan 2017 11:30:48 +0000 (13:30 +0200)
Signed-off-by: imarom <[email protected]>
scripts/automation/trex_control_plane/stl/console/trex_capture.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
src/rpc-server/commands/trex_rpc_cmd_general.cpp

index e5708e9..67b6c08 100644 (file)
@@ -2,9 +2,10 @@ from trex_stl_lib.api import *
 from trex_stl_lib.utils import parsing_opts, text_tables
 import threading
 import tempfile
+import select
 
 class CaptureMonitorWriter(object):
-    def init (self):
+    def init (self, start_ts):
         raise NotImplementedError
 
     def deinit(self):
@@ -13,6 +14,9 @@ class CaptureMonitorWriter(object):
     def handle_pkts (self, pkts):
         raise NotImplementedError
         
+    def periodic_check (self):
+        raise NotImplementedError
+        
         
 class CaptureMonitorWriterStdout(CaptureMonitorWriter):
     def __init__ (self, logger, is_brief):
@@ -24,21 +28,29 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
         self.RX_ARROW = u'\u25c0\u2500\u2500'
         self.TX_ARROW = u'\u25b6\u2500\u2500'
 
-    def init (self):
-        self.logger.log(format_text("\nStarting capture monitor on selected ports", 'bold'))
-        self.logger.log(format_text("*** any captured packet will be displayed on screen ***\n"))
-        self.logger.log(format_text("('capture monitor stop' to abort capturing...)\n", 'bold')) 
+    def init (self, start_ts):
+        self.start_ts = start_ts
+        
+        self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high'))
+        self.logger.post_cmd(RC_OK)
+        
+        self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) 
         
         
     def deinit (self):
         pass
         
+       
+    def periodic_check (self):
+        return RC_OK()
         
     def handle_pkts (self, pkts):
         for pkt in pkts:
             self.__handle_pkt(pkt)
         
         self.logger.prompt_redraw()
+        return True
+        
         
     def get_scapy_name (self, pkt_scapy):
         layer = pkt_scapy
@@ -46,6 +58,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
             layer = layer.payload
         
         return layer.name
+        
                 
     def format_origin (self, origin):
         if origin == 'RX':
@@ -63,11 +76,10 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter):
         self.byte_count += len(pkt_bin)
 
         pkt_scapy = Ether(pkt_bin)
-        self.logger.log(format_text(u'\n\nPort: {0} {1}\n'.format(pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
-        self.logger.log(format_text('    Type: {:}, Size: {:} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts']), 'bold'))
+        self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
+        self.logger.log(format_text('    Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold'))
 
         
-        
         if self.is_brief:
             self.logger.log('    {0}'.format(pkt_scapy.command()))
         else:
@@ -80,43 +92,74 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter):
     def __init__ (self, logger):
         self.logger   = logger
         
-    def init (self):
+    def init (self, start_ts):
+        self.start_ts  = start_ts
         self.fifo_name = tempfile.mktemp()
         
         try:
+            self.logger.pre_cmd('Starting pipe capture monitor')
             os.mkfifo(self.fifo_name)
+            self.logger.post_cmd(RC_OK)
 
-            self.logger.log(format_text("\nPlease run 'wireshark -k -i {0}'".format(self.fifo_name), 'bold'))
-            self.logger.log('\nWaiting for Wireshark connection...')
+            self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold'))
             
+            self.logger.pre_cmd("Waiting for Wireshark pipe connection")
             self.fifo = os.open(self.fifo_name, os.O_WRONLY)
-            self.logger.log('Successfuly connected to Wireshark...')
+            self.logger.post_cmd(RC_OK())
+            
             self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold'))
             
             self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True)
             self.writer._write_header(None)
-
+            
+            # register a poller
+            self.poll = select.poll()
+            self.poll.register(self.fifo, select.EPOLLERR)
+            
         except KeyboardInterrupt as e:
-            os.unlink(self.fifo_name)
+            self.logger.post_cmd(RC_ERR(""))
             raise STLError("*** pipe monitor aborted...cleaning up")
 
         except OSError as e:
-            os.unlink(self.fifo_name)
+            self.logger.post_cmd(RC_ERR(""))
             raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e)))
         
         
     def deinit (self):
-        os.unlink(self.fifo_name)
+        try:
+            os.unlink(self.fifo_name)
+        except OSError:
+            pass
 
+       
+    def periodic_check (self):
+        return self.check_pipe()
+        
+        
+    def check_pipe (self):
+        if self.poll.poll(0):
+            return RC_ERR('*** pipe has been disconnected - aborting monitoring ***')
+            
+        return RC_OK()
+        
         
     def handle_pkts (self, pkts):
+        rc = self.check_pipe()
+        if not rc:
+            return rc
+        
         for pkt in pkts:
             pkt_bin = base64.b64decode(pkt['binary'])
+            ts = pkt['ts'] - self.start_ts
+            sec = int(ts)
+            usec = int( (ts - sec) * 1e6 )
+                
             try:
-                self.writer._write_packet(pkt_bin, sec = 0, usec = 0)
+                self.writer._write_packet(pkt_bin, sec = sec, usec = usec)
             except IOError:
-                klgjdf
-
+                return RC_ERR("*** failed to write packet to pipe ***")
+                
+        return RC_OK()
         
         
 class CaptureMonitor(object):
@@ -124,9 +167,9 @@ class CaptureMonitor(object):
         self.client      = client
         self.cmd_lock    = cmd_lock
         self.active      = False
-        self.capture_id  = -1
+        self.capture_id  = None
         self.logger      = client.logger
-        
+        self.writer      = None
                 
     def is_active (self):
         return self.active
@@ -137,10 +180,20 @@ class CaptureMonitor(object):
         
         
     def start (self,  tx_port_list, rx_port_list, rate_pps, mon_type):
+        try:
+            self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type)
+        except Exception as e:
+            self.__stop()
+            raise e
+            
+    def start_internal (self,  tx_port_list, rx_port_list, rate_pps, mon_type):
         # stop any previous monitors
         if self.active:
             self.stop()
         
+        self.tx_port_list = tx_port_list
+        self.rx_port_list = rx_port_list
+
         if mon_type == 'compact':
             self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True)
         elif mon_type == 'verbose':
@@ -152,13 +205,14 @@ class CaptureMonitor(object):
             
         
         with self.logger.supress():
-                self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps)
-
-        self.tx_port_list = tx_port_list
-        self.rx_port_list = rx_port_list
+            data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps)
         
-        self.writer.init()
+        self.capture_id = data['id']
+        self.start_ts   = data['ts']
 
+        self.writer.init(self.start_ts)
+
+        
         self.t = threading.Thread(target = self.__thread_cb)
         self.t.setDaemon(True)
         
@@ -167,19 +221,43 @@ class CaptureMonitor(object):
             self.t.start()
         except Exception as e:
             self.active = False
+            self.client.stop_capture(self.capture_id)
             raise e
-
+        
         
     def stop (self):
+        self.logger.pre_cmd("Stopping capture monitor")
+        try:
+            self.__stop()
+        except Exception as e:
+            self.logger.post_cmd(RC_ERR(""))
+            raise e
+        
+        self.logger.post_cmd(RC_OK())
+            
+    def __stop (self):
+
+        # shutdown thread
         if self.active:
             self.active = False
             self.t.join()
             
-            self.client.stop_capture(self.capture_id, None)
-            self.capture_id = -1
+        # deinit the writer
+        if self.writer is not None:
             self.writer.deinit()
-
+            self.writer = None
+            
+        # cleanup capture ID
+        if self.capture_id is not None:
+            try:
+                with self.logger.supress():
+                    self.client.stop_capture(self.capture_id)
+                    self.capture_id = None
+            except STLError as e:
+                self.logger.post_cmd(RC_ERR(""))
+                raise e
             
+                
     def get_mon_row (self):
         if not self.is_active():
             return None
@@ -215,8 +293,20 @@ class CaptureMonitor(object):
     def __unlock (self):
         self.cmd_lock.release()
         
-        
+    
     def __thread_cb (self):
+        try:
+            rc = self.__thread_main_loop()
+        finally:
+            pass
+            
+        if not rc:
+            self.logger.log(str(rc))
+            self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold'))
+            self.logger.prompt_redraw()
+            
+            
+    def __thread_main_loop (self):
         self.pkt_count  = 0
         self.byte_count = 0
         
@@ -225,6 +315,11 @@ class CaptureMonitor(object):
             if not self.__sleep():
                 break
             
+            # check that the writer is ok
+            rc = self.writer.periodic_check()
+            if not rc:
+                return rc
+
             # try to lock
             if not self.__lock():
                 break
@@ -232,7 +327,8 @@ class CaptureMonitor(object):
             try:
                 rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
                 if not rc:
-                    raise STLError(rc)
+                    return rc
+                    
             finally:
                 self.__unlock()
                 
@@ -241,9 +337,13 @@ class CaptureMonitor(object):
             if not pkts:
                 continue
             
-            self.writer.handle_pkts(pkts)
+            rc = self.writer.handle_pkts(pkts)
+            if not rc:
+                return rc
                 
-
+        # graceful shutdown
+        return RC_OK()
+        
      
 
 # main class
@@ -338,7 +438,8 @@ class CaptureManager(object):
         elif opts.record_cmd == 'stop':
             self.parse_record_stop(opts)
         else:
-            assert(0)
+            self.record_parser.formatted_error("too few arguments")
+            
             
     def parse_record_start (self, opts):
         if not opts.tx_port_list and not opts.rx_port_list:
@@ -370,7 +471,8 @@ class CaptureManager(object):
         elif opts.mon_cmd == 'stop':
             self.parse_monitor_stop(opts)
         else:
-            assert(0)
+            self.monitor_parser.formatted_error("too few arguments")
+            
             
     def parse_monitor_start (self, opts):
         mon_type = 'compact'
index c632ad7..5435619 100755 (executable)
@@ -3003,7 +3003,8 @@ class STLClient(object):
                 limit          - limit how many packets will be written
                 
             :returns:
-                the new capture_id
+                returns a dictionary containing
+                {'id: <new_id>, 'ts': <starting timestamp>}
                 
             :raises:
                 + :exe:'STLError'
@@ -3036,7 +3037,7 @@ class STLClient(object):
         if not rc:
             raise STLError(rc)
 
-        return rc.data()['capture_id']
+        return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']}
 
 
         
@@ -3070,7 +3071,7 @@ class STLClient(object):
         
         
     @__api_check(True)
-    def stop_capture (self, capture_id, output_filename):
+    def stop_capture (self, capture_id, output_filename = None):
         """
             Stops an active capture
 
index 8f7431e..be261fb 100644 (file)
@@ -920,6 +920,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value &params, Json::Value &resul
     }
     
     result["result"]["capture_id"] = rc.get_new_id();
+    result["result"]["ts"]         = now_sec();
 }
 
 /**