tests: enhance counter comparison error message
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55
56
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())

# Integer status codes.
# NOTE(review): presumably the per-test result categories used when reporting
# results; their consumers are not visible in this part of the file - confirm.
PASS = 0
FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
SKIP_CPU_SHORTAGE = 5


# The debug helper module is imported only when framework debugging is enabled
# in the configuration.
if config.debug_framework:
    import debug_internal

# NOTE(review): the string below is a plain expression statement, not the
# module docstring, because it is not the first statement of the module.
"""
  Test framework module.

  The module provides a set of tools for constructing and running tests and
  representing the results.
"""
80
81
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died.

    Attributes:
        rv: return code of the subprocess, or None when unknown.
        signal_name: name of the signal whose number equals ``-rv``
            (e.g. "SIGSEGV"), or None when ``-rv`` is not a signal number.
        testcase: test case that was running, if given.
        method_name: test method that was running, if given.
    """

    # Reverse lookup table: signal number -> "SIGxxx" name. The "SIG_*" names
    # (handler/mask constants) are deliberately left out.
    signals_by_value = {
        v: k
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        """Build the error message from the return code and test location.

        :param rv: subprocess return code (may be None).
        :param testcase: test case running when VPP died (may be None).
        :param method_name: test method running when VPP died (may be None).
        """
        self.rv = rv
        self.testcase = testcase
        self.method_name = method_name

        # subprocess reports "terminated by signal N" as return code -N, so
        # negate before the lookup. rv=None raises TypeError on negation.
        try:
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            self.signal_name = None

        if testcase is None and method_name is None:
            in_msg = ""
        else:
            in_msg = " while running %s.%s" % (testcase, method_name)

        if self.rv:
            # Only mention the signal when one was actually identified;
            # otherwise the message used to end in an empty " []".
            if self.signal_name is None:
                signal_msg = ""
            else:
                signal_msg = " [%s]" % self.signal_name
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                in_msg,
                self.rv,
                signal_msg,
            )
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
117
118
class _PacketInfo(object):
    """Private class to create packet info object.

    Help process information about the next packet.
    Set variables to default values.
    """

    #: Store the index of the packet.
    index = -1
    #: Store the index of the source packet generator interface of the packet.
    src = -1
    #: Store the index of the destination packet generator interface
    #: of the packet.
    dst = -1
    #: Store expected ip version
    ip = -1
    #: Store expected upper protocol
    proto = -1
    #: Store the copy of the former packet.
    data = None

    def __eq__(self, other):
        """Compare two packet infos by index, src, dst and data.

        ``ip`` and ``proto`` do not take part in the comparison.
        Returns NotImplemented for objects that are not packet infos, so that
        e.g. ``info == None`` evaluates to False instead of raising
        AttributeError.
        """
        if not isinstance(other, _PacketInfo):
            return NotImplemented
        return (
            self.index == other.index
            and self.src == other.src
            and self.dst == other.dst
            and self.data == other.data
        )
146
147
def _pump_stream(fd, fragment, sink, log, prefix):
    """Read one chunk from *fd* and queue the complete lines it contains.

    :param fd: file descriptor to read from (must be ready for reading).
    :param fragment: unterminated tail left over from the previous read.
    :param sink: deque-like object; complete lines are appended via extend().
    :param log: callable used to log each complete line.
    :param prefix: label put in front of each logged line.
    :returns: the new unterminated tail ("" when the chunk ended in newline).
    """
    read = os.read(fd, 102400)
    if len(read) == 0:
        return fragment
    split = read.decode("ascii", errors="backslashreplace").splitlines(True)
    if len(fragment) > 0:
        split[0] = "%s%s" % (fragment, split[0])
    if split[-1].endswith("\n"):
        complete = split
        # The pending fragment has been consumed - clear it, otherwise it
        # would be prepended again to the next chunk.
        fragment = ""
    else:
        complete = split[:-1]
        fragment = split[-1]
    sink.extend(complete)
    if not config.cache_vpp_output:
        for line in complete:
            log("%s: %s" % (prefix, line.rstrip("\n")))
    return fragment


def pump_output(testclass):
    """pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set, blocking in
    select() on VPP's stdout, VPP's stderr and the read end of the wakeup
    pipe. Complete lines go to ``vpp_stdout_deque`` / ``vpp_stderr_deque``
    and, unless output caching is configured, to the test class logger.
    Returns immediately when *testclass* has no ``vpp`` attribute.
    """
    if not hasattr(testclass, "vpp"):
        return
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        stdout_fd = testclass.vpp.stdout.fileno()
        stderr_fd = testclass.vpp.stderr.fileno()
        readable = select.select(
            [
                stdout_fd,
                stderr_fd,
                testclass.pump_thread_wakeup_pipe[0],
            ],
            [],
            [],
        )[0]
        if stdout_fd in readable:
            stdout_fragment = _pump_stream(
                stdout_fd,
                stdout_fragment,
                testclass.vpp_stdout_deque,
                testclass.logger.info,
                "VPP STDOUT",
            )
        if stderr_fd in readable:
            stderr_fragment = _pump_stream(
                stderr_fd,
                stderr_fragment,
                testclass.vpp_stderr_deque,
                testclass.logger.error,
                "VPP STDERR",
            )
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
197
198
def _is_platform_aarch64():
    """Tell whether platform.machine() reports the "aarch64" machine type."""
    machine = platform.machine()
    return machine == "aarch64"


# Evaluated once, when the module is imported.
is_platform_aarch64 = _is_platform_aarch64()
204
205
def _os_release_mentions(token):
    """Return True when any line of /etc/os-release contains *token*.

    A missing or unreadable file yields False instead of an exception, so
    that importing this module does not fail on hosts without the file.
    """
    try:
        with open("/etc/os-release") as f:
            return any(token in line for line in f)
    except OSError:
        return False


def _is_distro_ubuntu2204():
    """Return True when /etc/os-release mentions "jammy"."""
    return _os_release_mentions("jammy")


# Evaluated once, when the module is imported.
is_distro_ubuntu2204 = _is_distro_ubuntu2204()


def _is_distro_debian11():
    """Return True when /etc/os-release mentions "bullseye"."""
    return _os_release_mentions("bullseye")


# Evaluated once, when the module is imported.
is_distro_debian11 = _is_distro_debian11()
226
227
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    Every instance uses the class-level ``_shared_state`` dict as its
    ``__dict__``, so all instances see the same ``_pipe``.
    """

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        # Initialise the shared pipe slot only once. Assigning None
        # unconditionally here would drop a pipe configured through another
        # instance every time a new reporter is created.
        self._shared_state.setdefault("_pipe", None)

    @property
    def pipe(self):
        """The keep-alive pipe, or None when none has been set."""
        return self._pipe

    @pipe.setter
    def pipe(self, pipe):
        """Set the keep-alive pipe; raises Exception if it is already set."""
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: test case class or test case instance; must have ``vpp``
            and ``tempdir`` attributes for anything to be sent.
        :param desc: description used when *test* is a class; for an instance
            the value of ``test.id()`` is used instead.
        """
        if not hasattr(test, "vpp") or self.pipe is None:
            # if not running forked..
            return

        if isclass(test):
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        else:
            desc = test.id()

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
263
264
class TestCaseTag(Enum):
    """Tags that can be attached to a test case class.

    Tags are stored in the class's ``test_tags`` list by the decorators built
    with create_tag_decorator().
    """

    # marks the suites that must run at the end
    # using only a single test runner
    RUN_SOLO = 1
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    FIXME_ASAN = 3
    # marks suites broken on Ubuntu-22.04
    FIXME_UBUNTU2204 = 4
    # marks suites broken on Debian-11
    FIXME_DEBIAN11 = 5
    # marks suites broken on debug vpp image
    FIXME_VPP_DEBUG = 6
279
280
def create_tag_decorator(e):
    """Build a class decorator which adds tag *e* to the class's test_tags.

    :param e: the tag to add.
    :returns: a decorator taking a class and returning that same class.
    """

    def decorator(cls):
        # Always bind a fresh list on the decorated class itself. Appending
        # in place would modify a ``test_tags`` list inherited from a base
        # class and thereby tag the base class and all of its subclasses.
        cls.test_tags = list(getattr(cls, "test_tags", ())) + [e]
        return cls

    return decorator
290
291
# Ready-made class decorators, one per TestCaseTag member; applying one adds
# the corresponding tag to the decorated class's ``test_tags``.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
298
299
class DummyVpp:
    """Placeholder object exposing ``pid``, ``returncode``, ``poll()`` and
    ``terminate()`` without any process behind it."""

    pid = 0xCAFEBAFE
    returncode = None

    def terminate(self):
        """Do nothing."""
        return None

    def poll(self):
        """Do nothing; ``returncode`` is never updated."""
        return None
309
310
class CPUInterface(ABC):
    """Abstract base for classes which declare how many CPUs they need and
    get a set of CPUs assigned."""

    skipped_due_to_cpu_lack = False
    cpus = []

    @classmethod
    def assign_cpus(cls, cpus):
        """Store *cpus* on the class as ``cls.cpus``."""
        cls.cpus = cpus

    @classmethod
    @abstractmethod
    def get_cpus_required(cls):
        """Return the number of CPUs required; subclasses must override."""
323
324
325 @use_running
326 class VppTestCase(CPUInterface, unittest.TestCase):
327     """This subclass is a base class for VPP test cases that are implemented as
328     classes. It provides methods to create and run test case.
329     """
330
331     extra_vpp_statseg_config = ""
332     extra_vpp_config = []
333     extra_vpp_plugin_config = []
334     logger = null_logger
335     vapi_response_timeout = 5
336     remove_configured_vpp_objects_on_tear_down = True
337
    @property
    def packet_infos(self):
        """List of packet infos

        Returns the ``_packet_infos`` attribute itself (no copy is made).
        """
        return self._packet_infos
342
343     @classmethod
344     def get_packet_count_for_if_idx(cls, dst_if_index):
345         """Get the number of packet info for specified destination if index"""
346         if dst_if_index in cls._packet_count_for_dst_if_idx:
347             return cls._packet_count_for_dst_if_idx[dst_if_index]
348         else:
349             return 0
350
351     @classmethod
352     def has_tag(cls, tag):
353         """if the test case has a given tag - return true"""
354         try:
355             return tag in cls.test_tags
356         except AttributeError:
357             pass
358         return False
359
360     @classmethod
361     def is_tagged_run_solo(cls):
362         """if the test case class is timing-sensitive - return true"""
363         return cls.has_tag(TestCaseTag.RUN_SOLO)
364
365     @classmethod
366     def skip_fixme_asan(cls):
367         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368         if cls.has_tag(TestCaseTag.FIXME_ASAN):
369             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
372
373     @classmethod
374     def skip_fixme_ubuntu2204(cls):
375         """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376         if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377             cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
378
379     @classmethod
380     def skip_fixme_debian11(cls):
381         """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382         if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383             cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
384
385     @classmethod
386     def skip_fixme_vpp_debug(cls):
387         cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
388
    @classmethod
    def instance(cls):
        """Return the instance of this testcase

        This is the ``test_instance`` class attribute; where it gets assigned
        is not visible in this part of the file.
        """
        return cls.test_instance
393
394     @classmethod
395     def set_debug_flags(cls, d):
396         cls.gdbserver_port = 7777
397         cls.debug_core = False
398         cls.debug_gdb = False
399         cls.debug_gdbserver = False
400         cls.debug_all = False
401         cls.debug_attach = False
402         if d is None:
403             return
404         dl = d.lower()
405         if dl == "core":
406             cls.debug_core = True
407         elif dl == "gdb" or dl == "gdb-all":
408             cls.debug_gdb = True
409         elif dl == "gdbserver" or dl == "gdbserver-all":
410             cls.debug_gdbserver = True
411         elif dl == "attach":
412             cls.debug_attach = True
413         else:
414             raise Exception("Unrecognized DEBUG option: '%s'" % d)
415         if dl == "gdb-all" or dl == "gdbserver-all":
416             cls.debug_all = True
417
418     @classmethod
419     def get_vpp_worker_count(cls):
420         if not hasattr(cls, "vpp_worker_count"):
421             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
422                 cls.vpp_worker_count = 0
423             else:
424                 cls.vpp_worker_count = config.vpp_worker_count
425         return cls.vpp_worker_count
426
427     @classmethod
428     def get_cpus_required(cls):
429         return 1 + cls.get_vpp_worker_count()
430
431     @classmethod
432     def setUpConstants(cls):
433         """Set-up the test case class based on environment variables"""
434         cls.step = config.step
435         cls.plugin_path = ":".join(config.vpp_plugin_dir)
436         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
438         debug_cli = ""
439         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440             debug_cli = "cli-listen localhost:5002"
441         size = re.search(r"\d+[gG]", config.coredump_size)
442         if size:
443             coredump_size = f"coredump-size {config.coredump_size}".lower()
444         else:
445             coredump_size = "coredump-size unlimited"
446         default_variant = config.variant
447         if default_variant is not None:
448             default_variant = "default { variant %s 100 }" % default_variant
449         else:
450             default_variant = ""
451
452         api_fuzzing = config.api_fuzz
453         if api_fuzzing is None:
454             api_fuzzing = "off"
455
456         cls.vpp_cmdline = [
457             config.vpp,
458             "unix",
459             "{",
460             "nodaemon",
461             debug_cli,
462             "full-coredump",
463             coredump_size,
464             "runtime-dir",
465             cls.tempdir,
466             "}",
467             "api-trace",
468             "{",
469             "on",
470             "}",
471             "api-segment",
472             "{",
473             "prefix",
474             cls.get_api_segment_prefix(),
475             "}",
476             "cpu",
477             "{",
478             "main-core",
479             str(cls.cpus[0]),
480         ]
481         if cls.extern_plugin_path not in (None, ""):
482             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483         if cls.get_vpp_worker_count():
484             cls.vpp_cmdline.extend(
485                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
486             )
487         cls.vpp_cmdline.extend(
488             [
489                 "}",
490                 "physmem",
491                 "{",
492                 "max-size",
493                 "32m",
494                 "}",
495                 "statseg",
496                 "{",
497                 "socket-name",
498                 cls.get_stats_sock_path(),
499                 cls.extra_vpp_statseg_config,
500                 "}",
501                 "socksvr",
502                 "{",
503                 "socket-name",
504                 cls.get_api_sock_path(),
505                 "}",
506                 "node { ",
507                 default_variant,
508                 "}",
509                 "api-fuzz {",
510                 api_fuzzing,
511                 "}",
512                 "plugins",
513                 "{",
514                 "plugin",
515                 "dpdk_plugin.so",
516                 "{",
517                 "disable",
518                 "}",
519                 "plugin",
520                 "rdma_plugin.so",
521                 "{",
522                 "disable",
523                 "}",
524                 "plugin",
525                 "lisp_unittest_plugin.so",
526                 "{",
527                 "enable",
528                 "}",
529                 "plugin",
530                 "unittest_plugin.so",
531                 "{",
532                 "enable",
533                 "}",
534             ]
535             + cls.extra_vpp_plugin_config
536             + [
537                 "}",
538             ]
539         )
540
541         if cls.extra_vpp_config is not None:
542             cls.vpp_cmdline.extend(cls.extra_vpp_config)
543
544         if not cls.debug_attach:
545             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
547
548     @classmethod
549     def wait_for_enter(cls):
550         if cls.debug_gdbserver:
551             print(double_line_delim)
552             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
553         elif cls.debug_gdb:
554             print(double_line_delim)
555             print("Spawned VPP with PID: %d" % cls.vpp.pid)
556         else:
557             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
558             return
559         print(single_line_delim)
560         print("You can debug VPP using:")
561         if cls.debug_gdbserver:
562             print(
563                 f"sudo gdb {config.vpp} "
564                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
565             )
566             print(
567                 "Now is the time to attach gdb by running the above "
568                 "command, set up breakpoints etc., then resume VPP from "
569                 "within gdb by issuing the 'continue' command"
570             )
571             cls.gdbserver_port += 1
572         elif cls.debug_gdb:
573             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
574             print(
575                 "Now is the time to attach gdb by running the above "
576                 "command and set up breakpoints etc., then resume VPP from"
577                 " within gdb by issuing the 'continue' command"
578             )
579         print(single_line_delim)
580         input("Press ENTER to continue running the testcase...")
581
    @classmethod
    def attach_vpp(cls):
        """Set ``cls.vpp`` to a DummyVpp placeholder instead of spawning a
        VPP subprocess."""
        cls.vpp = DummyVpp()
585
586     @classmethod
587     def run_vpp(cls):
588         if (
589             is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590         ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
591             return
592         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593         cmdline = cls.vpp_cmdline
594
595         if cls.debug_gdbserver:
596             gdbserver = "/usr/bin/gdbserver"
597             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
598                 raise Exception(
599                     "gdbserver binary '%s' does not exist or is "
600                     "not executable" % gdbserver
601                 )
602
603             cmdline = [
604                 gdbserver,
605                 "localhost:{port}".format(port=cls.gdbserver_port),
606             ] + cls.vpp_cmdline
607             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
608
609         try:
610             cls.vpp = subprocess.Popen(
611                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
612             )
613         except subprocess.CalledProcessError as e:
614             cls.logger.critical(
615                 "Subprocess returned with non-0 return code: (%s)", e.returncode
616             )
617             raise
618         except OSError as e:
619             cls.logger.critical(
620                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
621             )
622             raise
623         except Exception as e:
624             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
625             raise
626
627         cls.wait_for_enter()
628
629     @classmethod
630     def wait_for_coredump(cls):
631         corefile = cls.tempdir + "/core"
632         if os.path.isfile(corefile):
633             cls.logger.error("Waiting for coredump to complete: %s", corefile)
634             curr_size = os.path.getsize(corefile)
635             deadline = time.time() + 60
636             ok = False
637             while time.time() < deadline:
638                 cls.sleep(1)
639                 size = curr_size
640                 curr_size = os.path.getsize(corefile)
641                 if size == curr_size:
642                     ok = True
643                     break
644             if not ok:
645                 cls.logger.error(
646                     "Timed out waiting for coredump to complete: %s", corefile
647                 )
648             else:
649                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
650
651     @classmethod
652     def get_stats_sock_path(cls):
653         return "%s/stats.sock" % cls.tempdir
654
655     @classmethod
656     def get_api_sock_path(cls):
657         return "%s/api.sock" % cls.tempdir
658
659     @classmethod
660     def get_api_segment_prefix(cls):
661         return os.path.basename(cls.tempdir)  # Only used for VAPI
662
663     @classmethod
664     def get_tempdir(cls):
665         if cls.debug_attach:
666             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
667         else:
668             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669             if config.wipe_tmp_dir:
670                 shutil.rmtree(tmpdir, ignore_errors=True)
671             os.mkdir(tmpdir)
672         return tmpdir
673
674     @classmethod
675     def create_file_handler(cls):
676         if config.log_dir is None:
677             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
678             return
679
680         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681         if config.wipe_tmp_dir:
682             shutil.rmtree(logdir, ignore_errors=True)
683         os.mkdir(logdir)
684         cls.file_handler = FileHandler(f"{logdir}/log.txt")
685
    @classmethod
    def setUpClass(cls):
        """
        Perform class setup before running the testcase

        Sets up logging and the temporary directory, builds the VPP command
        line, starts (or attaches to) VPP, starts the output pump thread and
        connects the vpp-api and the statistics client.

        Any exception raised while starting or connecting to VPP triggers
        cls.quit() before being re-raised.
        """
        super(VppTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        )
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        # From here on the process works inside the temporary directory.
        os.chdir(cls.tempdir)
        cls.logger.info(
            "Temporary dir is %s, api socket is %s",
            cls.tempdir,
            cls.get_api_sock_path(),
        )
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.setUpConstants()
        cls.reset_packet_infos()
        cls._pcaps = []
        cls._old_pcaps = []
        cls.verbose = 0
        cls.vpp_dead = False
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        try:
            if cls.debug_attach:
                cls.attach_vpp()
            else:
                cls.run_vpp()
                # run_vpp() returns without setting cls.vpp when the class is
                # tagged as broken on the current distro - nothing to set up.
                if not hasattr(cls, "vpp"):
                    return
            cls.reporter.send_keep_alive(cls, "setUpClass")
            VppTestResult.current_test_case_info = TestCaseInfo(
                cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
            )
            cls.vpp_stdout_deque = deque()
            cls.vpp_stderr_deque = deque()
            # Pump thread in a non-debug-attached & not running-vpp
            if not cls.debug_attach and not hasattr(cls, "running_vpp"):
                cls.pump_thread_stop_flag = Event()
                # pipe used by quit() to wake the pump thread out of select()
                cls.pump_thread_wakeup_pipe = os.pipe()
                cls.pump_thread = Thread(target=pump_output, args=(cls,))
                cls.pump_thread.daemon = True
                cls.pump_thread.start()
            # NOTE(review): presumably a timeout of 0 disables the API
            # response timeout while a debugger is involved - confirm against
            # VppPapiProvider.
            if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
                cls.vapi_response_timeout = 0
            cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            if cls.step:
                hook = hookmodule.StepHook(cls)
            else:
                hook = hookmodule.PollHook(cls)
            cls.vapi.register_hook(hook)
            cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            # Check that VPP is still alive before trying to connect to it.
            try:
                hook.poll_vpp()
            except VppDiedError:
                cls.vpp_startup_failed = True
                cls.logger.critical(
                    "VPP died shortly after startup, check the"
                    " output to standard error for possible cause"
                )
                raise
            try:
                cls.vapi.connect()
            except (vpp_papi.VPPIOError, Exception) as e:
                cls.logger.debug("Exception connecting to vapi: %s" % e)
                cls.vapi.disconnect()

                if cls.debug_gdbserver:
                    print(
                        colorize(
                            "You're running VPP inside gdbserver but "
                            "VPP-API connection failed, did you forget "
                            "to 'continue' VPP from within gdb?",
                            RED,
                        )
                    )
                raise e
            if cls.debug_attach:
                # The worker count is taken from the first field of the
                # second-to-last line of the "show thread" CLI output.
                last_line = cls.vapi.cli("show thread").split("\n")[-2]
                cls.vpp_worker_count = int(last_line.split(" ")[0])
                print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
            cls.quit()
            raise e
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
            cls.quit()
            raise e
791
792     @classmethod
793     def _debug_quit(cls):
794         if cls.debug_gdbserver or cls.debug_gdb:
795             try:
796                 cls.vpp.poll()
797
798                 if cls.vpp.returncode is None:
799                     print()
800                     print(double_line_delim)
801                     print("VPP or GDB server is still running")
802                     print(single_line_delim)
803                     input(
804                         "When done debugging, press ENTER to kill the "
805                         "process and finish running the testcase..."
806                     )
807             except AttributeError:
808                 pass
809
    @classmethod
    def quit(cls):
        """
        Disconnect vpp-api, kill vpp and dump its captured output

        Order of operations: optional debugger prompt, stop the output pump
        thread, disconnect the API client, terminate VPP (killing it if it
        does not exit within 5 seconds), then write the captured stdout and
        stderr to the log and to vpp_stdout.txt / vpp_stderr.txt in the
        temporary directory.
        """
        cls._debug_quit()
        if hasattr(cls, "running_vpp"):
            cls.vpp.quit_vpp()

        # first signal that we want to stop the pump thread, then wake it up
        if hasattr(cls, "pump_thread_stop_flag"):
            cls.pump_thread_stop_flag.set()
        if hasattr(cls, "pump_thread_wakeup_pipe"):
            os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
        # NOTE(review): the wakeup pipe file descriptors are not closed
        # anywhere in this method.
        if hasattr(cls, "pump_thread"):
            cls.logger.debug("Waiting for pump thread to stop")
            cls.pump_thread.join()
        if hasattr(cls, "vpp_stderr_reader_thread"):
            cls.logger.debug("Waiting for stderr pump to stop")
            cls.vpp_stderr_reader_thread.join()

        if hasattr(cls, "vpp"):
            if hasattr(cls, "vapi"):
                cls.logger.debug(cls.vapi.vpp.get_stats())
                cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
                cls.vapi.disconnect()
                cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
                del cls.vapi
            # refresh returncode so we know whether VPP is still running
            cls.vpp.poll()
            if not cls.debug_attach and cls.vpp.returncode is None:
                cls.wait_for_coredump()
                cls.logger.debug("Sending TERM to vpp")
                cls.vpp.terminate()
                cls.logger.debug("Waiting for vpp to die")
                # give VPP 5 seconds to exit after TERM, then kill it
                try:
                    outs, errs = cls.vpp.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    cls.vpp.kill()
                    outs, errs = cls.vpp.communicate()
            cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
            if not cls.debug_attach and not hasattr(cls, "running_vpp"):
                cls.vpp.stdout.close()
                cls.vpp.stderr.close()
            # If vpp is a dynamic attribute set by the func use_running,
            # deletion will result in an AttributeError that we can
            # safely pass.
            try:
                del cls.vpp
            except AttributeError:
                pass

        # stderr output is logged at critical level only when VPP failed to
        # start; otherwise both streams are logged at info level
        if cls.vpp_startup_failed:
            stdout_log = cls.logger.info
            stderr_log = cls.logger.critical
        else:
            stdout_log = cls.logger.info
            stderr_log = cls.logger.info

        if hasattr(cls, "vpp_stdout_deque"):
            stdout_log(single_line_delim)
            stdout_log("VPP output to stdout while running %s:", cls.__name__)
            stdout_log(single_line_delim)
            vpp_output = "".join(cls.vpp_stdout_deque)
            with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
                f.write(vpp_output)
            stdout_log("\n%s", vpp_output)
            stdout_log(single_line_delim)

        if hasattr(cls, "vpp_stderr_deque"):
            stderr_log(single_line_delim)
            stderr_log("VPP output to stderr while running %s:", cls.__name__)
            stderr_log(single_line_delim)
            vpp_output = "".join(cls.vpp_stderr_deque)
            with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
                f.write(vpp_output)
            stderr_log("\n%s", vpp_output)
            stderr_log(single_line_delim)
887
888     @classmethod
889     def tearDownClass(cls):
890         """Perform final cleanup after running all tests in this test-case"""
891         cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892         if not hasattr(cls, "vpp"):
893             return
894         cls.reporter.send_keep_alive(cls, "tearDownClass")
895         cls.quit()
896         cls.file_handler.close()
897         cls.reset_packet_infos()
898         if config.debug_framework:
899             debug_internal.on_tear_down_class(cls)
900
901     def show_commands_at_teardown(self):
902         """Allow subclass specific teardown logging additions."""
903         self.logger.info("--- No test specific show commands provided. ---")
904
    def tearDown(self):
        """Log VPP debug state after each test and save the API trace.

        A debug line naming the test is always logged; nothing else happens
        when the instance has no ``vpp`` attribute.  Otherwise, unless
        ``self.vpp_dead`` is set, the output of several show commands is
        logged, the subclass hook ``show_commands_at_teardown()`` is called
        and, when ``remove_configured_vpp_objects_on_tear_down`` is set, the
        registered VPP objects are removed.  The API trace is then saved and
        moved into ``self.tempdir``.

        A ``VppTransportSocketIOError`` raised anywhere in that sequence is
        logged and marks VPP as dead; the object registry is only cleared
        when no such error occurred.
        """
        self.logger.debug(
            "--- tearDown() for %s.%s(%s) called ---"
            % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
        )
        if not hasattr(self, "vpp"):
            return

        try:
            if not self.vpp_dead:
                # The packet trace goes to the debug level, the rest to info.
                self.logger.debug(self.vapi.cli("show trace max 1000"))
                self.logger.info(self.vapi.ppcli("show interface"))
                self.logger.info(self.vapi.ppcli("show hardware"))
                self.logger.info(self.statistics.set_errors_str())
                self.logger.info(self.vapi.ppcli("show run"))
                self.logger.info(self.vapi.ppcli("show log"))
                self.logger.info(self.vapi.ppcli("show bihash"))
                self.logger.info("Logging testcase specific show commands.")
                self.show_commands_at_teardown()
                if self.remove_configured_vpp_objects_on_tear_down:
                    self.registry.remove_vpp_config(self.logger)
            # Save the VPP API trace under a name unique to this test method
            # and VPP pid, then move it into the test's temporary directory.
            # NOTE(review): the move assumes "api trace save" writes the file
            # under /tmp - confirm against VPP.
            m = self._testMethodName
            api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
            tmp_api_trace = "/tmp/%s" % api_trace
            vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
            self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
            self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
            shutil.move(tmp_api_trace, vpp_api_trace_log)
        except VppTransportSocketIOError:
            # Remember that VPP is gone; setUp() raises VppDiedError when
            # vpp_dead is set.
            self.logger.debug(
                "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
            )
            self.vpp_dead = True
        else:
            # Only reached when no VppTransportSocketIOError was raised.
            self.registry.unregister_all(self.logger)
942
943     def setUp(self):
944         """Clear trace before running each test"""
945         super(VppTestCase, self).setUp()
946         if not hasattr(self, "vpp"):
947             return
948         self.reporter.send_keep_alive(self)
949         if self.vpp_dead:
950             raise VppDiedError(
951                 rv=None,
952                 testcase=self.__class__.__name__,
953                 method_name=self._testMethodName,
954             )
955         self.sleep(0.1, "during setUp")
956         self.vpp_stdout_deque.append(
957             "--- test setUp() for %s.%s(%s) starts here ---\n"
958             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
959         )
960         self.vpp_stderr_deque.append(
961             "--- test setUp() for %s.%s(%s) starts here ---\n"
962             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
963         )
964         self.vapi.cli("clear trace")
965         # store the test instance inside the test class - so that objects
966         # holding the class can access instance methods (like assertEqual)
967         type(self).test_instance = self
968
969     @classmethod
970     def pg_enable_capture(cls, interfaces=None):
971         """
972         Enable capture on packet-generator interfaces
973
974         :param interfaces: iterable interface indexes (if None,
975                            use self.pg_interfaces)
976
977         """
978         if interfaces is None:
979             interfaces = cls.pg_interfaces
980         for i in interfaces:
981             i.enable_capture()
982
983     @classmethod
984     def register_pcap(cls, intf, worker):
985         """Register a pcap in the testclass"""
986         # add to the list of captures with current timestamp
987         cls._pcaps.append((intf, worker))
988
989     @classmethod
990     def get_vpp_time(cls):
991         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
992         # returns float("2.190522")
993         timestr = cls.vapi.cli("show clock")
994         head, sep, tail = timestr.partition(",")
995         head, sep, tail = head.partition("Time now")
996         return float(tail)
997
998     @classmethod
999     def sleep_on_vpp_time(cls, sec):
1000         """Sleep according to time in VPP world"""
1001         # On a busy system with many processes
1002         # we might end up with VPP time being slower than real world
1003         # So take that into account when waiting for VPP to do something
1004         start_time = cls.get_vpp_time()
1005         while cls.get_vpp_time() - start_time < sec:
1006             cls.sleep(0.1)
1007
    @classmethod
    def pg_start(cls, trace=True):
        """Enable the PG, wait till it is done, then clean up

        :param trace: when true, the packet trace is cleared and a trace of
                      1000 packets is added on pg-input before the
                      packet-generator is enabled
        """
        # Let each interface process the pcap input files left over from
        # the previous run before starting a new one.
        for (intf, worker) in cls._old_pcaps:
            intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
        cls._old_pcaps = []
        if trace:
            cls.vapi.cli("clear trace")
            cls.vapi.cli("trace add pg-input 1000")
        cls.vapi.cli("packet-generator enable")
        # PG, when starts, runs to completion -
        # so let's avoid a race condition,
        # and wait a little till it's done.
        # Then clean it up  - and then be gone.
        # The wait is bounded: after 300 seconds an error is logged and the
        # cleanup below proceeds anyway.
        deadline = time.time() + 300
        # Poll for as long as the "show packet-generator" output contains
        # the text "Yes".
        while cls.vapi.cli("show packet-generator").find("Yes") != -1:
            cls.sleep(0.01)  # yield
            if time.time() > deadline:
                cls.logger.error("Timeout waiting for pg to stop")
                break
        # Delete the stream of every capture registered for this run.
        for intf, worker in cls._pcaps:
            cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
        # The captures of this run become the "old" ones for the next call.
        cls._old_pcaps = cls._pcaps
        cls._pcaps = []
1032
1033     @classmethod
1034     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1035         """
1036         Create packet-generator interfaces.
1037
1038         :param interfaces: iterable indexes of the interfaces.
1039         :returns: List of created interfaces.
1040
1041         """
1042         result = []
1043         for i in interfaces:
1044             intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045             setattr(cls, intf.name, intf)
1046             result.append(intf)
1047         cls.pg_interfaces = result
1048         return result
1049
1050     @classmethod
1051     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1052         if not hasattr(cls, "vpp"):
1053             cls.pg_interfaces = []
1054             return cls.pg_interfaces
1055         pgmode = VppEnum.vl_api_pg_interface_mode_t
1056         return cls.create_pg_interfaces_internal(
1057             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1058         )
1059
1060     @classmethod
1061     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1062         if not hasattr(cls, "vpp"):
1063             cls.pg_interfaces = []
1064             return cls.pg_interfaces
1065         pgmode = VppEnum.vl_api_pg_interface_mode_t
1066         return cls.create_pg_interfaces_internal(
1067             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1068         )
1069
1070     @classmethod
1071     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1072         if not hasattr(cls, "vpp"):
1073             cls.pg_interfaces = []
1074             return cls.pg_interfaces
1075         pgmode = VppEnum.vl_api_pg_interface_mode_t
1076         return cls.create_pg_interfaces_internal(
1077             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1078         )
1079
1080     @classmethod
1081     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1082         if not hasattr(cls, "vpp"):
1083             cls.pg_interfaces = []
1084             return cls.pg_interfaces
1085         pgmode = VppEnum.vl_api_pg_interface_mode_t
1086         return cls.create_pg_interfaces_internal(
1087             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1088         )
1089
1090     @classmethod
1091     def create_loopback_interfaces(cls, count):
1092         """
1093         Create loopback interfaces.
1094
1095         :param count: number of interfaces created.
1096         :returns: List of created interfaces.
1097         """
1098         if not hasattr(cls, "vpp"):
1099             cls.lo_interfaces = []
1100             return cls.lo_interfaces
1101         result = [VppLoInterface(cls) for i in range(count)]
1102         for intf in result:
1103             setattr(cls, intf.name, intf)
1104         cls.lo_interfaces = result
1105         return result
1106
1107     @classmethod
1108     def create_bvi_interfaces(cls, count):
1109         """
1110         Create BVI interfaces.
1111
1112         :param count: number of interfaces created.
1113         :returns: List of created interfaces.
1114         """
1115         if not hasattr(cls, "vpp"):
1116             cls.bvi_interfaces = []
1117             return cls.bvi_interfaces
1118         result = [VppBviInterface(cls) for i in range(count)]
1119         for intf in result:
1120             setattr(cls, intf.name, intf)
1121         cls.bvi_interfaces = result
1122         return result
1123
1124     @staticmethod
1125     def extend_packet(packet, size, padding=" "):
1126         """
1127         Extend packet to given size by padding with spaces or custom padding
1128         NOTE: Currently works only when Raw layer is present.
1129
1130         :param packet: packet
1131         :param size: target size
1132         :param padding: padding used to extend the payload
1133
1134         """
1135         packet_len = len(packet) + 4
1136         extend = size - packet_len
1137         if extend > 0:
1138             num = (extend // len(padding)) + 1
1139             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1140
1141     @classmethod
1142     def reset_packet_infos(cls):
1143         """Reset the list of packet info objects and packet counts to zero"""
1144         cls._packet_infos = {}
1145         cls._packet_count_for_dst_if_idx = {}
1146
1147     @classmethod
1148     def create_packet_info(cls, src_if, dst_if):
1149         """
1150         Create packet info object containing the source and destination indexes
1151         and add it to the testcase's packet info list
1152
1153         :param VppInterface src_if: source interface
1154         :param VppInterface dst_if: destination interface
1155
1156         :returns: _PacketInfo object
1157
1158         """
1159         info = _PacketInfo()
1160         info.index = len(cls._packet_infos)
1161         info.src = src_if.sw_if_index
1162         info.dst = dst_if.sw_if_index
1163         if isinstance(dst_if, VppSubInterface):
1164             dst_idx = dst_if.parent.sw_if_index
1165         else:
1166             dst_idx = dst_if.sw_if_index
1167         if dst_idx in cls._packet_count_for_dst_if_idx:
1168             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1169         else:
1170             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1171         cls._packet_infos[info.index] = info
1172         return info
1173
1174     @staticmethod
1175     def info_to_payload(info):
1176         """
1177         Convert _PacketInfo object to packet payload
1178
1179         :param info: _PacketInfo object
1180
1181         :returns: string containing serialized data from packet info
1182         """
1183
1184         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1185         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1186
1187     @staticmethod
1188     def payload_to_info(payload, payload_field="load"):
1189         """
1190         Convert packet payload to _PacketInfo object
1191
1192         :param payload: packet payload
1193         :type payload:  <class 'scapy.packet.Raw'>
1194         :param payload_field: packet fieldname of payload "load" for
1195                 <class 'scapy.packet.Raw'>
1196         :type payload_field: str
1197         :returns: _PacketInfo object containing de-serialized data from payload
1198
1199         """
1200
1201         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1202         payload_b = getattr(payload, payload_field)[:18]
1203
1204         info = _PacketInfo()
1205         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1206
1207         # some SRv6 TCs depend on get an exception if bad values are detected
1208         if info.index > 0x4000:
1209             raise ValueError("Index value is invalid")
1210
1211         return info
1212
1213     def get_next_packet_info(self, info):
1214         """
1215         Iterate over the packet info list stored in the testcase
1216         Start iteration with first element if info is None
1217         Continue based on index in info if info is specified
1218
1219         :param info: info or None
1220         :returns: next info in list or None if no more infos
1221         """
1222         if info is None:
1223             next_index = 0
1224         else:
1225             next_index = info.index + 1
1226         if next_index == len(self._packet_infos):
1227             return None
1228         else:
1229             return self._packet_infos[next_index]
1230
1231     def get_next_packet_info_for_interface(self, src_index, info):
1232         """
1233         Search the packet info list for the next packet info with same source
1234         interface index
1235
1236         :param src_index: source interface index to search for
1237         :param info: packet info - where to start the search
1238         :returns: packet info or None
1239
1240         """
1241         while True:
1242             info = self.get_next_packet_info(info)
1243             if info is None:
1244                 return None
1245             if info.src == src_index:
1246                 return info
1247
1248     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1249         """
1250         Search the packet info list for the next packet info with same source
1251         and destination interface indexes
1252
1253         :param src_index: source interface index to search for
1254         :param dst_index: destination interface index to search for
1255         :param info: packet info - where to start the search
1256         :returns: packet info or None
1257
1258         """
1259         while True:
1260             info = self.get_next_packet_info_for_interface(src_index, info)
1261             if info is None:
1262                 return None
1263             if info.dst == dst_index:
1264                 return info
1265
1266     def assert_equal(self, real_value, expected_value, name_or_class=None):
1267         if name_or_class is None:
1268             self.assertEqual(real_value, expected_value)
1269             return
1270         try:
1271             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1272             msg = msg % (
1273                 getdoc(name_or_class).strip(),
1274                 real_value,
1275                 str(name_or_class(real_value)),
1276                 expected_value,
1277                 str(name_or_class(expected_value)),
1278             )
1279         except Exception:
1280             msg = "Invalid %s: %s does not match expected value %s" % (
1281                 name_or_class,
1282                 real_value,
1283                 expected_value,
1284             )
1285
1286         self.assertEqual(real_value, expected_value, msg)
1287
1288     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1289         if name is None:
1290             msg = None
1291         else:
1292             msg = "Invalid %s: %s out of range <%s,%s>" % (
1293                 name,
1294                 real_value,
1295                 expected_min,
1296                 expected_max,
1297             )
1298         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1299
    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
        """Assert the checksum fields found in the layers of a packet.

        The packet is rebuilt from its raw bytes, each checksum field
        ("cksum" or "chksum") found on a layer is deleted from a working
        copy, the copy is rebuilt from raw bytes again and the resulting
        checksum values are compared with the received ones.

        :param packet: packet to verify
        :param ignore_zero_udp_checksums: when true, a checksum field equal
            to 0 on a layer named "UDP" or "UDPerror" is not verified
        """
        received = packet.__class__(scapy.compat.raw(packet))
        udp_layers = ["UDP", "UDPerror"]
        checksum_fields = ["cksum", "chksum"]
        # (layer number, field name) pairs whose checksum gets verified
        checksums = []
        counter = 0
        # Working copy whose checksum fields are deleted below.
        temp = received.__class__(scapy.compat.raw(received))
        # Walk the layers by number until getlayer() yields nothing.
        while True:
            layer = temp.getlayer(counter)
            if layer:
                # Inspect a payload-less copy of the layer.
                layer = layer.copy()
                layer.remove_payload()
                for cf in checksum_fields:
                    if hasattr(layer, cf):
                        if (
                            ignore_zero_udp_checksums
                            and 0 == getattr(layer, cf)
                            and layer.name in udp_layers
                        ):
                            continue
                        # Drop the field on the working copy.
                        # NOTE(review): presumably scapy recomputes a deleted
                        # checksum when the packet is rebuilt - confirm.
                        delattr(temp.getlayer(counter), cf)
                        checksums.append((counter, cf))
            else:
                break
            counter = counter + 1
        if 0 == len(checksums):
            return
        # Rebuild the working copy from its raw bytes.
        temp = temp.__class__(scapy.compat.raw(temp))
        # Compare starting from the last recorded (layer, field) pair.
        for layer, cf in reversed(checksums):
            calc_sum = getattr(temp[layer], cf)
            self.assert_equal(
                getattr(received[layer], cf),
                calc_sum,
                "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
            )
            self.logger.debug(
                "Checksum field `%s` on `%s` layer has correct value `%s`"
                % (cf, temp[layer].name, calc_sum)
            )
1339
1340     def assert_checksum_valid(
1341         self,
1342         received_packet,
1343         layer,
1344         checksum_field_names=["chksum", "cksum"],
1345         ignore_zero_checksum=False,
1346     ):
1347         """Check checksum of received packet on given layer"""
1348         layer_copy = received_packet[layer].copy()
1349         layer_copy.remove_payload()
1350         field_name = None
1351         for f in checksum_field_names:
1352             if hasattr(layer_copy, f):
1353                 field_name = f
1354                 break
1355         if field_name is None:
1356             raise Exception(
1357                 f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
1358             )
1359         received_packet_checksum = getattr(received_packet[layer], field_name)
1360         if ignore_zero_checksum and 0 == received_packet_checksum:
1361             return
1362         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1363         delattr(recalculated[layer], field_name)
1364         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1365         self.assert_equal(
1366             received_packet_checksum,
1367             getattr(recalculated[layer], field_name),
1368             f"packet checksum (field: {field_name}) on layer: %s" % layer,
1369         )
1370
1371     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1372         self.assert_checksum_valid(
1373             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1374         )
1375
1376     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1377         self.assert_checksum_valid(
1378             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1379         )
1380
1381     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1382         self.assert_checksum_valid(
1383             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1384         )
1385
1386     def assert_embedded_icmp_checksum_valid(self, received_packet):
1387         if received_packet.haslayer(IPerror):
1388             self.assert_checksum_valid(received_packet, "IPerror")
1389         if received_packet.haslayer(TCPerror):
1390             self.assert_checksum_valid(received_packet, "TCPerror")
1391         if received_packet.haslayer(UDPerror):
1392             self.assert_checksum_valid(
1393                 received_packet, "UDPerror", ignore_zero_checksum=True
1394             )
1395         if received_packet.haslayer(ICMPerror):
1396             self.assert_checksum_valid(received_packet, "ICMPerror")
1397
1398     def assert_icmp_checksum_valid(self, received_packet):
1399         self.assert_checksum_valid(received_packet, "ICMP")
1400         self.assert_embedded_icmp_checksum_valid(received_packet)
1401
1402     def assert_icmpv6_checksum_valid(self, pkt):
1403         if pkt.haslayer(ICMPv6DestUnreach):
1404             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
1405             self.assert_embedded_icmp_checksum_valid(pkt)
1406         if pkt.haslayer(ICMPv6EchoRequest):
1407             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
1408         if pkt.haslayer(ICMPv6EchoReply):
1409             self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
1410
1411     def get_counter(self, counter):
1412         if counter.startswith("/"):
1413             counter_value = self.statistics.get_counter(counter)
1414         else:
1415             counters = self.vapi.cli("sh errors").split("\n")
1416             counter_value = 0
1417             for i in range(1, len(counters) - 1):
1418                 results = counters[i].split()
1419                 if results[1] == counter:
1420                     counter_value = int(results[0])
1421                     break
1422         return counter_value
1423
1424     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1425         c = self.get_counter(counter)
1426         if thread is not None:
1427             c = c[thread][index]
1428         else:
1429             c = sum(x[index] for x in c)
1430         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1431
1432     def assert_packet_counter_equal(self, counter, expected_value):
1433         counter_value = self.get_counter(counter)
1434         self.assert_equal(
1435             counter_value, expected_value, "packet counter `%s'" % counter
1436         )
1437
1438     def assert_error_counter_equal(self, counter, expected_value):
1439         counter_value = self.statistics[counter].sum()
1440         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1441
1442     @classmethod
1443     def sleep(cls, timeout, remark=None):
1444
1445         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1446         #  * by Guido, only the main thread can be interrupted.
1447         # */
1448         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1449         if timeout == 0:
1450             # yield quantum
1451             if hasattr(os, "sched_yield"):
1452                 os.sched_yield()
1453             else:
1454                 time.sleep(0)
1455             return
1456
1457         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1458         before = time.time()
1459         time.sleep(timeout)
1460         after = time.time()
1461         if after - before > 2 * timeout:
1462             cls.logger.error(
1463                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1464                 after - before,
1465                 timeout,
1466             )
1467
1468         cls.logger.debug(
1469             "Finished sleep (%s) - slept %es (wanted %es)",
1470             remark,
1471             after - before,
1472             timeout,
1473         )
1474
1475     def virtual_sleep(self, timeout, remark=None):
1476         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1477         self.vapi.cli("set clock adjust %s" % timeout)
1478
1479     def pg_send(self, intf, pkts, worker=None, trace=True):
1480         intf.add_stream(pkts, worker=worker)
1481         self.pg_enable_capture(self.pg_interfaces)
1482         self.pg_start(trace=trace)
1483
1484     def snapshot_stats(self, stats_diff):
1485         """Return snapshot of interesting stats based on diff dictionary."""
1486         stats_snapshot = {}
1487         for sw_if_index in stats_diff:
1488             for counter in stats_diff[sw_if_index]:
1489                 stats_snapshot[counter] = self.statistics[counter]
1490         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1491         return stats_snapshot
1492
    def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
        """Assert appropriate difference between current stats and snapshot.

        :param stats_diff: mapping of either the key "err" or an interface
            index to a ``{counter name: expected difference}`` mapping
        :param stats_snapshot: counter values taken earlier, keyed by
            counter name (as returned by snapshot_stats())
        :raises Exception: when a per-interface counter cannot be indexed by
            the interface index and the expected difference is not 0
        """
        for sw_if_index in stats_diff:
            for cntr, diff in stats_diff[sw_if_index].items():
                if sw_if_index == "err":
                    # Counters under "err" are compared by their total sum.
                    self.assert_equal(
                        self.statistics[cntr].sum(),
                        stats_snapshot[cntr].sum() + diff,
                        f"'{cntr}' counter value (previous value: "
                        f"{stats_snapshot[cntr].sum()}, "
                        f"expected diff: {diff})",
                    )
                else:
                    try:
                        # Other counters are compared on the column selected
                        # by the interface index.
                        self.assert_equal(
                            self.statistics[cntr][:, sw_if_index].sum(),
                            stats_snapshot[cntr][:, sw_if_index].sum() + diff,
                            f"'{cntr}' counter value (previous value: "
                            f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
                            f"expected diff: {diff})",
                        )
                    except IndexError as e:
                        # if diff is 0, then this is most probably a case
                        # where the test declares multiple interfaces but
                        # traffic hasn't passed through this one yet - which
                        # means the counter value is 0 and can be ignored
                        if 0 != diff:
                            raise Exception(
                                f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
                            ) from e
1523
1524     def send_and_assert_no_replies(
1525         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1526     ):
1527         if stats_diff:
1528             stats_snapshot = self.snapshot_stats(stats_diff)
1529
1530         self.pg_send(intf, pkts)
1531
1532         try:
1533             if not timeout:
1534                 timeout = 1
1535             for i in self.pg_interfaces:
1536                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1537                 timeout = 0.1
1538         finally:
1539             if trace:
1540                 if msg:
1541                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1542                 self.logger.debug(self.vapi.cli("show trace"))
1543
1544         if stats_diff:
1545             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1546
1547     def send_and_expect(
1548         self,
1549         intf,
1550         pkts,
1551         output,
1552         n_rx=None,
1553         worker=None,
1554         trace=True,
1555         msg=None,
1556         stats_diff=None,
1557     ):
1558         if stats_diff:
1559             stats_snapshot = self.snapshot_stats(stats_diff)
1560
1561         if not n_rx:
1562             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1563         self.pg_send(intf, pkts, worker=worker, trace=trace)
1564         rx = output.get_capture(n_rx)
1565         if trace:
1566             if msg:
1567                 self.logger.debug(f"send_and_expect: {msg}")
1568             self.logger.debug(self.vapi.cli("show trace"))
1569
1570         if stats_diff:
1571             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1572
1573         return rx
1574
1575     def send_and_expect_load_balancing(
1576         self, input, pkts, outputs, worker=None, trace=True
1577     ):
1578         self.pg_send(input, pkts, worker=worker, trace=trace)
1579         rxs = []
1580         for oo in outputs:
1581             rx = oo._get_capture(1)
1582             self.assertNotEqual(0, len(rx))
1583             rxs.append(rx)
1584         if trace:
1585             self.logger.debug(self.vapi.cli("show trace"))
1586         return rxs
1587
1588     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1589         self.pg_send(intf, pkts, worker=worker, trace=trace)
1590         rx = output._get_capture(1)
1591         if trace:
1592             self.logger.debug(self.vapi.cli("show trace"))
1593         self.assertTrue(len(rx) > 0)
1594         self.assertTrue(len(rx) < len(pkts))
1595         return rx
1596
1597     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1598         if stats_diff:
1599             stats_snapshot = self.snapshot_stats(stats_diff)
1600
1601         self.pg_send(intf, pkts)
1602         rx = output.get_capture(len(pkts))
1603         outputs = [output]
1604         if not timeout:
1605             timeout = 1
1606         for i in self.pg_interfaces:
1607             if i not in outputs:
1608                 i.assert_nothing_captured(timeout=timeout)
1609                 timeout = 0.1
1610
1611         if stats_diff:
1612             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1613
1614         return rx
1615
1616
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class."""
    class_doc = getdoc(test.__class__)
    first_line = class_doc.splitlines()[0]
    return first_line
