from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
+from struct import pack, unpack
import scapy.compat
from scapy.packet import Raw
from vpp_lo_interface import VppLoInterface
from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
+from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
- cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
- cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
- plugin_path = None
- if cls.plugin_path is not None:
- if cls.extern_plugin_path is not None:
- plugin_path = "%s:%s" % (
- cls.plugin_path, cls.extern_plugin_path)
- else:
- plugin_path = cls.plugin_path
- elif cls.extern_plugin_path is not None:
- plugin_path = cls.extern_plugin_path
+ extern_plugin_path = os.getenv('EXTERN_PLUGINS')
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
"cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if extern_plugin_path is not None:
+ cls.extra_vpp_plugin_config.append(
+ "add-path %s" % extern_plugin_path)
if cls.get_vpp_worker_count():
cls.vpp_cmdline.extend([
"corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
- if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- if cls.test_plugin_path is not None:
- cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
- self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
- vpp_api_trace_log))
except VppTransportSocketIOError:
self.logger.debug("VppTransportSocketIOError: Vpp dead. "
"Cannot log show commands.")
cls._pcaps = []
@classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
+ mode=None):
"""
Create packet-generator interfaces.
"""
result = []
for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size)
+ intf = VppPGInterface(cls, i, gso, gso_size, mode)
setattr(cls, intf.name, intf)
result.append(intf)
cls.pg_interfaces = result
return result
+ @classmethod
+ def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP4)
+
+ @classmethod
+ def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP6)
+
+ @classmethod
+ def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
+ @classmethod
+ def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
@classmethod
def create_loopback_interfaces(cls, count):
"""
:returns: string containing serialized data from packet info
"""
- return "%d %d %d %d %d" % (info.index, info.src, info.dst,
- info.ip, info.proto)
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ return pack('iiiih', info.index, info.src,
+ info.dst, info.ip, info.proto)
@staticmethod
def payload_to_info(payload, payload_field='load'):
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = getattr(payload, payload_field).split()
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ payload_b = getattr(payload, payload_field)[:18]
+
info = _PacketInfo()
- info.index = int(numbers[0])
- info.src = int(numbers[1])
- info.dst = int(numbers[2])
- info.ip = int(numbers[3])
- info.proto = int(numbers[4])
+ info.index, info.src, info.dst, info.ip, info.proto \
+ = unpack('iiiih', payload_b)
+
+ # some SRv6 TCs depend on get an exception if bad values are detected
+ if info.index > 0x4000:
+ raise ValueError('Index value is invalid')
+
return info
def get_next_packet_info(self, info):
n_rx = len(pkts)
self.pg_send(intf, pkts, worker=worker, trace=trace)
rx = output.get_capture(n_rx)
+ if trace:
+ self.logger.debug(self.vapi.cli("show trace"))
return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):
self.result = os.EX_OSFILE
raise EnvironmentError(
"executable '%s' is not found or executable." % executable)
- self.logger.debug("Running executable: '{app}'"
- .format(app=' '.join(self.args)))
+ self.logger.debug("Running executable '{app}': '{cmd}'"
+ .format(app=self.app_name,
+ cmd=' '.join(self.args)))
env = os.environ.copy()
env.update(self.env)
env["CK_LOG_FILE_NAME"] = "-"
self.process = subprocess.Popen(
- self.args, shell=False, env=env, preexec_fn=os.setpgrp,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
+ preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.wait_for_enter()
out, err = self.process.communicate()
self.logger.debug("Finished running `{app}'".format(app=self.app_name))