self.owner = ''
self.last_factor_type = None
-
+
self.attr_lock = threading.Lock()
# decorator to verify port is up
# save this for TUI
self.last_factor_type = mul['type']
-
- return self.ok()
+
+ return rc
# stop traffic
return self.err(rc.err())
self.state = self.STATE_STREAMS
+
self.last_factor_type = None
-
+
# timestamp for last tx
self.tx_stopped_ts = datetime.now()
# decode the packets
for i in range(len(pkts)):
- pkts[i] = base64.b64decode(pkts[i])
+ pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
+
return pkts
# until thread is locked - order is important
self.tx_stopped_ts = datetime.now()
self.state = self.STATE_STREAMS
+
self.last_factor_type = None
def async_event_port_attr_changed (self, new_attr):
raise NotImplementedError()
# return None for more packets otherwise RC object
- def on_pkt_rx (self, pkt, dt):
+ def on_pkt_rx (self, pkt):
raise NotImplementedError()
# return value in case of timeout
if not rc:
return rc
+ # save the start timestamp
+ self.start_ts = rc.data()['ts']
+
# block until traffic finishes
while self.port.is_active():
time.sleep(0.01)
- self.tx_time = time.time()
-
return self.wait_for_rx_response()
while polling > 0:
# fetch the queue
rx_pkts = self.port.get_rx_queue_pkts()
- dt = time.time() - self.tx_time
+
# for each packet - examine it
for pkt in rx_pkts:
- rc = self.on_pkt_rx(pkt, dt)
+ rc = self.on_pkt_rx(pkt)
if rc is not None:
return rc
# return None in case more packets are needed else the status rc
- def on_pkt_rx (self, pkt, dt):
- scapy_pkt = Ether(pkt)
+ def on_pkt_rx (self, pkt):
+ scapy_pkt = Ether(pkt['binary'])
if not 'ARP' in scapy_pkt:
return None
return [s1]
# return None for more packets otherwise RC object
- def on_pkt_rx (self, pkt, dt):
- scapy_pkt = Ether(pkt)
+ def on_pkt_rx (self, pkt):
+ scapy_pkt = Ether(pkt['binary'])
if not 'ICMP' in scapy_pkt:
return None
ip = scapy_pkt['IP']
icmp = scapy_pkt['ICMP']
+
+ dt = pkt['ts'] - self.start_ts
+
if icmp.type == 0:
# echo reply
- return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt), dt * 1000, ip.ttl))
+ return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl))
# unreachable
elif icmp.type == 3:
- return self.port.ok('destination {0} is unreachable'.format(icmp.dst))
+ return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src))
else:
scapy_pkt.show2()
return self.port.err('unknown ICMP reply')
m_raw = (uint8_t *)malloc(m_size);
memcpy(m_raw, p, m_size);
+
+ /* save the packet timestamp */
+ m_timestamp = now_sec();
}
- /* RVO here - no performance impact */
- const std::string to_base64_str() const {
- return base64_encode(m_raw, m_size);
+ /* slow path and also RVO - pass by value is ok */
+ Json::Value to_json() {
+ Json::Value output;
+ output["ts"] = m_timestamp;
+ output["binary"] = base64_encode(m_raw, m_size);
+ return output;
}
~RxPacket() {
private:
- uint8_t *m_raw;
- uint16_t m_size;
+ uint8_t *m_raw;
+ uint16_t m_size;
+ dsec_t m_timestamp;
};
/**
int tmp = m_tail;
while (tmp != m_head) {
RxPacket *pkt = m_buffer[tmp];
- output.append(pkt->to_base64_str());
+ output.append(pkt->to_json());
tmp = next(tmp);
}