import traceback
import os.path
-
############################ logger #############################
############################ #############################
############################ #############################
if not rc:
return rc
-
+
# API sync
rc = self._transmit("api_sync", params = {'api_vers': self.api_vers}, api_class = None)
if not rc:
self.logger.set_verbose(modes[level])
-
@__api_check(False)
def connect (self):
"""
+ :exc:`STLError`
"""
-
rc = self.__connect()
if not rc:
raise STLError(rc)
if not opts:
return opts
- ports = list_intersect(opts.ports, self.get_resolvable_ports())
- if not ports:
- if not opts.ports:
- msg = 'resolve - no ports with IPv4 destination'
- else:
- msg = 'pause - none of ports {0} are configured with IPv4 destination'.format(opts.ports)
-
- self.logger.log(msg)
- return RC_ERR(msg)
-
- self.resolve(ports = ports, retries = opts.retries)
+
+ self.resolve(ports = opts.ports, retries = opts.retries)
return RC_OK()
# invalidates the current ARP
def invalidate_arp (self):
dest = self.__attr['dest']
+
+ if not self.is_l3_mode():
+ return self.err('port is not configured with L3')
+
+ self.set_l3_mode(self.get_src_addr()['ipv4'], self.get_dst_addr()['ipv4'])
if dest['type'] != 'mac':
return self.set_attr(dest = dest['ipv4'])
# RX filter mode
info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
+ info['layer_mode'] = 'IPv4' if self.is_l3_mode() else 'Ethernet'
+
# src MAC and IPv4
info['src_mac'] = attr['src_mac']
info['src_ipv4'] = attr['src_ipv4']
else:
assert(0)
+ def is_l3_mode (self):
+ return self.get_dst_addr()['ipv4'] is not None
# port is considered resolved if it's dest is either MAC or resolved IPv4
def is_resolved (self):
"src MAC": info['src_mac'],
"src IPv4": info['src_ipv4'],
"Destination": info['dest'],
- "ARP Resolution": format_text("{0}".format(info['arp']), 'bold', 'red') if info['arp'] == 'unresolved' else info['arp'],
+ "ARP Resolution": format_text("{0}".format(info['arp']), 'bold', 'red' if info['arp'] == 'unresolved' else None),
"PCI Address": info['pci_addr'],
"NUMA Node": info['numa'],
"--": "",
"promiscuous" : info['prom'],
"flow ctrl" : info['fc'],
+ "layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'),
"RX Filter Mode": info['rx_filter_mode'],
"RX Queueing": info['rx_queue'],
"RX sniffer": info['rx_sniffer'],
# before resolve
def pre_send (self):
+
+ if not self.port.is_l3_mode():
+ return self.port.err("arp - port is not configured as L3 layer")
+
self.dst = self.port.get_dst_addr()
self.src = self.port.get_src_addr()
-
- if self.dst['ipv4'] is None:
- return self.port.err("Port has a non-IPv4 destination: '{0}'".format(self.dst['mac']))
- if self.src['ipv4'] is None:
- return self.port.err('Port must have an IPv4 source address configured')
-
- # invalidate the current ARP resolution (if exists)
return self.port.invalidate_arp()
return None
arp = scapy_pkt['ARP']
-
+
# check this is the right ARP (ARP reply with the address)
if (arp.op != 2) or (arp.psrc != self.dst['ipv4']):
return None
self.pkt_size = pkt_size
def pre_send (self):
+ if not self.port.is_l3_mode():
+ return self.port.err('ping - port is not configured as L3 layer')
+
+ if not self.port.is_resolved():
+ return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')
self.src = self.port.get_src_addr()
self.dst = self.port.get_dst_addr()
- if self.src['ipv4'] is None:
- return self.port.err('Ping - port does not have an IPv4 address configured')
-
- if self.dst['mac'] is None:
- return self.port.err('Ping - port has an unresolved destination, cannot determine next hop MAC address')
return self.port.ok()
base_pkt = base_pkt / (pad * 'x')
- #base_pkt.show2()
s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
self.base_pkt = base_pkt
("promiscuous", []),
("flow ctrl", []),
("--", []),
+ ("layer mode", []),
("src IPv4", []),
("src MAC", []),
("---", []),