def __init__ (self, logger, is_brief):
self.logger = logger
self.is_brief = is_brief
- self.pkt_count = 0
- self.byte_count = 0
self.RX_ARROW = u'\u25c0\u2500\u2500'
self.TX_ARROW = u'\u25b6\u2500\u2500'
return RC_OK()
def handle_pkts (self, pkts):
+ byte_count = 0
+
for pkt in pkts:
- self.__handle_pkt(pkt)
+ byte_count += self.__handle_pkt(pkt)
self.logger.prompt_redraw()
- return True
+ return RC_OK(byte_count)
def get_scapy_name (self, pkt_scapy):
def __handle_pkt (self, pkt):
pkt_bin = base64.b64decode(pkt['binary'])
- self.pkt_count += 1
- self.byte_count += len(pkt_bin)
-
pkt_scapy = Ether(pkt_bin)
- self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
+ self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold'))
pkt_scapy.show(label_lvl = ' ')
self.logger.log('')
+ return len(pkt_bin)
#
class CaptureMonitorWriterPipe(CaptureMonitorWriter):
if not rc:
return rc
+ byte_count = 0
+
for pkt in pkts:
pkt_bin = base64.b64decode(pkt['binary'])
- ts = pkt['ts'] - self.start_ts
- sec = int(ts)
- usec = int( (ts - sec) * 1e6 )
+ ts = pkt['ts']
+ sec = int(ts)
+ usec = int( (ts - sec) * 1e6 )
try:
self.writer._write_packet(pkt_bin, sec = sec, usec = usec)
except IOError:
return RC_ERR("*** failed to write packet to pipe ***")
-
- return RC_OK()
+
+ byte_count += len(pkt_bin)
+
+ return RC_OK(byte_count)
class CaptureMonitor(object):
self.capture_id = None
self.logger = client.logger
self.writer = None
-
+
def is_active (self):
return self.active
return self.capture_id
- def start (self, tx_port_list, rx_port_list, rate_pps, mon_type):
+ def start (self, tx_port_list, rx_port_list, rate_pps, mon_type):
try:
self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type)
except Exception as e:
self.t.start()
except Exception as e:
self.active = False
- self.client.stop_capture(self.capture_id)
+ self.stop()
raise e
-
+ # entry point stop
def stop (self):
+
+ if self.active:
+ self.stop_logged()
+ else:
+ self.__stop()
+
+ # wraps stop with a logging
+ def stop_logged (self):
self.logger.pre_cmd("Stopping capture monitor")
+
try:
self.__stop()
except Exception as e:
self.logger.post_cmd(RC_OK())
+ # internal stop
def __stop (self):
# shutdown thread
self.writer.deinit()
self.writer = None
- # cleanup capture ID
- if self.capture_id is not None:
- try:
- with self.logger.supress():
- self.client.stop_capture(self.capture_id)
- self.capture_id = None
- except STLError as e:
- self.logger.post_cmd(RC_ERR(""))
- raise e
+ # cleanup capture ID if possible
+ if self.capture_id is None:
+ return
+
+ capture_id = self.capture_id
+ self.capture_id = None
+
+ # if we are disconnected - we cannot cleanup the capture
+ if not self.client.is_connected():
+ return
+
+ try:
+ captures = [x['id'] for x in self.client.get_capture_status()]
+ if capture_id not in captures:
+ return
+
+ with self.logger.supress():
+ self.client.stop_capture(capture_id)
+
+ except STLError as e:
+ self.logger.post_cmd(RC_ERR(""))
+ raise e
def get_mon_row (self):
self.byte_count = 0
while self.active:
+
# sleep
if not self.__sleep():
break
break
try:
+ if not self.client.is_connected():
+ return RC_ERR('*** client has been disconnected, aborting monitoring ***')
rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
if not rc:
return rc
rc = self.writer.handle_pkts(pkts)
if not rc:
return rc
+
+ self.pkt_count += len(pkts)
+ self.byte_count += rc.data()
# graceful shutdown
return RC_OK()
self.record_start_parser.formatted_error('please provide either --tx or --rx')
return
- self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
-
+ rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+
+ self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold'))
+ self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold'))
+
def parse_record_stop (self, opts):
captures = self.c.get_capture_status()
m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL);
m_filter = filter;
m_state = STATE_ACTIVE;
+ m_start_ts = now_sec();
+ m_pkt_index = 0;
}
TrexStatelessCapture::~TrexStatelessCapture() {
}
void
-TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
+TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) {
if (m_state != STATE_ACTIVE) {
delete pkt;
return;
}
+ if (pkt->get_ts() < m_start_ts) {
+ delete pkt;
+ return;
+ }
+
+ pkt->set_index(++m_pkt_index);
m_pkt_buffer->push(pkt);
}
return;
}
- m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX);
+ m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index);
}
partial->push(pkt);
}
- pending = m_pkt_buffer->get_element_count();
+ pending = m_pkt_buffer->get_element_count();
+
return partial;
}
uint32_t pending = 0;
TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
- rc.set_pkt_buffer(pkt_buffer, pending);
+ rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts());
}
void
}
void
-TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) {
+TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) {
for (TrexStatelessCapture *capture : m_captures) {
capture->handle_pkt_tx(pkt);
}
m_pending = 0;
}
- void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) {
- m_pkt_buffer = pkt_buffer;
- m_pending = pending;
- m_rc = RC_OK;
+ void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) {
+ m_pkt_buffer = pkt_buffer;
+ m_pending = pending;
+ m_start_ts = start_ts;
+ m_rc = RC_OK;
}
const TrexPktBuffer *get_pkt_buffer() const {
return m_pending;
}
+ dsec_t get_start_ts() const {
+ return m_start_ts;
+ }
+
private:
const TrexPktBuffer *m_pkt_buffer;
uint32_t m_pending;
+ dsec_t m_start_ts;
};
class TrexCaptureRCRemove : public TrexCaptureRC {
TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter);
- void handle_pkt_tx(const TrexPkt *pkt);
+ void handle_pkt_tx(TrexPkt *pkt);
void handle_pkt_rx(const rte_mbuf_t *m, int port);
~TrexStatelessCapture();
return m_pkt_buffer->get_element_count();
}
+ dsec_t get_start_ts() const {
+ return m_start_ts;
+ }
+
private:
state_e m_state;
TrexPktBuffer *m_pkt_buffer;
+ dsec_t m_start_ts;
CaptureFilter m_filter;
uint64_t m_id;
+ uint64_t m_pkt_index;
};
class TrexStatelessCaptureMngr {
/**
* handle packet from TX
*/
- void handle_pkt_tx(const TrexPkt *pkt);
+ void handle_pkt_tx(TrexPkt *pkt);
/**
* handle packet from RX