import socket
import struct
from traceback import format_exc, format_stack
+
+import scapy.compat
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
@property
def input_cli(self):
"""CLI string to load the injected packets"""
+ if self._nb_replays is not None:
+ return "%s limit %d" % (self._input_cli, self._nb_replays)
return self._input_cli
@property
self._input_cli = \
"packet-generator new pcap %s source pg%u name %s" % (
self.in_path, self.pg_index, self.cap_name)
+ self._nb_replays = None
def enable_capture(self):
- """ Enable capture on this packet-generator interface"""
+ """ Enable capture on this packet-generator interface
+ of at most n packets.
+ If n < 0, this is no limit
+ """
try:
if os.path.isfile(self.out_path):
name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" % \
self.test.vapi.cli(self.capture_cli)
self._pcap_reader = None
- def add_stream(self, pkts):
+ def disable_capture(self):
+ self.test.vapi.cli("%s disable" % self.capture_cli)
+
+ def add_stream(self, pkts, nb_replays=None):
"""
Add a stream of packets to this packet-generator
:param pkts: iterable packets
"""
+ self._nb_replays = nb_replays
try:
if os.path.isfile(self.in_path):
name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %\
while time.time() < deadline:
if os.path.isfile(self.out_path):
break
- time.sleep(0) # yield
+ self._test.sleep(0) # yield
if os.path.isfile(self.out_path):
self.test.logger.debug("Capture file appeared after %fs" %
(time.time() - (deadline - timeout)))
self.test.logger.debug("Polling for packet")
while time.time() < deadline or poll:
if not self.verify_enough_packet_data_in_pcap():
- time.sleep(0) # yield
+ self._test.sleep(0) # yield
poll = False
continue
p = self._pcap_reader.recv()
"Packet received after %fs" %
(time.time() - (deadline - timeout)))
return p
- time.sleep(0) # yield
+ self._test.sleep(0) # yield
poll = False
self.test.logger.debug("Timeout - no packets received")
raise CaptureTimeoutError("Packet didn't arrive within timeout")
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
+ arp_reply = Ether(scapy.compat.raw(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
self.test.logger.info("VPP %s MAC address is %s " %
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
+ self._test.logger.info(
+ "Replacing EtherType: 0x88a8 with "
+ "0x8100 and regenerating Ethernet header. ")
ndp_reply.type = 0x8100
- ndp_reply = Ether(str(ndp_reply))
+ ndp_reply = Ether(scapy.compat.raw(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]