1619
1620
def get_test_description(descriptions, test):
    """Return the text used to describe a test in reports.

    The test's short description is used when `descriptions` is truthy and
    a short description exists; otherwise str(test) is returned.
    """
    summary = test.shortDescription()
    return summary if descriptions and summary else str(test)
1627
1628
class TestCaseInfo:
    """
    Plain record describing one test case run.

    Stores the logger, the temporary directory, the pid of the vpp process
    and the path of the vpp binary exactly as passed in.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        # no core-crashing test is known when the record is created
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1636
1637
class VppTestResult(unittest.TestResult):
    """
    Test result class which prints a header per test class and a result
    line per test, keeps track of failed and core-producing test cases and
    forwards every outcome through an optional result pipe.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level state, shared by all instances of this class.
    # Info objects of test cases for which an error/failure was recorded:
    failed_test_cases_info = set()
    # Info objects of test cases in whose temp dir a core file was found:
    core_crash_test_cases_info = set()
    # Info object of the test case being run. Nothing in this class assigns
    # it - presumably the test case sets it from outside; while it is None
    # all per-test-case logging/symlinking below is skipped.
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; its print_summary attribute
            is consulted in printErrors(). May be None.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner
        # test classes for which the header was already printed
        self.printed = []

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called"
                % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
            )
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test:
        :param reason:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s"
                % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc,
                    reason,
                )
            )
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        # skips caused by a cpu shortage are reported with a dedicated code
        if reason == "not enough cpus":
            self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
        else:
            self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Create a "<temp dir name>-FAILED" symlink to the temp dir of the
        current test case inside config.failed_dir.

        Best effort: any exception is logged as an error and swallowed.
        Does nothing when there is no current test case info.
        """
        if self.current_test_case_info:
            try:
                failed_dir = config.failed_dir
                link_path = os.path.join(
                    failed_dir,
                    "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
                )

                self.current_test_case_info.logger.debug(
                    "creating a link to the failed test"
                )
                self.current_test_case_info.logger.debug(
                    "os.symlink(%s, %s)"
                    % (self.current_test_case_info.tempdir, link_path)
                )
                if os.path.exists(link_path):
                    self.current_test_case_info.logger.debug("symlink already exists")
                else:
                    os.symlink(self.current_test_case_info.tempdir, link_path)

            except Exception as e:
                self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send a (test id, result) tuple through the result pipe, if any.

        :param test: test object providing id()
        :param result: result code to report

        """
        # the attribute is optional; a missing or falsy pipe means there is
        # nobody to report to
        pipe = getattr(self, "test_framework_result_pipe", None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write the error and its formatted traceback to the debug log of the
        current test case. Does nothing when there is no current test case
        info.

        :param test: test (or unittest error holder) the error belongs to
        :param err: exception info tuple accepted by format_exception()
        :param fn_name: name of the reporting function, used in the log

        """
        if self.current_test_case_info:
            # error holders carry only a description, no test method
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = "%s.%s(%s)" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc,
                )
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" % (fn_name, test_name, err)
            )
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" % "".join(format_exception(*err))
            )

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        Logs the error, records it via unittest_fn, sets result_string,
        symlinks the temp dir, updates the failed/core-crash bookkeeping and
        reports the outcome through the result pipe.

        :param test: test (or unittest error holder) the error belongs to
        :param err: exception info tuple
        :param unittest_fn: unbound unittest.TestResult method used to
            record the error
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR

        """
        if error_type == FAIL:
            self.log_error(test, err, "addFailure")
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, "addError")
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception(
                "Error type %s cannot be used to record an "
                "error or a failure" % error_type
            )

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % (
                error_type_str,
                self.current_test_case_info.tempdir,
            )
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # remember only the first test seen with a core present
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id()
                        )
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(self.current_test_case_info)
        else:
            self.result_string = "%s [no temp dir]" % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints the header of the test's class the first time a test of that
        class is started and records the start time.

        :param test:

        """

        def print_header(test):
            # one header per test class is enough
            if test.__class__ in self.printed:
                return

            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())

            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)

            if test.has_tag(TestCaseTag.FIXME_ASAN):
                test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
                test.skip_fixme_asan()

            if is_distro_ubuntu2204 == True and test.has_tag(
                TestCaseTag.FIXME_UBUNTU2204
            ):
                test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
                test.skip_fixme_ubuntu2204()

            if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
                test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
                test.skip_fixme_debian11()

            if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
                test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
                test.skip_fixme_vpp_debug()

            if hasattr(test, "vpp_worker_count"):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                else:
                    test_title += f" [{test.vpp_worker_count} worker threads]"

            if test.__class__.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]",
                    YELLOW,
                )

            print(double_line_delim)
            print(test_title)
            print(double_line_delim)
            self.printed.append(test.__class__)

        print_header(test)
        # used by stopTest() to report the test duration
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln("Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        Writes the result line of the test (with the duration when not
        verbose) and reports TEST_RUN through the result pipe.

        :param test:

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(
                "%-73s%s" % (self.getDescription(test), self.result_string)
            )
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(
                "%-68s %4.2f %s"
                % (
                    self.getDescription(test),
                    time.time() - self.start_test,
                    self.result_string,
                )
            )

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case

        If the owning runner does not want a summary, the stream of both
        this result and the runner is replaced by one writing to
        os.devnull afterwards.
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList("ERROR", self.errors)
            self.printErrorList("FAIL", self.failures)

        # ^^ that is the last output from unittest before summary
        # A result created without a runner (the constructor default) has
        # no summary preference to honour, so its stream is left alone.
        if self.runner is not None and not self.runner.print_summary:
            # NOTE(review): this file object is never closed here
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
1957
1958
class VppTestRunner(unittest.TextTestRunner):
    """
    Text test runner which collects results in VppTestResult and always
    writes to standard output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: kept as self.print_summary; when false,
            run() puts the original stream back after the run
        :param kwargs: forwarded to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super().__init__(
            stream=sys.stdout,
            descriptions=descriptions,
            verbosity=verbosity,
            failfast=failfast,
            buffer=buffer,
            resultclass=resultclass,
            **kwargs,
        )
        self.print_summary = print_summary
        # remembered so that run() can put it back
        self.orig_stream = self.stream

        KeepAliveReporter.pipe = keep_alive_pipe
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test:
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        outcome = super().run(test)
        if self.print_summary:
            return outcome

        # without a summary the streams may have been replaced during the
        # run - restore the original one on both the runner and the result
        self.stream = self.orig_stream
        outcome.stream = self.orig_stream
        return outcome
