tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / asf / asfframework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from pathlib import Path
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
24 from enum import Enum
25 from abc import ABC, abstractmethod
26
27 from config import config, max_vpp_cpus
28 import hook as hookmodule
29 from vpp_lo_interface import VppLoInterface
30 from vpp_papi_provider import VppPapiProvider
31 import vpp_papi
32 from vpp_papi.vpp_stats import VPPStats
33 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
34 from log import (
35     RED,
36     GREEN,
37     YELLOW,
38     double_line_delim,
39     single_line_delim,
40     get_logger,
41     colorize,
42 )
43 from vpp_object import VppObjectRegistry
44 from util import is_core_present
45 from test_result_code import TestResultCode
46
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
# (VppAsfTestCase uses it as the initial value of its ``logger`` attribute).
null_logger = logging.getLogger("VppAsfTestCase")
null_logger.addHandler(logging.NullHandler())


# debug_internal is imported only when framework debugging is enabled in the
# configuration; tearDownClass uses it under the same condition.
if config.debug_framework:
    import debug_internal
56
57 """
58   Test framework module.
59
60   The module provides a set of tools for constructing and running tests and
61   representing the results.
62 """
63
64
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died.

    Attributes:
        rv: return code of the subprocess, or None when it is not known.
        signal_name: name of the signal whose number equals ``-rv`` (for
            example "SIGKILL"), or None when ``-rv`` is not a signal number.
        testcase: name of the test case that was running, if given.
        method_name: name of the test method that was running, if given.
    """

    # Reverse lookup table: signal number -> signal name.  The SIG_* constants
    # (e.g. SIG_DFL, SIG_IGN) are not signals and are left out.
    signals_by_value = {
        v: k
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        self.rv = rv
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        try:
            # subprocess reports "killed by signal N" as return code -N
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            # KeyError: -rv is not a signal number, TypeError: rv is None
            pass

        if testcase is None and method_name is None:
            in_msg = ""
        else:
            in_msg = " while running %s.%s" % (testcase, method_name)

        if self.rv:
            # Only mention the signal when one was identified; an empty
            # " []" suffix carries no information.
            if self.signal_name is None:
                signal_msg = ""
            else:
                signal_msg = " [%s]" % self.signal_name
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                in_msg,
                self.rv,
                signal_msg,
            )
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
100
101
def _split_complete_lines(pending, data):
    """Split freshly read *data* into complete lines plus a trailing fragment.

    *pending* is the partial line left over from the previous read; it is
    glued in front of the first line of *data*.  Returns ``(lines, pending)``
    where *lines* are the lines ready to be published and *pending* is the
    unterminated tail ("" when *data* ended on a newline).  *data* must not
    be empty.
    """
    pieces = data.decode("ascii", errors="backslashreplace").splitlines(True)
    if pending:
        pieces[0] = "%s%s" % (pending, pieces[0])
    if pieces[-1].endswith("\n"):
        # Everything is complete - nothing is carried over to the next read
        return pieces, ""
    return pieces[:-1], pieces[-1]


def _pump_stream(fd, pending, sink, log, fmt):
    """Read once from *fd* and publish the complete lines.

    Complete lines are appended to the *sink* deque and, unless VPP output
    caching is enabled in the configuration, logged through *log* using the
    *fmt* format.  Returns the partial line to carry over to the next call.
    """
    data = os.read(fd, 102400)
    if len(data) == 0:
        return pending
    lines, pending = _split_complete_lines(pending, data)
    sink.extend(lines)
    if not config.cache_vpp_output:
        for line in lines:
            log(fmt % line.rstrip("\n"))
    return pending


def pump_output(testclass):
    """pump output from vpp stdout/stderr to proper queues

    Runs until ``testclass.pump_thread_stop_flag`` is set.  Output is split
    into lines; an unterminated last line is held back and completed by the
    following read.
    """
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        stdout_fd = testclass.vpp.stdout.fileno()
        stderr_fd = testclass.vpp.stderr.fileno()
        readable = select.select(
            [
                stdout_fd,
                stderr_fd,
                testclass.pump_thread_wakeup_pipe[0],
            ],
            [],
            [],
        )[0]
        if stdout_fd in readable:
            stdout_fragment = _pump_stream(
                stdout_fd,
                stdout_fragment,
                testclass.vpp_stdout_deque,
                testclass.logger.info,
                "VPP STDOUT: %s",
            )
        if stderr_fd in readable:
            stderr_fragment = _pump_stream(
                stderr_fd,
                stderr_fragment,
                testclass.vpp_stderr_deque,
                testclass.logger.error,
                "VPP STDERR: %s",
            )
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
149
150
def _is_platform_aarch64():
    """Return True when the machine type reported by the platform is aarch64."""
    machine = platform.machine()
    return machine == "aarch64"


# Evaluated once, when the module is imported.
is_platform_aarch64 = _is_platform_aarch64()
156
157
def _is_distro_ubuntu2204():
    """Return True when /etc/os-release mentions the "jammy" codename.

    A missing or unreadable /etc/os-release is treated as "not Ubuntu 22.04"
    instead of aborting the import of this module.
    """
    try:
        with open("/etc/os-release") as f:
            return any("jammy" in line for line in f)
    except OSError:
        return False


# Evaluated once, when the module is imported.
is_distro_ubuntu2204 = _is_distro_ubuntu2204()
167
168
def _is_distro_debian11():
    """Return True when /etc/os-release mentions the "bullseye" codename.

    A missing or unreadable /etc/os-release is treated as "not Debian 11"
    instead of aborting the import of this module.
    """
    try:
        with open("/etc/os-release") as f:
            return any("bullseye" in line for line in f)
    except OSError:
        return False


# Evaluated once, when the module is imported.
is_distro_debian11 = _is_distro_debian11()
178
179
# NOTE(review): this repeats the _is_distro_ubuntu2204 definition found
# earlier in this file; it is kept so the name stays bound, but one of the
# two copies looks redundant.
def _is_distro_ubuntu2204():
    """Return True when /etc/os-release mentions the "jammy" codename.

    A missing or unreadable /etc/os-release is treated as "not Ubuntu 22.04".
    """
    try:
        with open("/etc/os-release") as f:
            return any("jammy" in line for line in f)
    except OSError:
        return False
186
187
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    Every instance uses the same state dictionary, so a pipe configured
    through one instance is seen by all of them.
    """

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        # Initialise the shared pipe slot only once: assigning None here
        # unconditionally would discard a pipe set through an earlier instance.
        self._shared_state.setdefault("_pipe", None)

    @property
    def pipe(self):
        """The keep-alive pipe, or None when none has been set."""
        return self._pipe

    @pipe.setter
    def pipe(self, pipe):
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: test case class or test case instance being reported
        :param desc: description prefix, used only when *test* is a class;
            for an instance the test id is sent instead
        """
        if self.pipe is None:
            # if not running forked..
            return

        if isclass(test):
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        else:
            desc = test.id()

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
223
224
class TestCaseTag(Enum):
    """Tags which can be attached to test suites."""

    RUN_SOLO = 1  # suites that must run at the end using only a single test runner
    FIXME_VPP_WORKERS = 2  # suites broken on VPP multi-worker
    FIXME_ASAN = 3  # suites broken when ASan is enabled
    FIXME_UBUNTU2204 = 4  # suites broken on Ubuntu-22.04
    FIXME_DEBIAN11 = 5  # suites broken on Debian-11
    FIXME_VPP_DEBUG = 6  # suites broken on debug vpp image
239
240
def create_tag_decorator(e):
    """Create a class decorator which adds tag *e* to ``cls.test_tags``.

    Tags inherited from a base class stay visible on the decorated class,
    but the base class' own tag list is never modified.
    """

    def decorator(cls):
        own_tags = vars(cls).get("test_tags")
        if isinstance(own_tags, list):
            own_tags.append(e)
        else:
            # The class has no tag list of its own: start from a copy of
            # whatever it inherits, so that appending does not leak the new
            # tag into the base class (and its other subclasses).
            inherited = getattr(cls, "test_tags", None) or ()
            cls.test_tags = list(inherited) + [e]
        return cls

    return decorator
250
251
# Class decorators, one per TestCaseTag member; applying one adds the
# corresponding tag to the decorated class' ``test_tags`` list.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
258
259
class DummyVpp:
    """Placeholder process object used when attaching to a running VPP.

    It offers the ``returncode``/``pid`` attributes and the ``poll()`` and
    ``terminate()`` calls the framework uses on ``cls.vpp``, all of them inert.
    """

    returncode = None
    pid = 0xCAFEBAFE

    def poll(self):
        """Do nothing; there is no child process to poll."""
        return None

    def terminate(self):
        """Do nothing; there is no child process to terminate."""
        return None
269
270
class CPUInterface(ABC):
    """Interface of classes which need CPUs assigned to them."""

    # CPUs assigned to the class; replaced (not extended) by assign_cpus()
    cpus = []
    skipped_due_to_cpu_lack = False

    @classmethod
    @abstractmethod
    def get_cpus_required(cls):
        """Return the number of CPUs the class needs (implemented by subclasses)."""

    @classmethod
    def assign_cpus(cls, cpus):
        """Store *cpus* on the class it is called on."""
        setattr(cls, "cpus", cpus)
283
284
285 class VppAsfTestCase(CPUInterface, unittest.TestCase):
286     """This subclass is a base class for VPP test cases that are implemented as
287     classes. It provides methods to create and run test case.
288     """
289
    # Extra text placed inside the "statseg { ... }" section of the VPP
    # command line built by setUpConstants.
    extra_vpp_statseg_config = ""
    # Extra arguments appended at the very end of the VPP command line.
    extra_vpp_config = []
    # Extra entries placed inside the "plugins { ... }" section.
    extra_vpp_plugin_config = []
    # Placeholder logger; setUpClass replaces it with a per-class logger.
    logger = null_logger
    # Passed to VppPapiProvider in setUpClass (set to 0 in the debug modes).
    vapi_response_timeout = 5
    # When true, tearDown removes the registered VPP objects via the registry.
    remove_configured_vpp_objects_on_tear_down = True
296
297     @classmethod
298     def has_tag(cls, tag):
299         """if the test case has a given tag - return true"""
300         try:
301             return tag in cls.test_tags
302         except AttributeError:
303             pass
304         return False
305
306     @classmethod
307     def is_tagged_run_solo(cls):
308         """if the test case class is timing-sensitive - return true"""
309         return cls.has_tag(TestCaseTag.RUN_SOLO)
310
311     @classmethod
312     def skip_fixme_asan(cls):
313         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
314         if cls.has_tag(TestCaseTag.FIXME_ASAN):
315             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
316             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
317                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
318
319     @classmethod
320     def instance(cls):
321         """Return the instance of this testcase"""
322         return cls.test_instance
323
324     @classmethod
325     def set_debug_flags(cls, d):
326         cls.gdbserver_port = 7777
327         cls.debug_core = False
328         cls.debug_gdb = False
329         cls.debug_gdbserver = False
330         cls.debug_all = False
331         cls.debug_attach = False
332         if d is None:
333             return
334         dl = d.lower()
335         if dl == "core":
336             cls.debug_core = True
337         elif dl == "gdb" or dl == "gdb-all":
338             cls.debug_gdb = True
339         elif dl == "gdbserver" or dl == "gdbserver-all":
340             cls.debug_gdbserver = True
341         elif dl == "attach":
342             cls.debug_attach = True
343         else:
344             raise Exception("Unrecognized DEBUG option: '%s'" % d)
345         if dl == "gdb-all" or dl == "gdbserver-all":
346             cls.debug_all = True
347
348     @classmethod
349     def get_vpp_worker_count(cls):
350         if not hasattr(cls, "vpp_worker_count"):
351             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
352                 cls.vpp_worker_count = 0
353             else:
354                 cls.vpp_worker_count = config.vpp_worker_count
355         return cls.vpp_worker_count
356
357     @classmethod
358     def get_cpus_required(cls):
359         return 1 + cls.get_vpp_worker_count()
360
361     @classmethod
362     def setUpConstants(cls):
363         """Set-up the test case class based on environment variables"""
364         cls.step = config.step
365         cls.plugin_path = ":".join(config.vpp_plugin_dir)
366         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
367         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
368         debug_cli = ""
369         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
370             debug_cli = "cli-listen localhost:5002"
371         size = re.search(r"\d+[gG]", config.coredump_size)
372         if size:
373             coredump_size = f"coredump-size {config.coredump_size}".lower()
374         else:
375             coredump_size = "coredump-size unlimited"
376         default_variant = config.variant
377         if default_variant is not None:
378             default_variant = "default { variant %s 100 }" % default_variant
379         else:
380             default_variant = ""
381
382         api_fuzzing = config.api_fuzz
383         if api_fuzzing is None:
384             api_fuzzing = "off"
385
386         cls.vpp_cmdline = [
387             config.vpp,
388             "unix",
389             "{",
390             "nodaemon",
391             debug_cli,
392             "full-coredump",
393             coredump_size,
394             "runtime-dir",
395             cls.tempdir,
396             "}",
397             "api-trace",
398             "{",
399             "on",
400             "}",
401             "api-segment",
402             "{",
403             "prefix",
404             cls.get_api_segment_prefix(),
405             "}",
406             "cpu",
407             "{",
408             "main-core",
409             str(cls.cpus[0]),
410         ]
411         if cls.extern_plugin_path not in (None, ""):
412             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
413         if cls.get_vpp_worker_count():
414             cls.vpp_cmdline.extend(
415                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
416             )
417         cls.vpp_cmdline.extend(
418             [
419                 "}",
420                 "physmem",
421                 "{",
422                 "max-size",
423                 "32m",
424                 "}",
425                 "statseg",
426                 "{",
427                 "socket-name",
428                 cls.get_stats_sock_path(),
429                 cls.extra_vpp_statseg_config,
430                 "}",
431                 "socksvr",
432                 "{",
433                 "socket-name",
434                 cls.get_api_sock_path(),
435                 "}",
436                 "node { ",
437                 default_variant,
438                 "}",
439                 "api-fuzz {",
440                 api_fuzzing,
441                 "}",
442                 "plugins",
443                 "{",
444                 "plugin",
445                 "dpdk_plugin.so",
446                 "{",
447                 "disable",
448                 "}",
449                 "plugin",
450                 "rdma_plugin.so",
451                 "{",
452                 "disable",
453                 "}",
454                 "plugin",
455                 "lisp_unittest_plugin.so",
456                 "{",
457                 "enable",
458                 "}",
459                 "plugin",
460                 "unittest_plugin.so",
461                 "{",
462                 "enable",
463                 "}",
464             ]
465             + cls.extra_vpp_plugin_config
466             + [
467                 "}",
468             ]
469         )
470
471         if cls.extra_vpp_config is not None:
472             cls.vpp_cmdline.extend(cls.extra_vpp_config)
473
474         if not cls.debug_attach:
475             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
476             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
477
478     @classmethod
479     def wait_for_enter(cls):
480         if cls.debug_gdbserver:
481             print(double_line_delim)
482             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
483         elif cls.debug_gdb:
484             print(double_line_delim)
485             print("Spawned VPP with PID: %d" % cls.vpp.pid)
486         else:
487             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
488             return
489         print(single_line_delim)
490         print("You can debug VPP using:")
491         if cls.debug_gdbserver:
492             print(
493                 f"sudo gdb {config.vpp} "
494                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
495             )
496             print(
497                 "Now is the time to attach gdb by running the above "
498                 "command, set up breakpoints etc., then resume VPP from "
499                 "within gdb by issuing the 'continue' command"
500             )
501             cls.gdbserver_port += 1
502         elif cls.debug_gdb:
503             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
504             print(
505                 "Now is the time to attach gdb by running the above "
506                 "command and set up breakpoints etc., then resume VPP from"
507                 " within gdb by issuing the 'continue' command"
508             )
509         print(single_line_delim)
510         input("Press ENTER to continue running the testcase...")
511
512     @classmethod
513     def attach_vpp(cls):
514         cls.vpp = DummyVpp()
515
516     @classmethod
517     def run_vpp(cls):
518         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
519         cmdline = cls.vpp_cmdline
520
521         if cls.debug_gdbserver:
522             gdbserver = "/usr/bin/gdbserver"
523             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
524                 raise Exception(
525                     "gdbserver binary '%s' does not exist or is "
526                     "not executable" % gdbserver
527                 )
528
529             cmdline = [
530                 gdbserver,
531                 "localhost:{port}".format(port=cls.gdbserver_port),
532             ] + cls.vpp_cmdline
533             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
534
535         try:
536             cls.vpp = subprocess.Popen(
537                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
538             )
539         except subprocess.CalledProcessError as e:
540             cls.logger.critical(
541                 "Subprocess returned with non-0 return code: (%s)", e.returncode
542             )
543             raise
544         except OSError as e:
545             cls.logger.critical(
546                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
547             )
548             raise
549         except Exception as e:
550             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
551             raise
552
553         cls.wait_for_enter()
554
555     @classmethod
556     def wait_for_coredump(cls):
557         corefile = cls.tempdir + "/core"
558         if os.path.isfile(corefile):
559             cls.logger.error("Waiting for coredump to complete: %s", corefile)
560             curr_size = os.path.getsize(corefile)
561             deadline = time.time() + 60
562             ok = False
563             while time.time() < deadline:
564                 cls.sleep(1)
565                 size = curr_size
566                 curr_size = os.path.getsize(corefile)
567                 if size == curr_size:
568                     ok = True
569                     break
570             if not ok:
571                 cls.logger.error(
572                     "Timed out waiting for coredump to complete: %s", corefile
573                 )
574             else:
575                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
576
577     @classmethod
578     def get_stats_sock_path(cls):
579         return "%s/stats.sock" % cls.tempdir
580
581     @classmethod
582     def get_api_sock_path(cls):
583         return "%s/api.sock" % cls.tempdir
584
585     @classmethod
586     def get_memif_sock_path(cls):
587         return "%s/memif.sock" % cls.tempdir
588
589     @classmethod
590     def get_api_segment_prefix(cls):
591         return os.path.basename(cls.tempdir)  # Only used for VAPI
592
593     @classmethod
594     def get_tempdir(cls):
595         if cls.debug_attach:
596             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
597         else:
598             tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}"
599             if config.wipe_tmp_dir:
600                 shutil.rmtree(tmpdir, ignore_errors=True)
601             os.mkdir(tmpdir)
602         return tmpdir
603
604     @classmethod
605     def create_file_handler(cls):
606         if config.log_dir is None:
607             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
608             return
609
610         logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}"
611         if config.wipe_tmp_dir:
612             shutil.rmtree(logdir, ignore_errors=True)
613         os.mkdir(logdir)
614         cls.file_handler = FileHandler(f"{logdir}/log.txt")
615
616     @classmethod
    def setUpClass(cls):
        """
        Perform class setup before running the testcase
        Remove shared memory files, start vpp and connect the vpp-api

        Any exception raised while starting or connecting to VPP is logged,
        followed by a call to quit(), and then re-raised.
        """
        super(VppAsfTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---")
        # seed the random module from the configured seed
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        # per-class log file, records everything from DEBUG level up
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        )
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        # the temporary directory becomes the current working directory
        os.chdir(cls.tempdir)
        cls.logger.info(
            "Temporary dir is %s, api socket is %s",
            cls.tempdir,
            cls.get_api_sock_path(),
        )
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.setUpConstants()
        cls.verbose = 0
        cls.vpp_dead = False
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        try:
            # attach mode uses a placeholder process object instead of
            # spawning VPP
            if cls.debug_attach:
                cls.attach_vpp()
            else:
                cls.run_vpp()
            cls.reporter.send_keep_alive(cls, "setUpClass")
            VppTestResult.current_test_case_info = TestCaseInfo(
                cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
            )
            cls.vpp_stdout_deque = deque()
            cls.vpp_stderr_deque = deque()
            if not cls.debug_attach:
                # background daemon thread moving VPP's stdout/stderr into
                # the deques; quit() stops it via the flag and wakes it up
                # by writing to the pipe
                cls.pump_thread_stop_flag = Event()
                cls.pump_thread_wakeup_pipe = os.pipe()
                cls.pump_thread = Thread(target=pump_output, args=(cls,))
                cls.pump_thread.daemon = True
                cls.pump_thread.start()
            # presumably a timeout of 0 means "no timeout" for the API
            # provider while a human is debugging - TODO confirm
            if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
                cls.vapi_response_timeout = 0
            cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            if cls.step:
                hook = hookmodule.StepHook(cls)
            else:
                hook = hookmodule.PollHook(cls)
            cls.vapi.register_hook(hook)
            cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            # check that VPP survived startup before trying to connect
            try:
                hook.poll_vpp()
            except VppDiedError:
                cls.wait_for_coredump()
                cls.vpp_startup_failed = True
                cls.logger.critical(
                    "VPP died shortly after startup, check the"
                    " output to standard error for possible cause"
                )
                raise
            try:
                cls.vapi.connect()
            # NOTE(review): Exception presumably already covers VPPIOError,
            # which would make the tuple redundant - confirm
            except (vpp_papi.VPPIOError, Exception) as e:
                cls.logger.debug("Exception connecting to vapi: %s" % e)
                cls.vapi.disconnect()

                if cls.debug_gdbserver:
                    print(
                        colorize(
                            "You're running VPP inside gdbserver but "
                            "VPP-API connection failed, did you forget "
                            "to 'continue' VPP from within gdb?",
                            RED,
                        )
                    )
                raise e
            if cls.debug_attach:
                # the worker count is parsed from the first space-separated
                # token of the second-to-last line of "show thread"
                last_line = cls.vapi.cli("show thread").split("\n")[-2]
                cls.vpp_worker_count = int(last_line.split(" ")[0])
                print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
            cls.quit()
            raise e
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
            cls.quit()
            raise e
        cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---")
718
719     @classmethod
720     def _debug_quit(cls):
721         if cls.debug_gdbserver or cls.debug_gdb:
722             try:
723                 cls.vpp.poll()
724
725                 if cls.vpp.returncode is None:
726                     print()
727                     print(double_line_delim)
728                     print("VPP or GDB server is still running")
729                     print(single_line_delim)
730                     input(
731                         "When done debugging, press ENTER to kill the "
732                         "process and finish running the testcase..."
733                     )
734             except AttributeError:
735                 pass
736
737     @classmethod
738     def quit(cls):
739         """
740         Disconnect vpp-api, kill vpp and cleanup shared memory files
741         """
742         cls._debug_quit()
743
744         # first signal that we want to stop the pump thread, then wake it up
745         if hasattr(cls, "pump_thread_stop_flag"):
746             cls.pump_thread_stop_flag.set()
747         if hasattr(cls, "pump_thread_wakeup_pipe"):
748             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
749         if hasattr(cls, "pump_thread"):
750             cls.logger.debug("Waiting for pump thread to stop")
751             cls.pump_thread.join()
752         if hasattr(cls, "vpp_stderr_reader_thread"):
753             cls.logger.debug("Waiting for stderr pump to stop")
754             cls.vpp_stderr_reader_thread.join()
755
756         if hasattr(cls, "vpp"):
757             if hasattr(cls, "vapi"):
758                 cls.logger.debug(cls.vapi.vpp.get_stats())
759                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
760                 cls.vapi.disconnect()
761                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
762                 del cls.vapi
763             cls.vpp.poll()
764             if not cls.debug_attach and cls.vpp.returncode is None:
765                 cls.wait_for_coredump()
766                 cls.logger.debug("Sending TERM to vpp")
767                 cls.vpp.terminate()
768                 cls.logger.debug("Waiting for vpp to die")
769                 try:
770                     outs, errs = cls.vpp.communicate(timeout=5)
771                 except subprocess.TimeoutExpired:
772                     cls.vpp.kill()
773                     outs, errs = cls.vpp.communicate()
774             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
775             if not cls.debug_attach:
776                 cls.vpp.stdout.close()
777                 cls.vpp.stderr.close()
778             del cls.vpp
779
780         if cls.vpp_startup_failed:
781             stdout_log = cls.logger.info
782             stderr_log = cls.logger.critical
783         else:
784             stdout_log = cls.logger.info
785             stderr_log = cls.logger.info
786
787         if hasattr(cls, "vpp_stdout_deque"):
788             stdout_log(single_line_delim)
789             stdout_log("VPP output to stdout while running %s:", cls.__name__)
790             stdout_log(single_line_delim)
791             vpp_output = "".join(cls.vpp_stdout_deque)
792             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
793                 f.write(vpp_output)
794             stdout_log("\n%s", vpp_output)
795             stdout_log(single_line_delim)
796
797         if hasattr(cls, "vpp_stderr_deque"):
798             stderr_log(single_line_delim)
799             stderr_log("VPP output to stderr while running %s:", cls.__name__)
800             stderr_log(single_line_delim)
801             vpp_output = "".join(cls.vpp_stderr_deque)
802             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
803                 f.write(vpp_output)
804             stderr_log("\n%s", vpp_output)
805             stderr_log(single_line_delim)
806
807     @classmethod
808     def tearDownClass(cls):
809         """Perform final cleanup after running all tests in this test-case"""
810         cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---")
811         cls.reporter.send_keep_alive(cls, "tearDownClass")
812         cls.quit()
813         cls.file_handler.close()
814         if config.debug_framework:
815             debug_internal.on_tear_down_class(cls)
816         cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---")
817
818     def show_commands_at_teardown(self):
819         """Allow subclass specific teardown logging additions."""
820         self.logger.info("--- No test specific show commands provided. ---")
821
822     def unlink_testcase_file(self, path):
823         MAX_ATTEMPTS = 9
824         retries = MAX_ATTEMPTS
825         while retries > 0:
826             retries = retries - 1
827             self.logger.debug(f"Unlinking {path}")
828             try:
829                 path.unlink()
830             # Loop until unlink() fails with FileNotFoundError to ensure file is removed
831             except FileNotFoundError:
832                 break
833             except OSError:
834                 self.logger.debug(f"OSError: unlinking {path}")
835                 self.sleep(0.25, f"{retries} retries left")
836         if retries == 0 and os.path.isfile(path):
837             self.logger.error(
838                 f"Unable to delete testcase file in {MAX_ATTEMPTS} attempts: {path}"
839             )
840             raise OSError
841
    def tearDown(self):
        """Show various debug prints after each test

        While VPP is alive the output of several show commands is logged and,
        if enabled, the configured VPP objects are removed.  The API trace is
        then saved and moved into the temporary directory, and leftover
        pg*.pcap files are deleted.
        """
        self.logger.debug(
            f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---"
        )

        try:
            if not self.vpp_dead:
                self.logger.debug(self.vapi.cli("show trace max 1000"))
                self.logger.info(self.vapi.ppcli("show interface"))
                self.logger.info(self.vapi.ppcli("show hardware"))
                self.logger.info(self.statistics.set_errors_str())
                self.logger.info(self.vapi.ppcli("show run"))
                self.logger.info(self.vapi.ppcli("show log"))
                self.logger.info(self.vapi.ppcli("show bihash"))
                self.logger.info("Logging testcase specific show commands.")
                self.show_commands_at_teardown()
                if self.remove_configured_vpp_objects_on_tear_down:
                    self.registry.remove_vpp_config(self.logger)
            # Save/Dump VPP api trace log
            # (this part is attempted even when vpp_dead is already set)
            m = self._testMethodName
            api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
            # presumably VPP writes the saved trace below /tmp - TODO confirm
            tmp_api_trace = "/tmp/%s" % api_trace
            vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
            self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
            self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
            shutil.move(tmp_api_trace, vpp_api_trace_log)
        except VppTransportSocketIOError:
            # the API transport failed: remember that VPP is dead
            self.logger.debug(
                "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
            )
            self.vpp_dead = True
        else:
            # only reached when no exception was raised above
            self.registry.unregister_all(self.logger)
        # Remove any leftover pcap files
        if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0:
            # the directory is taken from the first pg interface's out_path
            testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path)
            for p in Path(testcase_dir).glob("pg*.pcap"):
                self.unlink_testcase_file(p)
        self.logger.debug(
            f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
        )
884
885     def setUp(self):
886         """Clear trace before running each test"""
887         super(VppAsfTestCase, self).setUp()
888         self.logger.debug(
889             f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
890         )
891         # Save testname include in pcap history filenames
892         if hasattr(self, "pg_interfaces"):
893             for i in self.pg_interfaces:
894                 i.test_name = self._testMethodName
895         self.reporter.send_keep_alive(self)
896         if self.vpp_dead:
897             self.wait_for_coredump()
898             raise VppDiedError(
899                 rv=None,
900                 testcase=self.__class__.__name__,
901                 method_name=self._testMethodName,
902             )
903         self.sleep(0.1, "during setUp")
904         self.vpp_stdout_deque.append(
905             "--- test setUp() for %s.%s(%s) starts here ---\n"
906             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
907         )
908         self.vpp_stderr_deque.append(
909             "--- test setUp() for %s.%s(%s) starts here ---\n"
910             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
911         )
912         self.vapi.cli("clear trace")
913         # store the test instance inside the test class - so that objects
914         # holding the class can access instance methods (like assertEqual)
915         type(self).test_instance = self
916         self.logger.debug(
917             f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
918         )
919
920     @classmethod
921     def get_vpp_time(cls):
922         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
923         # returns float("2.190522")
924         timestr = cls.vapi.cli("show clock")
925         head, sep, tail = timestr.partition(",")
926         head, sep, tail = head.partition("Time now")
927         return float(tail)
928
929     @classmethod
930     def sleep_on_vpp_time(cls, sec):
931         """Sleep according to time in VPP world"""
932         # On a busy system with many processes
933         # we might end up with VPP time being slower than real world
934         # So take that into account when waiting for VPP to do something
935         start_time = cls.get_vpp_time()
936         while cls.get_vpp_time() - start_time < sec:
937             cls.sleep(0.1)
938
939     @classmethod
940     def create_loopback_interfaces(cls, count):
941         """
942         Create loopback interfaces.
943
944         :param count: number of interfaces created.
945         :returns: List of created interfaces.
946         """
947         result = [VppLoInterface(cls) for i in range(count)]
948         for intf in result:
949             setattr(cls, intf.name, intf)
950         cls.lo_interfaces = result
951         return result
952
953     def assert_equal(self, real_value, expected_value, name_or_class=None):
954         if name_or_class is None:
955             self.assertEqual(real_value, expected_value)
956             return
957         try:
958             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
959             msg = msg % (
960                 getdoc(name_or_class).strip(),
961                 real_value,
962                 str(name_or_class(real_value)),
963                 expected_value,
964                 str(name_or_class(expected_value)),
965             )
966         except Exception:
967             msg = "Invalid %s: %s does not match expected value %s" % (
968                 name_or_class,
969                 real_value,
970                 expected_value,
971             )
972
973         self.assertEqual(real_value, expected_value, msg)
974
975     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
976         if name is None:
977             msg = None
978         else:
979             msg = "Invalid %s: %s out of range <%s,%s>" % (
980                 name,
981                 real_value,
982                 expected_min,
983                 expected_max,
984             )
985         self.assertTrue(expected_min <= real_value <= expected_max, msg)
986
987     def get_counter(self, counter):
988         if counter.startswith("/"):
989             counter_value = self.statistics.get_counter(counter)
990         else:
991             counters = self.vapi.cli("sh errors").split("\n")
992             counter_value = 0
993             for i in range(1, len(counters) - 1):
994                 results = counters[i].split()
995                 if results[1] == counter:
996                     counter_value = int(results[0])
997                     break
998         return counter_value
999
1000     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1001         c = self.get_counter(counter)
1002         if thread is not None:
1003             c = c[thread][index]
1004         else:
1005             c = sum(x[index] for x in c)
1006         self.logger.debug(
1007             "validate counter `%s[%s]', expected: %s, real value: %s"
1008             % (counter, index, expected_value, c)
1009         )
1010         self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
1011
1012     def assert_error_counter_equal(self, counter, expected_value):
1013         counter_value = self.statistics[counter].sum()
1014         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1015
1016     @classmethod
1017     def sleep(cls, timeout, remark=None):
1018         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1019         #  * by Guido, only the main thread can be interrupted.
1020         # */
1021         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1022         if timeout == 0:
1023             # yield quantum
1024             if hasattr(os, "sched_yield"):
1025                 os.sched_yield()
1026             else:
1027                 time.sleep(0)
1028             return
1029
1030         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1031         before = time.time()
1032         time.sleep(timeout)
1033         after = time.time()
1034         if after - before > 2 * timeout:
1035             cls.logger.error(
1036                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1037                 after - before,
1038                 timeout,
1039             )
1040
1041         cls.logger.debug(
1042             "Finished sleep (%s) - slept %es (wanted %es)",
1043             remark,
1044             after - before,
1045             timeout,
1046         )
1047
1048     def virtual_sleep(self, timeout, remark=None):
1049         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1050         self.vapi.cli("set clock adjust %s" % timeout)
1051
1052     def snapshot_stats(self, stats_diff):
1053         """Return snapshot of interesting stats based on diff dictionary."""
1054         stats_snapshot = {}
1055         for sw_if_index in stats_diff:
1056             for counter in stats_diff[sw_if_index]:
1057                 stats_snapshot[counter] = self.statistics[counter]
1058         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1059         return stats_snapshot
1060
1061     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1062         """Assert appropriate difference between current stats and snapshot."""
1063         for sw_if_index in stats_diff:
1064             for cntr, diff in stats_diff[sw_if_index].items():
1065                 if sw_if_index == "err":
1066                     self.assert_equal(
1067                         self.statistics[cntr].sum(),
1068                         stats_snapshot[cntr].sum() + diff,
1069                         f"'{cntr}' counter value (previous value: "
1070                         f"{stats_snapshot[cntr].sum()}, "
1071                         f"expected diff: {diff})",
1072                     )
1073                 else:
1074                     try:
1075                         self.assert_equal(
1076                             self.statistics[cntr][:, sw_if_index].sum(),
1077                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1078                             f"'{cntr}' counter value (previous value: "
1079                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1080                             f"expected diff: {diff})",
1081                         )
1082                     except IndexError as e:
1083                         # if diff is 0, then this most probably a case where
1084                         # test declares multiple interfaces but traffic hasn't
1085                         # passed through this one yet - which means the counter
1086                         # value is 0 and can be ignored
1087                         if 0 != diff:
1088                             raise Exception(
1089                                 f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
1090                             ) from e
1091
1092
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance.
    :returns: first docstring line as cleaned up by inspect.getdoc(); the
        class name when the class has no docstring or an empty one, so that
        result reporting does not fail on undocumented test classes.
    """
    doc = getdoc(test.__class__)
    doc_lines = doc.splitlines() if doc else []
    if doc_lines:
        return doc_lines[0]
    return test.__class__.__name__
