from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
-from vpp_pg_interface import VppPGInterface
+from vpp_config import VppTestCaseVppConfig
+from vpp_interface import VppInterface
from vpp_sub_interface import VppSubInterface
+from vpp_pg_interface import VppPGInterface
from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import VppPapiProvider
from vpp_papi.vpp_stats import VPPStats
else:
import subprocess
+# Python2/3 compatible
+try:
+ input = raw_input
+except NameError:
+ pass
+
PASS = 0
FAIL = 1
ERROR = 2
# flag will take care of properly terminating the loop
-def is_skip_aarch64_set():
+def _is_skip_aarch64_set():
return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
+is_skip_aarch64_set = _is_skip_aarch64_set()
+
-def is_platform_aarch64():
+def _is_platform_aarch64():
return platform.machine() == 'aarch64'
+is_platform_aarch64 = _is_platform_aarch64()
-def running_extended_tests():
+
+def _running_extended_tests():
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+running_extended_tests = _running_extended_tests()
+
-def running_on_centos():
+def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+running_on_centos = _running_on_centos
+
class KeepAliveReporter(object):
"""
classes. It provides methods to create and run test case.
"""
- extra_vpp_punt_config = []
+ CLI_LISTEN_DEFAULT = 'localhost:5002'
+ config = VppTestCaseVppConfig()
@property
def packet_infos(self):
plugin_path = cls.plugin_path
elif cls.extern_plugin_path is not None:
plugin_path = cls.extern_plugin_path
- debug_cli = ""
+
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
- debug_cli = "cli-listen localhost:5002"
+ cls.config.add('unix', 'cli-listen', cls.CLI_LISTEN_DEFAULT)
+
coredump_size = None
size = os.getenv("COREDUMP_SIZE")
- if size is not None:
- coredump_size = "coredump-size %s" % size
- if coredump_size is None:
- coredump_size = "coredump-size unlimited"
-
- cpu_core_number = cls.get_least_used_cpu()
-
- cls.vpp_cmdline = [cls.vpp_bin, "unix",
- "{", "nodaemon", debug_cli, "full-coredump",
- coredump_size, "runtime-dir", cls.tempdir, "}",
- "api-trace", "{", "on", "}", "api-segment", "{",
- "prefix", cls.shm_prefix, "}", "cpu", "{",
- "main-core", str(cpu_core_number), "}", "statseg",
- "{", "socket-name", cls.stats_sock, "}", "plugins",
- "{", "plugin", "dpdk_plugin.so", "{", "disable",
- "}", "plugin", "unittest_plugin.so", "{", "enable",
- "}", "}", ]
- if cls.extra_vpp_punt_config is not None:
- cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
+ cls.config.add('unix', 'coredump-size',
+ size if size is not None else 'unlimited')
+
+ cls.config.add('unix', 'runtime-dir', cls.tempdir)
+ cls.config.add('api-segment', 'prefix', cls.shm_prefix)
+ cls.config.add('cpu', 'main-core', str(cls.get_least_used_cpu()))
+ cls.config.add('statseg', 'socket-name', cls.stats_sock)
+
if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
+ cls.config.add('plugins', 'path', plugin_path)
+ cls.config.add_plugin('dpdk_plugin.so', 'disable')
+ cls.config.add_plugin('unittest_plugin.so', 'enable')
+
+ cls.vpp_cmdline = [cls.vpp_bin] + cls.config.shlex()
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- raw_input("Press ENTER to continue running the testcase...")
+ input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- raw_input("When done debugging, press ENTER to kill the "
- "process and finish running the testcase...")
+ input("When done debugging, press ENTER to kill the "
+ "process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, 'pump_thread_stop_flag'):
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
def assert_packet_counter_equal(self, counter, expected_value):
- counters = self.vapi.cli("sh errors").split('\n')
- counter_value = -1
- for i in range(1, len(counters) - 1):
- results = counters[i].split()
- if results[1] == counter:
- counter_value = int(results[0])
- break
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+ else:
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
@classmethod
def sleep(cls, timeout, remark=None):