from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
+from struct import pack, unpack
import scapy.compat
from scapy.packet import Raw
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
- cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
- cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
- plugin_path = None
- if cls.plugin_path is not None:
- if cls.extern_plugin_path is not None:
- plugin_path = "%s:%s" % (
- cls.plugin_path, cls.extern_plugin_path)
- else:
- plugin_path = cls.plugin_path
- elif cls.extern_plugin_path is not None:
- plugin_path = cls.extern_plugin_path
+ extern_plugin_path = os.getenv('EXTERN_PLUGINS')
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
"cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if extern_plugin_path is not None:
+ cls.extra_vpp_plugin_config.append(
+ "add-path %s" % extern_plugin_path)
if cls.get_vpp_worker_count():
cls.vpp_cmdline.extend([
"corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
- if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- if cls.test_plugin_path is not None:
- cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
- self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
- vpp_api_trace_log))
except VppTransportSocketIOError:
self.logger.debug("VppTransportSocketIOError: Vpp dead. "
"Cannot log show commands.")
:returns: string containing serialized data from packet info
"""
- return "%d %d %d %d %d" % (info.index, info.src, info.dst,
- info.ip, info.proto)
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ return pack('iiiih', info.index, info.src,
+ info.dst, info.ip, info.proto)
@staticmethod
def payload_to_info(payload, payload_field='load'):
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = getattr(payload, payload_field).split()
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ payload_b = getattr(payload, payload_field)[:18]
+
info = _PacketInfo()
- info.index = int(numbers[0])
- info.src = int(numbers[1])
- info.dst = int(numbers[2])
- info.ip = int(numbers[3])
- info.proto = int(numbers[4])
+ info.index, info.src, info.dst, info.ip, info.proto \
+ = unpack('iiiih', payload_b)
+
+ # some SRv6 TCs depend on get an exception if bad values are detected
+ if info.index > 0x4000:
+ raise ValueError('Index value is invalid')
+
return info
def get_next_packet_info(self, info):
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
+ def virtual_sleep(self, timeout, remark=None):
+ self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
+ self.vapi.cli("set clock adjust %s" % timeout)
+
def pg_send(self, intf, pkts, worker=None, trace=True):
intf.add_stream(pkts, worker=worker)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start(trace=trace)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
+ trace=True):
self.pg_send(intf, pkts)
if not timeout:
timeout = 1
i.get_capture(0, timeout=timeout)
i.assert_nothing_captured(remark=remark)
timeout = 0.1
+ if trace:
+ self.logger.debug(self.vapi.cli("show trace"))
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
trace=True):
n_rx = len(pkts)
self.pg_send(intf, pkts, worker=worker, trace=trace)
rx = output.get_capture(n_rx)
+ if trace:
+ self.logger.debug(self.vapi.cli("show trace"))
return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):