import time
from traceback import format_exc, format_stack
+from config import config
import scapy.compat
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
+from vpp_papi import VppEnum
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA,\
ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr, ICMPv6ND_RA, RouterAlert, \
IPv6ExtHdrHopByHop
-from util import ppp, ppc
+from util import ppp, ppc, UnexpectedPacketError
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ismaddr
self._out_history_counter += 1
return v
- def __init__(self, test, pg_index, gso, gso_size):
+ def __init__(self, test, pg_index, gso, gso_size, mode):
""" Create VPP packet-generator interface """
super().__init__(test)
- r = test.vapi.pg_create_interface(pg_index, gso, gso_size)
+ r = test.vapi.pg_create_interface_v2(pg_index, gso, gso_size, mode)
self.set_sw_if_index(r.sw_if_index)
self._in_history_counter = 0
self._cap_name = "pcap%u-sw_if_index-%s" % (
self.pg_index, self.sw_if_index)
- def rename_previous_capture_file(self, path, counter):
- # if a file from a previous capture exists, rename it.
+ def handle_old_pcap_file(self, path, counter):
filename = os.path.basename(path)
+
+ if not config.keep_pcaps:
+ try:
+ self.test.logger.debug(f"Removing {path}")
+ os.remove(path)
+ except OSError:
+ self.test.logger.debug(f"OSError: Could not remove {path}")
+ return
+
+ # keep
try:
+
if os.path.isfile(path):
name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" % \
(self.test.tempdir,
self.name,
counter,
filename)
- self.test.logger.debug("Renaming %s->%s" %
- (path, name))
+ self.test.logger.debug("Renaming %s->%s" % (path, name))
os.rename(path, name)
except OSError:
self.test.logger.debug("OSError: Could not rename %s %s" %
"""
# disable the capture to flush the capture
self.disable_capture()
- self.rename_previous_capture_file(self.out_path,
- self.out_history_counter)
+ self.handle_old_pcap_file(self.out_path, self.out_history_counter)
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.capture_cli)
self._pcap_reader = None
"""
wrpcap(self.get_in_path(worker), pkts)
- self.test.register_capture(self, worker)
+ self.test.register_pcap(self, worker)
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.get_input_cli(nb_replays, worker))
remaining_time -= elapsed_time
if capture:
self.generate_debug_aid("count-mismatch")
+ if len(capture) > 0 and 0 == expected_count:
+ rem = f"\n{remark}" if remark else ""
+ raise UnexpectedPacketError(
+ capture[0],
+ f"\n({len(capture)} packets captured in total){rem}")
raise Exception("Captured packets mismatch, captured %s packets, "
"expected %s packets on %s" %
(len(capture.res), expected_count, name))
else:
raise Exception("No packets captured on %s" % name)
- def assert_nothing_captured(self, remark=None, filter_out_fn=is_ipv6_misc):
+ def assert_nothing_captured(self, timeout=1, remark=None,
+ filter_out_fn=is_ipv6_misc):
""" Assert that nothing unfiltered was captured on interface
:param remark: remark printed into debug logs
if os.path.isfile(self.out_path):
try:
capture = self.get_capture(
- 0, remark=remark, filter_out_fn=filter_out_fn)
+ 0, timeout=timeout, remark=remark,
+ filter_out_fn=filter_out_fn)
if not capture or len(capture.res) == 0:
# junk filtered out, we're good
return
pass
self.generate_debug_aid("empty-assert")
if remark:
- raise AssertionError(
- "Non-empty capture file present for interface %s (%s)" %
- (self.name, remark))
+ raise UnexpectedPacketError(
+ capture[0],
+ f" ({len(capture)} packets captured in total) ({remark})")
else:
- raise AssertionError("Capture file present for interface %s" %
- self.name)
+ raise UnexpectedPacketError(
+ capture[0], f" ({len(capture)} packets captured in total)")
def wait_for_pg_stop(self):
# wait till packet-generator is stopped