2010
2011
class Worker(Thread):
    """
    Thread which runs an external executable to completion and logs its
    return code, stdout and stderr.

    If the instance has a ``testcase`` attribute (this class never sets
    one, so presumably a subclass provides it) and that test case has
    debugging enabled, the command line is adjusted for gdb/gdbserver and
    the user is prompted before the output of the executable is collected.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line to run, executable path first
        :param logger: logger used to report progress and output
        :param env: optional mapping of environment variables which run()
            applies on top of a copy of os.environ
        :param args: positional arguments passed on to Thread
        :param kwargs: keyword arguments passed on to Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # Private copy: the debug handling below extends the command line
        # and must not alter the sequence owned by the caller.
        self.args = list(executable_args)
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Run the executable under gdbserver. The command line of
                # the executable has to follow - not ``args``, which holds
                # the positional arguments meant for Thread.
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        # the application itself, even when it is wrapped in gdbserver
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When the test case runs with gdb or gdbserver debugging enabled,
        print how to attach a debugger to the spawned process and block
        until the user presses ENTER. Otherwise return immediately.
        """
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            # move on to the next port number after announcing this one
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the command line, wait for it to finish and log its output.

        The return code of the process is stored in self.result.

        :raises EnvironmentError: if the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        # environment of this process, overridden by the one given to the
        # constructor
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            # stdbuf turns off buffering of the child's stdout and stderr
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            # put the child into a process group of its own
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(out.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2130
2131
# Nothing is done when this module is executed directly.
if __name__ == "__main__":
    pass