1095
1096
def get_test_description(descriptions, test):
    """Return the text used to describe `test` in the result output.

    :param descriptions: when true, the short description of the test is
        preferred.
    :param test: object providing shortDescription().
    :returns: test.shortDescription() if descriptions are enabled and it is
        non-empty, otherwise str(test).
    """
    summary = test.shortDescription()
    use_summary = descriptions and summary
    return summary if use_summary else str(test)
1103
1104
def get_failed_testcase_linkname(failed_dir, testcase_dirname):
    """Return the path "<failed_dir>/<testcase_dirname>-FAILED"."""
    link_name = "{}-FAILED".format(testcase_dirname)
    return os.path.join(failed_dir, link_name)
1107
1108
def get_testcase_dirname(testcase_class_name):
    """Return the directory name "vpp-unittest-<testcase_class_name>"."""
    return "vpp-unittest-{}".format(testcase_class_name)
1111
1112
class TestCaseInfo:
    """Plain record describing one test case run.

    Instances compare and hash by identity, which allows them to be
    collected in sets.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case.
        :param tempdir: temporary directory used by the test case.
        :param vpp_pid: PID of the VPP process.
        :param vpp_bin_path: path to the VPP binary.
        """
        # description of the test during which a core file was noticed;
        # stays None until somebody fills it in
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1120
1121
class VppTestResult(unittest.TestResult):
    """
    Test result collector which also prints per-test output, forwards
    results through an optional pipe and tracks failed/crashed test cases.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Both sets are class attributes and only ever extended via .add(), so
    # they are shared by all result objects.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # Info object of the test case currently running. It is only read in this
    # class; it is expected to be assigned from outside.
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
        """
        :param stream: Stream the test results are written to.
        :param descriptions: Boolean variable to store information if to use
            test case descriptions.
        :param verbosity: Integer variable to store required verbosity level.
        :param runner: Runner which created this result; its print_summary
            and stream attributes are used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_code = TestResultCode.TEST_RUN
        self.result_string = None
        self.runner = runner
        # test classes whose header was already printed by startTest()
        self.printed = []

    def decodePcapFiles(self, test, when_configured=False):
        """
        Ask the first pg interface of the test to decode pcap files.

        Decoding is requested twice for the directory of the interface's
        out_path: once with the prefix "suite<ClassName>" and once with the
        test method name. Nothing happens if the test has no pg interfaces.

        :param test: test case instance.
        :param when_configured: if True, decode only when config.decode_pcaps
            is set; if False, decode always.
        """
        if when_configured == False or config.decode_pcaps == True:
            if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0:
                testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path)
                test.pg_interfaces[0].decode_pcap_files(
                    testcase_dir, f"suite{test.__class__.__name__}"
                )
                test.pg_interfaces[0].decode_pcap_files(
                    testcase_dir, test._testMethodName
                )

    def addSuccess(self, test):
        """
        Record a test succeeded result

        Pcap files are decoded only if configured.

        :param test: test case instance.

        """
        self.log_result("addSuccess", test)
        self.decodePcapFiles(test, when_configured=True)
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)
        self.result_code = TestResultCode.PASS
        self.send_result_through_pipe(test, self.result_code)

    def addExpectedFailure(self, test, err):
        """
        Record an expected failure; pcap files are always decoded.

        :param test: test case instance.
        :param err: exception information as passed in by unittest.
        """
        self.log_result("addExpectedFailure", test, err)
        self.decodePcapFiles(test)
        super().addExpectedFailure(test, err)
        self.result_string = colorize("FAIL", GREEN)
        self.result_code = TestResultCode.EXPECTED_FAIL
        self.send_result_through_pipe(test, self.result_code)

    def addUnexpectedSuccess(self, test):
        """
        Record an unexpected success; pcap files are decoded only if
        configured.

        :param test: test case instance.
        """
        self.log_result("addUnexpectedSuccess", test)
        self.decodePcapFiles(test, when_configured=True)
        super().addUnexpectedSuccess(test)
        self.result_string = colorize("OK", RED)
        self.result_code = TestResultCode.UNEXPECTED_PASS
        self.send_result_through_pipe(test, self.result_code)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        The exact reason "not enough cpus" is reported with the dedicated
        result code SKIP_CPU_SHORTAGE, anything else as SKIP.

        :param test: test case instance.
        :param reason: skip reason text.

        """
        self.log_result("addSkip", test, reason=reason)
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        if reason == "not enough cpus":
            self.result_code = TestResultCode.SKIP_CPU_SHORTAGE
        else:
            self.result_code = TestResultCode.SKIP
        self.send_result_through_pipe(test, self.result_code)

    def symlink_failed(self):
        """
        Create a "<dirname>-FAILED" symlink in config.failed_dir pointing to
        the temp dir of the current test case.

        Does nothing without a current test case info or if the link path
        already exists. Any exception is logged as an error and swallowed.
        """
        if self.current_test_case_info:
            try:
                failed_dir = config.failed_dir
                link_path = get_failed_testcase_linkname(
                    failed_dir, os.path.basename(self.current_test_case_info.tempdir)
                )

                self.current_test_case_info.logger.debug(
                    "creating a link to the failed test"
                )
                self.current_test_case_info.logger.debug(
                    "os.symlink(%s, %s)"
                    % (self.current_test_case_info.tempdir, link_path)
                )
                if os.path.exists(link_path):
                    self.current_test_case_info.logger.debug("symlink already exists")
                else:
                    os.symlink(self.current_test_case_info.tempdir, link_path)

            except Exception as e:
                self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send the tuple (test.id(), result) through the result pipe.

        The pipe is taken from the attribute test_framework_result_pipe;
        nothing is sent if the attribute is missing or falsy.

        :param test: test case instance (must provide id()).
        :param result: result code to report.
        """
        if hasattr(self, "test_framework_result_pipe"):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_result(self, fn, test, err=None, reason=None):
        """
        Write a debug record about a result callback to the logger of the
        current test case; a no-op without a current test case info.

        :param fn: name of the result callback being logged.
        :param test: test case instance or unittest's _ErrorHolder.
        :param err: optional exception info; it is unpacked into
            format_exception() to log the formatted traceback.
        :param reason: optional reason text (used for skips).
        """
        if self.current_test_case_info:
            # _ErrorHolder has no test method attributes, only a description
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = "%s.%s(%s)" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc,
                )
            extra_msg = ""
            if err:
                extra_msg += f", error is {err}"
            if reason:
                extra_msg += f", reason is {reason}"
            self.current_test_case_info.logger.debug(
                f"--- {fn}() {test_name} called{extra_msg}"
            )
            if err:
                self.current_test_case_info.logger.debug(
                    "formatted exception is:\n%s" % "".join(format_exception(*err))
                )

    def add_error(self, test, err, unittest_fn, result_code):
        """
        Common implementation of addFailure() and addError().

        Logs the result, decodes pcap files, records the result with
        unittest, remembers the failed test case (including a symlink to its
        temp dir and core file bookkeeping) and reports the result through
        the pipe.

        :param test: test case instance or unittest's _ErrorHolder.
        :param err: exception information as passed in by unittest.
        :param unittest_fn: unbound unittest.TestResult method used to
            record the result.
        :param result_code: TestResultCode.FAIL or TestResultCode.ERROR.
        :raises Exception: for any other result code.
        """
        self.result_code = result_code
        if result_code == TestResultCode.FAIL:
            self.log_result("addFailure", test, err=err)
            error_type_str = colorize("FAIL", RED)
        elif result_code == TestResultCode.ERROR:
            self.log_result("addError", test, err=err)
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception(f"Unexpected result code {result_code}")
        self.decodePcapFiles(test)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % (
                error_type_str,
                self.current_test_case_info.tempdir,
            )
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test seen with a core file is recorded as
                # the crashing one
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id()
                        )
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(self.current_test_case_info)
        else:
            self.result_string = "%s [no temp dir]" % error_type_str

        self.send_result_through_pipe(test, result_code)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test case instance.
        :param err: exception information as passed in by unittest.

        """
        self.add_error(test, err, unittest.TestResult.addFailure, TestResultCode.FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test case instance.
        :param err: exception information as passed in by unittest.

        """
        self.add_error(test, err, unittest.TestResult.addError, TestResultCode.ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test case instance.
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints the test class header once per class, remembers the start
        time and, with verbosity > 0, announces the test on the stream.

        :param test: test case instance.

        """

        def print_header(test):
            # header is printed only once per test class
            if test.__class__ in self.printed:
                return

            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())

            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)

            if test.has_tag(TestCaseTag.FIXME_ASAN):
                test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
                test.skip_fixme_asan()

            if hasattr(test, "vpp_worker_count"):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                else:
                    test_title += f" [{test.vpp_worker_count} worker threads]"

            if test.__class__.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]",
                    YELLOW,
                )

            print(double_line_delim)
            print(test_title)
            print(double_line_delim)
            self.printed.append(test.__class__)

        print_header(test)
        # used by stopTest() to print the test duration
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln("Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        Prints the result line (with the duration when verbosity is 0) and
        sends TEST_RUN through the result pipe.

        :param test: test case instance.

        """
        unittest.TestResult.stopTest(self, test)

        # text appended to the result string for the given result code
        result_code_to_suffix = {
            TestResultCode.PASS: "",
            TestResultCode.FAIL: "",
            TestResultCode.ERROR: "",
            TestResultCode.SKIP: "",
            TestResultCode.TEST_RUN: "",
            TestResultCode.SKIP_CPU_SHORTAGE: "",
            TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
            TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
        }

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(
                "%-72s%s%s"
                % (
                    self.getDescription(test),
                    self.result_string,
                    result_code_to_suffix[self.result_code],
                )
            )
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(
                "%-67s %4.2f %s%s"
                % (
                    self.getDescription(test),
                    time.time() - self.start_test,
                    self.result_string,
                    result_code_to_suffix[self.result_code],
                )
            )

        self.send_result_through_pipe(test, TestResultCode.TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case

        If the runner does not want a summary, the stream of this result and
        of the runner is replaced by a writer to os.devnull afterwards.
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList("ERROR", self.errors)
            self.printErrorList("FAIL", self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable of (test, formatted error) pairs

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
1465
1466
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output and collects them in VppTestResult objects.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        """
        :param keep_alive_pipe: pipe stored in KeepAliveReporter.pipe.
        :param descriptions: passed on to unittest.TextTestRunner.
        :param verbosity: passed on to unittest.TextTestRunner.
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe.
        :param failfast: passed on to unittest.TextTestRunner.
        :param buffer: passed on to unittest.TextTestRunner.
        :param resultclass: passed on to unittest.TextTestRunner.
            NOTE(review): resultclass is a read-only property here, so a
            non-None value presumably cannot be stored by the base class -
            confirm that no caller passes one.
        :param print_summary: when False, the unittest summary is suppressed
            (see run()).
        :param kwargs: further keyword arguments for TextTestRunner.
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppAsfTestCase methods ...
        super().__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
        )
        KeepAliveReporter.pipe = keep_alive_pipe

        # kept so that run() can undo the stream replacement done by the
        # result object when the summary is suppressed
        self.orig_stream = self.stream
        # resultclass is the VppTestResult class itself, so this sets a class
        # attribute seen by every result object
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run.
        :returns: the result object filled in by the run.

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super().run(test)
        if not self.print_summary:
            # The result object swaps the streams for a devnull writer to
            # hide the summary; put the original back ...
            muted_stream = self.stream
            self.stream = self.orig_stream
            result.stream = self.orig_stream
            # ... and close the replacement so that its file descriptor is
            # not leaked. Never close the original stream itself.
            if muted_stream is not self.orig_stream:
                muted_stream.close()
        return result
1518
1519
class Worker(Thread):
    """
    Thread which runs an external executable, logs its output and stores
    its return code in self.result.

    Subclasses may set the attributes testcase, role and wait_for_gdb
    before calling __init__() to enable the debugger support and to extend
    the application name used in messages.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line as a sequence; the first item
            is the executable.
        :param logger: logger used for all messages.
        :param env: optional dict of extra environment variables; a deep
            copy is stored.
        :param args: positional arguments for threading.Thread.
        :param kwargs: keyword arguments for threading.Thread.
        """
        super().__init__(*args, **kwargs)
        self.logger = logger
        # Work on a private copy so that the debugger options added below
        # never modify the caller's list.
        self.args = list(executable_args)
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Prefix the command line with gdbserver. This must extend
                # the executable arguments, not *args, which holds the
                # positional arguments meant for Thread.
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When debugging with gdb or gdbserver is enabled on the test case,
        print instructions how to attach and wait for the user to press
        ENTER. Returns immediately otherwise.
        """
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            # the next worker spawned under gdbserver gets the next port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable through "stdbuf -o0 -e0" in its own process
        group, wait for it to finish, log its stdout/stderr and store the
        return code in self.result.

        :raises EnvironmentError: if the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE first.
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        # variables given to the constructor override the inherited ones
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(out.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1638
1639
# Nothing is done when this module is executed directly.
if __name__ == "__main__":
    pass