f90aa43c9f2ce45d744362b4db107db9eaaffee1
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55
56
57 logger = logging.getLogger(__name__)
58
59 # Set up an empty logger for the testcase that can be overridden as necessary
60 null_logger = logging.getLogger("VppTestCase")
61 null_logger.addHandler(logging.NullHandler())
62
63 PASS = 0
64 FAIL = 1
65 ERROR = 2
66 SKIP = 3
67 TEST_RUN = 4
68 SKIP_CPU_SHORTAGE = 5
69
70
71 if config.debug_framework:
72     import debug_internal
73
74 """
75   Test framework module.
76
77   The module provides a set of tools for constructing and running tests and
78   representing the results.
79 """
80
81
82 class VppDiedError(Exception):
83     """exception for reporting that the subprocess has died."""
84
85     signals_by_value = {
86         v: k
87         for k, v in signal.__dict__.items()
88         if k.startswith("SIG") and not k.startswith("SIG_")
89     }
90
91     def __init__(self, rv=None, testcase=None, method_name=None):
92         self.rv = rv
93         self.signal_name = None
94         self.testcase = testcase
95         self.method_name = method_name
96
97         try:
98             self.signal_name = VppDiedError.signals_by_value[-rv]
99         except (KeyError, TypeError):
100             pass
101
102         if testcase is None and method_name is None:
103             in_msg = ""
104         else:
105             in_msg = " while running %s.%s" % (testcase, method_name)
106
107         if self.rv:
108             msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
109                 in_msg,
110                 self.rv,
111                 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
112             )
113         else:
114             msg = "VPP subprocess died unexpectedly%s." % in_msg
115
116         super(VppDiedError, self).__init__(msg)
117
118
119 class _PacketInfo(object):
120     """Private class to create packet info object.
121
122     Help process information about the next packet.
123     Set variables to default values.
124     """
125
126     #: Store the index of the packet.
127     index = -1
128     #: Store the index of the source packet generator interface of the packet.
129     src = -1
130     #: Store the index of the destination packet generator interface
131     #: of the packet.
132     dst = -1
133     #: Store expected ip version
134     ip = -1
135     #: Store expected upper protocol
136     proto = -1
137     #: Store the copy of the former packet.
138     data = None
139
140     def __eq__(self, other):
141         index = self.index == other.index
142         src = self.src == other.src
143         dst = self.dst == other.dst
144         data = self.data == other.data
145         return index and src and dst and data
146
147
148 def pump_output(testclass):
149     """pump output from vpp stdout/stderr to proper queues"""
150     if not hasattr(testclass, "vpp"):
151         return
152     stdout_fragment = ""
153     stderr_fragment = ""
154     while not testclass.pump_thread_stop_flag.is_set():
155         readable = select.select(
156             [
157                 testclass.vpp.stdout.fileno(),
158                 testclass.vpp.stderr.fileno(),
159                 testclass.pump_thread_wakeup_pipe[0],
160             ],
161             [],
162             [],
163         )[0]
164         if testclass.vpp.stdout.fileno() in readable:
165             read = os.read(testclass.vpp.stdout.fileno(), 102400)
166             if len(read) > 0:
167                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
168                 if len(stdout_fragment) > 0:
169                     split[0] = "%s%s" % (stdout_fragment, split[0])
170                 if len(split) > 0 and split[-1].endswith("\n"):
171                     limit = None
172                 else:
173                     limit = -1
174                     stdout_fragment = split[-1]
175                 testclass.vpp_stdout_deque.extend(split[:limit])
176                 if not config.cache_vpp_output:
177                     for line in split[:limit]:
178                         testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
179         if testclass.vpp.stderr.fileno() in readable:
180             read = os.read(testclass.vpp.stderr.fileno(), 102400)
181             if len(read) > 0:
182                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
183                 if len(stderr_fragment) > 0:
184                     split[0] = "%s%s" % (stderr_fragment, split[0])
185                 if len(split) > 0 and split[-1].endswith("\n"):
186                     limit = None
187                 else:
188                     limit = -1
189                     stderr_fragment = split[-1]
190
191                 testclass.vpp_stderr_deque.extend(split[:limit])
192                 if not config.cache_vpp_output:
193                     for line in split[:limit]:
194                         testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
195                         # ignoring the dummy pipe here intentionally - the
196                         # flag will take care of properly terminating the loop
197
198
199 def _is_platform_aarch64():
200     return platform.machine() == "aarch64"
201
202
203 is_platform_aarch64 = _is_platform_aarch64()
204
205
206 def _is_distro_ubuntu2204():
207     with open("/etc/os-release") as f:
208         for line in f.readlines():
209             if "jammy" in line:
210                 return True
211     return False
212
213
214 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
215
216
217 def _is_distro_debian11():
218     with open("/etc/os-release") as f:
219         for line in f.readlines():
220             if "bullseye" in line:
221                 return True
222     return False
223
224
225 is_distro_debian11 = _is_distro_debian11()
226
227
228 class KeepAliveReporter(object):
229     """
230     Singleton object which reports test start to parent process
231     """
232
233     _shared_state = {}
234
235     def __init__(self):
236         self.__dict__ = self._shared_state
237         self._pipe = None
238
239     @property
240     def pipe(self):
241         return self._pipe
242
243     @pipe.setter
244     def pipe(self, pipe):
245         if self._pipe is not None:
246             raise Exception("Internal error - pipe should only be set once.")
247         self._pipe = pipe
248
249     def send_keep_alive(self, test, desc=None):
250         """
251         Write current test tmpdir & desc to keep-alive pipe to signal liveness
252         """
253         if not hasattr(test, "vpp") or self.pipe is None:
254             # if not running forked..
255             return
256
257         if isclass(test):
258             desc = "%s (%s)" % (desc, unittest.util.strclass(test))
259         else:
260             desc = test.id()
261
262         self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
263
264
265 class TestCaseTag(Enum):
266     # marks the suites that must run at the end
267     # using only a single test runner
268     RUN_SOLO = 1
269     # marks the suites broken on VPP multi-worker
270     FIXME_VPP_WORKERS = 2
271     # marks the suites broken when ASan is enabled
272     FIXME_ASAN = 3
273     # marks suites broken on Ubuntu-22.04
274     FIXME_UBUNTU2204 = 4
275     # marks suites broken on Debian-11
276     FIXME_DEBIAN11 = 5
277     # marks suites broken on debug vpp image
278     FIXME_VPP_DEBUG = 6
279
280
281 def create_tag_decorator(e):
282     def decorator(cls):
283         try:
284             cls.test_tags.append(e)
285         except AttributeError:
286             cls.test_tags = [e]
287         return cls
288
289     return decorator
290
291
292 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
293 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
294 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
295 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
296 tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
297 tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
298
299
300 class DummyVpp:
301     returncode = None
302     pid = 0xCAFEBAFE
303
304     def poll(self):
305         pass
306
307     def terminate(self):
308         pass
309
310
311 class CPUInterface(ABC):
312     cpus = []
313     skipped_due_to_cpu_lack = False
314
315     @classmethod
316     @abstractmethod
317     def get_cpus_required(cls):
318         pass
319
320     @classmethod
321     def assign_cpus(cls, cpus):
322         cls.cpus = cpus
323
324
325 @use_running
326 class VppTestCase(CPUInterface, unittest.TestCase):
327     """This subclass is a base class for VPP test cases that are implemented as
328     classes. It provides methods to create and run test case.
329     """
330
331     extra_vpp_statseg_config = ""
332     extra_vpp_punt_config = []
333     extra_vpp_plugin_config = []
334     logger = null_logger
335     vapi_response_timeout = 5
336     remove_configured_vpp_objects_on_tear_down = True
337
338     @property
339     def packet_infos(self):
340         """List of packet infos"""
341         return self._packet_infos
342
343     @classmethod
344     def get_packet_count_for_if_idx(cls, dst_if_index):
345         """Get the number of packet info for specified destination if index"""
346         if dst_if_index in cls._packet_count_for_dst_if_idx:
347             return cls._packet_count_for_dst_if_idx[dst_if_index]
348         else:
349             return 0
350
351     @classmethod
352     def has_tag(cls, tag):
353         """if the test case has a given tag - return true"""
354         try:
355             return tag in cls.test_tags
356         except AttributeError:
357             pass
358         return False
359
360     @classmethod
361     def is_tagged_run_solo(cls):
362         """if the test case class is timing-sensitive - return true"""
363         return cls.has_tag(TestCaseTag.RUN_SOLO)
364
365     @classmethod
366     def skip_fixme_asan(cls):
367         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368         if cls.has_tag(TestCaseTag.FIXME_ASAN):
369             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
372
373     @classmethod
374     def skip_fixme_ubuntu2204(cls):
375         """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376         if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377             cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
378
379     @classmethod
380     def skip_fixme_debian11(cls):
381         """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382         if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383             cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
384
385     @classmethod
386     def skip_fixme_vpp_debug(cls):
387         cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
388
389     @classmethod
390     def instance(cls):
391         """Return the instance of this testcase"""
392         return cls.test_instance
393
394     @classmethod
395     def set_debug_flags(cls, d):
396         cls.gdbserver_port = 7777
397         cls.debug_core = False
398         cls.debug_gdb = False
399         cls.debug_gdbserver = False
400         cls.debug_all = False
401         cls.debug_attach = False
402         if d is None:
403             return
404         dl = d.lower()
405         if dl == "core":
406             cls.debug_core = True
407         elif dl == "gdb" or dl == "gdb-all":
408             cls.debug_gdb = True
409         elif dl == "gdbserver" or dl == "gdbserver-all":
410             cls.debug_gdbserver = True
411         elif dl == "attach":
412             cls.debug_attach = True
413         else:
414             raise Exception("Unrecognized DEBUG option: '%s'" % d)
415         if dl == "gdb-all" or dl == "gdbserver-all":
416             cls.debug_all = True
417
418     @classmethod
419     def get_vpp_worker_count(cls):
420         if not hasattr(cls, "vpp_worker_count"):
421             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
422                 cls.vpp_worker_count = 0
423             else:
424                 cls.vpp_worker_count = config.vpp_worker_count
425         return cls.vpp_worker_count
426
427     @classmethod
428     def get_cpus_required(cls):
429         return 1 + cls.get_vpp_worker_count()
430
431     @classmethod
432     def setUpConstants(cls):
433         """Set-up the test case class based on environment variables"""
434         cls.step = config.step
435         cls.plugin_path = ":".join(config.vpp_plugin_dir)
436         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
438         debug_cli = ""
439         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440             debug_cli = "cli-listen localhost:5002"
441         size = re.search(r"\d+[gG]", config.coredump_size)
442         if size:
443             coredump_size = f"coredump-size {config.coredump_size}".lower()
444         else:
445             coredump_size = "coredump-size unlimited"
446         default_variant = config.variant
447         if default_variant is not None:
448             default_variant = "default { variant %s 100 }" % default_variant
449         else:
450             default_variant = ""
451
452         api_fuzzing = config.api_fuzz
453         if api_fuzzing is None:
454             api_fuzzing = "off"
455
456         cls.vpp_cmdline = [
457             config.vpp,
458             "unix",
459             "{",
460             "nodaemon",
461             debug_cli,
462             "full-coredump",
463             coredump_size,
464             "runtime-dir",
465             cls.tempdir,
466             "}",
467             "api-trace",
468             "{",
469             "on",
470             "}",
471             "api-segment",
472             "{",
473             "prefix",
474             cls.get_api_segment_prefix(),
475             "}",
476             "cpu",
477             "{",
478             "main-core",
479             str(cls.cpus[0]),
480         ]
481         if cls.extern_plugin_path not in (None, ""):
482             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483         if cls.get_vpp_worker_count():
484             cls.vpp_cmdline.extend(
485                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
486             )
487         cls.vpp_cmdline.extend(
488             [
489                 "}",
490                 "physmem",
491                 "{",
492                 "max-size",
493                 "32m",
494                 "}",
495                 "statseg",
496                 "{",
497                 "socket-name",
498                 cls.get_stats_sock_path(),
499                 cls.extra_vpp_statseg_config,
500                 "}",
501                 "socksvr",
502                 "{",
503                 "socket-name",
504                 cls.get_api_sock_path(),
505                 "}",
506                 "node { ",
507                 default_variant,
508                 "}",
509                 "api-fuzz {",
510                 api_fuzzing,
511                 "}",
512                 "plugins",
513                 "{",
514                 "plugin",
515                 "dpdk_plugin.so",
516                 "{",
517                 "disable",
518                 "}",
519                 "plugin",
520                 "rdma_plugin.so",
521                 "{",
522                 "disable",
523                 "}",
524                 "plugin",
525                 "lisp_unittest_plugin.so",
526                 "{",
527                 "enable",
528                 "}",
529                 "plugin",
530                 "unittest_plugin.so",
531                 "{",
532                 "enable",
533                 "}",
534             ]
535             + cls.extra_vpp_plugin_config
536             + [
537                 "}",
538             ]
539         )
540
541         if cls.extra_vpp_punt_config is not None:
542             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
543
544         if not cls.debug_attach:
545             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
547
548     @classmethod
549     def wait_for_enter(cls):
550         if cls.debug_gdbserver:
551             print(double_line_delim)
552             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
553         elif cls.debug_gdb:
554             print(double_line_delim)
555             print("Spawned VPP with PID: %d" % cls.vpp.pid)
556         else:
557             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
558             return
559         print(single_line_delim)
560         print("You can debug VPP using:")
561         if cls.debug_gdbserver:
562             print(
563                 f"sudo gdb {config.vpp} "
564                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
565             )
566             print(
567                 "Now is the time to attach gdb by running the above "
568                 "command, set up breakpoints etc., then resume VPP from "
569                 "within gdb by issuing the 'continue' command"
570             )
571             cls.gdbserver_port += 1
572         elif cls.debug_gdb:
573             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
574             print(
575                 "Now is the time to attach gdb by running the above "
576                 "command and set up breakpoints etc., then resume VPP from"
577                 " within gdb by issuing the 'continue' command"
578             )
579         print(single_line_delim)
580         input("Press ENTER to continue running the testcase...")
581
582     @classmethod
583     def attach_vpp(cls):
584         cls.vpp = DummyVpp()
585
586     @classmethod
587     def run_vpp(cls):
588         if (
589             is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590         ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
591             return
592         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593         cmdline = cls.vpp_cmdline
594
595         if cls.debug_gdbserver:
596             gdbserver = "/usr/bin/gdbserver"
597             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
598                 raise Exception(
599                     "gdbserver binary '%s' does not exist or is "
600                     "not executable" % gdbserver
601                 )
602
603             cmdline = [
604                 gdbserver,
605                 "localhost:{port}".format(port=cls.gdbserver_port),
606             ] + cls.vpp_cmdline
607             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
608
609         try:
610             cls.vpp = subprocess.Popen(
611                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
612             )
613         except subprocess.CalledProcessError as e:
614             cls.logger.critical(
615                 "Subprocess returned with non-0 return code: (%s)", e.returncode
616             )
617             raise
618         except OSError as e:
619             cls.logger.critical(
620                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
621             )
622             raise
623         except Exception as e:
624             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
625             raise
626
627         cls.wait_for_enter()
628
629     @classmethod
630     def wait_for_coredump(cls):
631         corefile = cls.tempdir + "/core"
632         if os.path.isfile(corefile):
633             cls.logger.error("Waiting for coredump to complete: %s", corefile)
634             curr_size = os.path.getsize(corefile)
635             deadline = time.time() + 60
636             ok = False
637             while time.time() < deadline:
638                 cls.sleep(1)
639                 size = curr_size
640                 curr_size = os.path.getsize(corefile)
641                 if size == curr_size:
642                     ok = True
643                     break
644             if not ok:
645                 cls.logger.error(
646                     "Timed out waiting for coredump to complete: %s", corefile
647                 )
648             else:
649                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
650
651     @classmethod
652     def get_stats_sock_path(cls):
653         return "%s/stats.sock" % cls.tempdir
654
655     @classmethod
656     def get_api_sock_path(cls):
657         return "%s/api.sock" % cls.tempdir
658
659     @classmethod
660     def get_api_segment_prefix(cls):
661         return os.path.basename(cls.tempdir)  # Only used for VAPI
662
663     @classmethod
664     def get_tempdir(cls):
665         if cls.debug_attach:
666             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
667         else:
668             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669             if config.wipe_tmp_dir:
670                 shutil.rmtree(tmpdir, ignore_errors=True)
671             os.mkdir(tmpdir)
672         return tmpdir
673
674     @classmethod
675     def create_file_handler(cls):
676         if config.log_dir is None:
677             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
678             return
679
680         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681         if config.wipe_tmp_dir:
682             shutil.rmtree(logdir, ignore_errors=True)
683         os.mkdir(logdir)
684         cls.file_handler = FileHandler(f"{logdir}/log.txt")
685
686     @classmethod
687     def setUpClass(cls):
688         """
689         Perform class setup before running the testcase
690         Remove shared memory files, start vpp and connect the vpp-api
691         """
692         super(VppTestCase, cls).setUpClass()
693         cls.logger = get_logger(cls.__name__)
694         random.seed(config.rnd_seed)
695         if hasattr(cls, "parallel_handler"):
696             cls.logger.addHandler(cls.parallel_handler)
697             cls.logger.propagate = False
698         cls.set_debug_flags(config.debug)
699         cls.tempdir = cls.get_tempdir()
700         cls.create_file_handler()
701         cls.file_handler.setFormatter(
702             Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
703         )
704         cls.file_handler.setLevel(DEBUG)
705         cls.logger.addHandler(cls.file_handler)
706         cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
707         os.chdir(cls.tempdir)
708         cls.logger.info(
709             "Temporary dir is %s, api socket is %s",
710             cls.tempdir,
711             cls.get_api_sock_path(),
712         )
713         cls.logger.debug("Random seed is %s", config.rnd_seed)
714         cls.setUpConstants()
715         cls.reset_packet_infos()
716         cls._pcaps = []
717         cls._old_pcaps = []
718         cls.verbose = 0
719         cls.vpp_dead = False
720         cls.registry = VppObjectRegistry()
721         cls.vpp_startup_failed = False
722         cls.reporter = KeepAliveReporter()
723         # need to catch exceptions here because if we raise, then the cleanup
724         # doesn't get called and we might end with a zombie vpp
725         try:
726             if cls.debug_attach:
727                 cls.attach_vpp()
728             else:
729                 cls.run_vpp()
730                 if not hasattr(cls, "vpp"):
731                     return
732             cls.reporter.send_keep_alive(cls, "setUpClass")
733             VppTestResult.current_test_case_info = TestCaseInfo(
734                 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
735             )
736             cls.vpp_stdout_deque = deque()
737             cls.vpp_stderr_deque = deque()
738             # Pump thread in a non-debug-attached & not running-vpp
739             if not cls.debug_attach and not hasattr(cls, "running_vpp"):
740                 cls.pump_thread_stop_flag = Event()
741                 cls.pump_thread_wakeup_pipe = os.pipe()
742                 cls.pump_thread = Thread(target=pump_output, args=(cls,))
743                 cls.pump_thread.daemon = True
744                 cls.pump_thread.start()
745             if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
746                 cls.vapi_response_timeout = 0
747             cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
748             if cls.step:
749                 hook = hookmodule.StepHook(cls)
750             else:
751                 hook = hookmodule.PollHook(cls)
752             cls.vapi.register_hook(hook)
753             cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
754             try:
755                 hook.poll_vpp()
756             except VppDiedError:
757                 cls.vpp_startup_failed = True
758                 cls.logger.critical(
759                     "VPP died shortly after startup, check the"
760                     " output to standard error for possible cause"
761                 )
762                 raise
763             try:
764                 cls.vapi.connect()
765             except (vpp_papi.VPPIOError, Exception) as e:
766                 cls.logger.debug("Exception connecting to vapi: %s" % e)
767                 cls.vapi.disconnect()
768
769                 if cls.debug_gdbserver:
770                     print(
771                         colorize(
772                             "You're running VPP inside gdbserver but "
773                             "VPP-API connection failed, did you forget "
774                             "to 'continue' VPP from within gdb?",
775                             RED,
776                         )
777                     )
778                 raise e
779             if cls.debug_attach:
780                 last_line = cls.vapi.cli("show thread").split("\n")[-2]
781                 cls.vpp_worker_count = int(last_line.split(" ")[0])
782                 print("Detected VPP with %s workers." % cls.vpp_worker_count)
783         except vpp_papi.VPPRuntimeError as e:
784             cls.logger.debug("%s" % e)
785             cls.quit()
786             raise e
787         except Exception as e:
788             cls.logger.debug("Exception connecting to VPP: %s" % e)
789             cls.quit()
790             raise e
791
792     @classmethod
793     def _debug_quit(cls):
794         if cls.debug_gdbserver or cls.debug_gdb:
795             try:
796                 cls.vpp.poll()
797
798                 if cls.vpp.returncode is None:
799                     print()
800                     print(double_line_delim)
801                     print("VPP or GDB server is still running")
802                     print(single_line_delim)
803                     input(
804                         "When done debugging, press ENTER to kill the "
805                         "process and finish running the testcase..."
806                     )
807             except AttributeError:
808                 pass
809
810     @classmethod
811     def quit(cls):
812         """
813         Disconnect vpp-api, kill vpp and cleanup shared memory files
814         """
815         cls._debug_quit()
816         if hasattr(cls, "running_vpp"):
817             cls.vpp.quit_vpp()
818
819         # first signal that we want to stop the pump thread, then wake it up
820         if hasattr(cls, "pump_thread_stop_flag"):
821             cls.pump_thread_stop_flag.set()
822         if hasattr(cls, "pump_thread_wakeup_pipe"):
823             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
824         if hasattr(cls, "pump_thread"):
825             cls.logger.debug("Waiting for pump thread to stop")
826             cls.pump_thread.join()
827         if hasattr(cls, "vpp_stderr_reader_thread"):
828             cls.logger.debug("Waiting for stderr pump to stop")
829             cls.vpp_stderr_reader_thread.join()
830
831         if hasattr(cls, "vpp"):
832             if hasattr(cls, "vapi"):
833                 cls.logger.debug(cls.vapi.vpp.get_stats())
834                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
835                 cls.vapi.disconnect()
836                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
837                 del cls.vapi
838             cls.vpp.poll()
839             if not cls.debug_attach and cls.vpp.returncode is None:
840                 cls.wait_for_coredump()
841                 cls.logger.debug("Sending TERM to vpp")
842                 cls.vpp.terminate()
843                 cls.logger.debug("Waiting for vpp to die")
844                 try:
845                     outs, errs = cls.vpp.communicate(timeout=5)
846                 except subprocess.TimeoutExpired:
847                     cls.vpp.kill()
848                     outs, errs = cls.vpp.communicate()
849             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
850             if not cls.debug_attach and not hasattr(cls, "running_vpp"):
851                 cls.vpp.stdout.close()
852                 cls.vpp.stderr.close()
853             # If vpp is a dynamic attribute set by the func use_running,
854             # deletion will result in an AttributeError that we can
855             # safetly pass.
856             try:
857                 del cls.vpp
858             except AttributeError:
859                 pass
860
861         if cls.vpp_startup_failed:
862             stdout_log = cls.logger.info
863             stderr_log = cls.logger.critical
864         else:
865             stdout_log = cls.logger.info
866             stderr_log = cls.logger.info
867
868         if hasattr(cls, "vpp_stdout_deque"):
869             stdout_log(single_line_delim)
870             stdout_log("VPP output to stdout while running %s:", cls.__name__)
871             stdout_log(single_line_delim)
872             vpp_output = "".join(cls.vpp_stdout_deque)
873             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
874                 f.write(vpp_output)
875             stdout_log("\n%s", vpp_output)
876             stdout_log(single_line_delim)
877
878         if hasattr(cls, "vpp_stderr_deque"):
879             stderr_log(single_line_delim)
880             stderr_log("VPP output to stderr while running %s:", cls.__name__)
881             stderr_log(single_line_delim)
882             vpp_output = "".join(cls.vpp_stderr_deque)
883             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
884                 f.write(vpp_output)
885             stderr_log("\n%s", vpp_output)
886             stderr_log(single_line_delim)
887
888     @classmethod
889     def tearDownClass(cls):
890         """Perform final cleanup after running all tests in this test-case"""
891         cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892         if not hasattr(cls, "vpp"):
893             return
894         cls.reporter.send_keep_alive(cls, "tearDownClass")
895         cls.quit()
896         cls.file_handler.close()
897         cls.reset_packet_infos()
898         if config.debug_framework:
899             debug_internal.on_tear_down_class(cls)
900
901     def show_commands_at_teardown(self):
902         """Allow subclass specific teardown logging additions."""
903         self.logger.info("--- No test specific show commands provided. ---")
904
905     def tearDown(self):
906         """Show various debug prints after each test"""
907         self.logger.debug(
908             "--- tearDown() for %s.%s(%s) called ---"
909             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
910         )
911         if not hasattr(self, "vpp"):
912             return
913
914         try:
915             if not self.vpp_dead:
916                 self.logger.debug(self.vapi.cli("show trace max 1000"))
917                 self.logger.info(self.vapi.ppcli("show interface"))
918                 self.logger.info(self.vapi.ppcli("show hardware"))
919                 self.logger.info(self.statistics.set_errors_str())
920                 self.logger.info(self.vapi.ppcli("show run"))
921                 self.logger.info(self.vapi.ppcli("show log"))
922                 self.logger.info(self.vapi.ppcli("show bihash"))
923                 self.logger.info("Logging testcase specific show commands.")
924                 self.show_commands_at_teardown()
925                 if self.remove_configured_vpp_objects_on_tear_down:
926                     self.registry.remove_vpp_config(self.logger)
927             # Save/Dump VPP api trace log
928             m = self._testMethodName
929             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
930             tmp_api_trace = "/tmp/%s" % api_trace
931             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
932             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
933             self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
934             shutil.move(tmp_api_trace, vpp_api_trace_log)
935         except VppTransportSocketIOError:
936             self.logger.debug(
937                 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
938             )
939             self.vpp_dead = True
940         else:
941             self.registry.unregister_all(self.logger)
942
943     def setUp(self):
944         """Clear trace before running each test"""
945         super(VppTestCase, self).setUp()
946         if not hasattr(self, "vpp"):
947             return
948         self.reporter.send_keep_alive(self)
949         if self.vpp_dead:
950             raise VppDiedError(
951                 rv=None,
952                 testcase=self.__class__.__name__,
953                 method_name=self._testMethodName,
954             )
955         self.sleep(0.1, "during setUp")
956         self.vpp_stdout_deque.append(
957             "--- test setUp() for %s.%s(%s) starts here ---\n"
958             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
959         )
960         self.vpp_stderr_deque.append(
961             "--- test setUp() for %s.%s(%s) starts here ---\n"
962             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
963         )
964         self.vapi.cli("clear trace")
965         # store the test instance inside the test class - so that objects
966         # holding the class can access instance methods (like assertEqual)
967         type(self).test_instance = self
968
969     @classmethod
970     def pg_enable_capture(cls, interfaces=None):
971         """
972         Enable capture on packet-generator interfaces
973
974         :param interfaces: iterable interface indexes (if None,
975                            use self.pg_interfaces)
976
977         """
978         if interfaces is None:
979             interfaces = cls.pg_interfaces
980         for i in interfaces:
981             i.enable_capture()
982
983     @classmethod
984     def register_pcap(cls, intf, worker):
985         """Register a pcap in the testclass"""
986         # add to the list of captures with current timestamp
987         cls._pcaps.append((intf, worker))
988
989     @classmethod
990     def get_vpp_time(cls):
991         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
992         # returns float("2.190522")
993         timestr = cls.vapi.cli("show clock")
994         head, sep, tail = timestr.partition(",")
995         head, sep, tail = head.partition("Time now")
996         return float(tail)
997
998     @classmethod
999     def sleep_on_vpp_time(cls, sec):
1000         """Sleep according to time in VPP world"""
1001         # On a busy system with many processes
1002         # we might end up with VPP time being slower than real world
1003         # So take that into account when waiting for VPP to do something
1004         start_time = cls.get_vpp_time()
1005         while cls.get_vpp_time() - start_time < sec:
1006             cls.sleep(0.1)
1007
1008     @classmethod
1009     def pg_start(cls, trace=True):
1010         """Enable the PG, wait till it is done, then clean up"""
1011         for (intf, worker) in cls._old_pcaps:
1012             intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1013         cls._old_pcaps = []
1014         if trace:
1015             cls.vapi.cli("clear trace")
1016             cls.vapi.cli("trace add pg-input 1000")
1017         cls.vapi.cli("packet-generator enable")
1018         # PG, when starts, runs to completion -
1019         # so let's avoid a race condition,
1020         # and wait a little till it's done.
1021         # Then clean it up  - and then be gone.
1022         deadline = time.time() + 300
1023         while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1024             cls.sleep(0.01)  # yield
1025             if time.time() > deadline:
1026                 cls.logger.error("Timeout waiting for pg to stop")
1027                 break
1028         for intf, worker in cls._pcaps:
1029             cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1030         cls._old_pcaps = cls._pcaps
1031         cls._pcaps = []
1032
1033     @classmethod
1034     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1035         """
1036         Create packet-generator interfaces.
1037
1038         :param interfaces: iterable indexes of the interfaces.
1039         :returns: List of created interfaces.
1040
1041         """
1042         result = []
1043         for i in interfaces:
1044             intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045             setattr(cls, intf.name, intf)
1046             result.append(intf)
1047         cls.pg_interfaces = result
1048         return result
1049
1050     @classmethod
1051     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1052         if not hasattr(cls, "vpp"):
1053             cls.pg_interfaces = []
1054             return cls.pg_interfaces
1055         pgmode = VppEnum.vl_api_pg_interface_mode_t
1056         return cls.create_pg_interfaces_internal(
1057             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1058         )
1059
1060     @classmethod
1061     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1062         if not hasattr(cls, "vpp"):
1063             cls.pg_interfaces = []
1064             return cls.pg_interfaces
1065         pgmode = VppEnum.vl_api_pg_interface_mode_t
1066         return cls.create_pg_interfaces_internal(
1067             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1068         )
1069
1070     @classmethod
1071     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1072         if not hasattr(cls, "vpp"):
1073             cls.pg_interfaces = []
1074             return cls.pg_interfaces
1075         pgmode = VppEnum.vl_api_pg_interface_mode_t
1076         return cls.create_pg_interfaces_internal(
1077             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1078         )
1079
1080     @classmethod
1081     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1082         if not hasattr(cls, "vpp"):
1083             cls.pg_interfaces = []
1084             return cls.pg_interfaces
1085         pgmode = VppEnum.vl_api_pg_interface_mode_t
1086         return cls.create_pg_interfaces_internal(
1087             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1088         )
1089
1090     @classmethod
1091     def create_loopback_interfaces(cls, count):
1092         """
1093         Create loopback interfaces.
1094
1095         :param count: number of interfaces created.
1096         :returns: List of created interfaces.
1097         """
1098         if not hasattr(cls, "vpp"):
1099             cls.lo_interfaces = []
1100             return cls.lo_interfaces
1101         result = [VppLoInterface(cls) for i in range(count)]
1102         for intf in result:
1103             setattr(cls, intf.name, intf)
1104         cls.lo_interfaces = result
1105         return result
1106
1107     @classmethod
1108     def create_bvi_interfaces(cls, count):
1109         """
1110         Create BVI interfaces.
1111
1112         :param count: number of interfaces created.
1113         :returns: List of created interfaces.
1114         """
1115         if not hasattr(cls, "vpp"):
1116             cls.bvi_interfaces = []
1117             return cls.bvi_interfaces
1118         result = [VppBviInterface(cls) for i in range(count)]
1119         for intf in result:
1120             setattr(cls, intf.name, intf)
1121         cls.bvi_interfaces = result
1122         return result
1123
1124     @staticmethod
1125     def extend_packet(packet, size, padding=" "):
1126         """
1127         Extend packet to given size by padding with spaces or custom padding
1128         NOTE: Currently works only when Raw layer is present.
1129
1130         :param packet: packet
1131         :param size: target size
1132         :param padding: padding used to extend the payload
1133
1134         """
1135         packet_len = len(packet) + 4
1136         extend = size - packet_len
1137         if extend > 0:
1138             num = (extend // len(padding)) + 1
1139             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1140
1141     @classmethod
1142     def reset_packet_infos(cls):
1143         """Reset the list of packet info objects and packet counts to zero"""
1144         cls._packet_infos = {}
1145         cls._packet_count_for_dst_if_idx = {}
1146
1147     @classmethod
1148     def create_packet_info(cls, src_if, dst_if):
1149         """
1150         Create packet info object containing the source and destination indexes
1151         and add it to the testcase's packet info list
1152
1153         :param VppInterface src_if: source interface
1154         :param VppInterface dst_if: destination interface
1155
1156         :returns: _PacketInfo object
1157
1158         """
1159         info = _PacketInfo()
1160         info.index = len(cls._packet_infos)
1161         info.src = src_if.sw_if_index
1162         info.dst = dst_if.sw_if_index
1163         if isinstance(dst_if, VppSubInterface):
1164             dst_idx = dst_if.parent.sw_if_index
1165         else:
1166             dst_idx = dst_if.sw_if_index
1167         if dst_idx in cls._packet_count_for_dst_if_idx:
1168             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1169         else:
1170             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1171         cls._packet_infos[info.index] = info
1172         return info
1173
1174     @staticmethod
1175     def info_to_payload(info):
1176         """
1177         Convert _PacketInfo object to packet payload
1178
1179         :param info: _PacketInfo object
1180
1181         :returns: string containing serialized data from packet info
1182         """
1183
1184         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1185         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1186
1187     @staticmethod
1188     def payload_to_info(payload, payload_field="load"):
1189         """
1190         Convert packet payload to _PacketInfo object
1191
1192         :param payload: packet payload
1193         :type payload:  <class 'scapy.packet.Raw'>
1194         :param payload_field: packet fieldname of payload "load" for
1195                 <class 'scapy.packet.Raw'>
1196         :type payload_field: str
1197         :returns: _PacketInfo object containing de-serialized data from payload
1198
1199         """
1200
1201         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1202         payload_b = getattr(payload, payload_field)[:18]
1203
1204         info = _PacketInfo()
1205         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1206
1207         # some SRv6 TCs depend on get an exception if bad values are detected
1208         if info.index > 0x4000:
1209             raise ValueError("Index value is invalid")
1210
1211         return info
1212
1213     def get_next_packet_info(self, info):
1214         """
1215         Iterate over the packet info list stored in the testcase
1216         Start iteration with first element if info is None
1217         Continue based on index in info if info is specified
1218
1219         :param info: info or None
1220         :returns: next info in list or None if no more infos
1221         """
1222         if info is None:
1223             next_index = 0
1224         else:
1225             next_index = info.index + 1
1226         if next_index == len(self._packet_infos):
1227             return None
1228         else:
1229             return self._packet_infos[next_index]
1230
1231     def get_next_packet_info_for_interface(self, src_index, info):
1232         """
1233         Search the packet info list for the next packet info with same source
1234         interface index
1235
1236         :param src_index: source interface index to search for
1237         :param info: packet info - where to start the search
1238         :returns: packet info or None
1239
1240         """
1241         while True:
1242             info = self.get_next_packet_info(info)
1243             if info is None:
1244                 return None
1245             if info.src == src_index:
1246                 return info
1247
1248     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1249         """
1250         Search the packet info list for the next packet info with same source
1251         and destination interface indexes
1252
1253         :param src_index: source interface index to search for
1254         :param dst_index: destination interface index to search for
1255         :param info: packet info - where to start the search
1256         :returns: packet info or None
1257
1258         """
1259         while True:
1260             info = self.get_next_packet_info_for_interface(src_index, info)
1261             if info is None:
1262                 return None
1263             if info.dst == dst_index:
1264                 return info
1265
1266     def assert_equal(self, real_value, expected_value, name_or_class=None):
1267         if name_or_class is None:
1268             self.assertEqual(real_value, expected_value)
1269             return
1270         try:
1271             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1272             msg = msg % (
1273                 getdoc(name_or_class).strip(),
1274                 real_value,
1275                 str(name_or_class(real_value)),
1276                 expected_value,
1277                 str(name_or_class(expected_value)),
1278             )
1279         except Exception:
1280             msg = "Invalid %s: %s does not match expected value %s" % (
1281                 name_or_class,
1282                 real_value,
1283                 expected_value,
1284             )
1285
1286         self.assertEqual(real_value, expected_value, msg)
1287
1288     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1289         if name is None:
1290             msg = None
1291         else:
1292             msg = "Invalid %s: %s out of range <%s,%s>" % (
1293                 name,
1294                 real_value,
1295                 expected_min,
1296                 expected_max,
1297             )
1298         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1299
1300     def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1301         received = packet.__class__(scapy.compat.raw(packet))
1302         udp_layers = ["UDP", "UDPerror"]
1303         checksum_fields = ["cksum", "chksum"]
1304         checksums = []
1305         counter = 0
1306         temp = received.__class__(scapy.compat.raw(received))
1307         while True:
1308             layer = temp.getlayer(counter)
1309             if layer:
1310                 layer = layer.copy()
1311                 layer.remove_payload()
1312                 for cf in checksum_fields:
1313                     if hasattr(layer, cf):
1314                         if (
1315                             ignore_zero_udp_checksums
1316                             and 0 == getattr(layer, cf)
1317                             and layer.name in udp_layers
1318                         ):
1319                             continue
1320                         delattr(temp.getlayer(counter), cf)
1321                         checksums.append((counter, cf))
1322             else:
1323                 break
1324             counter = counter + 1
1325         if 0 == len(checksums):
1326             return
1327         temp = temp.__class__(scapy.compat.raw(temp))
1328         for layer, cf in reversed(checksums):
1329             calc_sum = getattr(temp[layer], cf)
1330             self.assert_equal(
1331                 getattr(received[layer], cf),
1332                 calc_sum,
1333                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1334             )
1335             self.logger.debug(
1336                 "Checksum field `%s` on `%s` layer has correct value `%s`"
1337                 % (cf, temp[layer].name, calc_sum)
1338             )
1339
1340     def assert_checksum_valid(
1341         self,
1342         received_packet,
1343         layer,
1344         checksum_field_names=["chksum", "cksum"],
1345         ignore_zero_checksum=False,
1346     ):
1347         """Check checksum of received packet on given layer"""
1348         layer_copy = received_packet[layer].copy()
1349         layer_copy.remove_payload()
1350         field_name = None
1351         for f in checksum_field_names:
1352             if hasattr(layer_copy, f):
1353                 field_name = f
1354                 break
1355         if field_name is None:
1356             raise Exception(
1357                 f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
1358             )
1359         received_packet_checksum = getattr(received_packet[layer], field_name)
1360         if ignore_zero_checksum and 0 == received_packet_checksum:
1361             return
1362         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1363         delattr(recalculated[layer], field_name)
1364         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1365         self.assert_equal(
1366             received_packet_checksum,
1367             getattr(recalculated[layer], field_name),
1368             f"packet checksum (field: {field_name}) on layer: %s" % layer,
1369         )
1370
1371     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1372         self.assert_checksum_valid(
1373             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1374         )
1375
1376     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1377         self.assert_checksum_valid(
1378             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1379         )
1380
1381     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1382         self.assert_checksum_valid(
1383             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1384         )
1385
1386     def assert_embedded_icmp_checksum_valid(self, received_packet):
1387         if received_packet.haslayer(IPerror):
1388             self.assert_checksum_valid(received_packet, "IPerror")
1389         if received_packet.haslayer(TCPerror):
1390             self.assert_checksum_valid(received_packet, "TCPerror")
1391         if received_packet.haslayer(UDPerror):
1392             self.assert_checksum_valid(
1393                 received_packet, "UDPerror", ignore_zero_checksum=True
1394             )
1395         if received_packet.haslayer(ICMPerror):
1396             self.assert_checksum_valid(received_packet, "ICMPerror")
1397
1398     def assert_icmp_checksum_valid(self, received_packet):
1399         self.assert_checksum_valid(received_packet, "ICMP")
1400         self.assert_embedded_icmp_checksum_valid(received_packet)
1401
1402     def assert_icmpv6_checksum_valid(self, pkt):
1403         if pkt.haslayer(ICMPv6DestUnreach):
1404             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
1405             self.assert_embedded_icmp_checksum_valid(pkt)
1406         if pkt.haslayer(ICMPv6EchoRequest):
1407             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
1408         if pkt.haslayer(ICMPv6EchoReply):
1409             self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
1410
1411     def get_counter(self, counter):
1412         if counter.startswith("/"):
1413             counter_value = self.statistics.get_counter(counter)
1414         else:
1415             counters = self.vapi.cli("sh errors").split("\n")
1416             counter_value = 0
1417             for i in range(1, len(counters) - 1):
1418                 results = counters[i].split()
1419                 if results[1] == counter:
1420                     counter_value = int(results[0])
1421                     break
1422         return counter_value
1423
1424     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1425         c = self.get_counter(counter)
1426         if thread is not None:
1427             c = c[thread][index]
1428         else:
1429             c = sum(x[index] for x in c)
1430         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1431
1432     def assert_packet_counter_equal(self, counter, expected_value):
1433         counter_value = self.get_counter(counter)
1434         self.assert_equal(
1435             counter_value, expected_value, "packet counter `%s'" % counter
1436         )
1437
1438     def assert_error_counter_equal(self, counter, expected_value):
1439         counter_value = self.statistics[counter].sum()
1440         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1441
1442     @classmethod
1443     def sleep(cls, timeout, remark=None):
1444
1445         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1446         #  * by Guido, only the main thread can be interrupted.
1447         # */
1448         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1449         if timeout == 0:
1450             # yield quantum
1451             if hasattr(os, "sched_yield"):
1452                 os.sched_yield()
1453             else:
1454                 time.sleep(0)
1455             return
1456
1457         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1458         before = time.time()
1459         time.sleep(timeout)
1460         after = time.time()
1461         if after - before > 2 * timeout:
1462             cls.logger.error(
1463                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1464                 after - before,
1465                 timeout,
1466             )
1467
1468         cls.logger.debug(
1469             "Finished sleep (%s) - slept %es (wanted %es)",
1470             remark,
1471             after - before,
1472             timeout,
1473         )
1474
1475     def virtual_sleep(self, timeout, remark=None):
1476         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1477         self.vapi.cli("set clock adjust %s" % timeout)
1478
1479     def pg_send(self, intf, pkts, worker=None, trace=True):
1480         intf.add_stream(pkts, worker=worker)
1481         self.pg_enable_capture(self.pg_interfaces)
1482         self.pg_start(trace=trace)
1483
1484     def snapshot_stats(self, stats_diff):
1485         """Return snapshot of interesting stats based on diff dictionary."""
1486         stats_snapshot = {}
1487         for sw_if_index in stats_diff:
1488             for counter in stats_diff[sw_if_index]:
1489                 stats_snapshot[counter] = self.statistics[counter]
1490         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1491         return stats_snapshot
1492
1493     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1494         """Assert appropriate difference between current stats and snapshot."""
1495         for sw_if_index in stats_diff:
1496             for cntr, diff in stats_diff[sw_if_index].items():
1497                 if sw_if_index == "err":
1498                     self.assert_equal(
1499                         self.statistics[cntr].sum(),
1500                         stats_snapshot[cntr].sum() + diff,
1501                         f"'{cntr}' counter value (previous value: "
1502                         f"{stats_snapshot[cntr].sum()}, "
1503                         f"expected diff: {diff})",
1504                     )
1505                 else:
1506                     try:
1507                         self.assert_equal(
1508                             self.statistics[cntr][:, sw_if_index].sum(),
1509                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1510                             f"'{cntr}' counter value (previous value: "
1511                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1512                             f"expected diff: {diff})",
1513                         )
1514                     except IndexError:
1515                         # if diff is 0, then this most probably a case where
1516                         # test declares multiple interfaces but traffic hasn't
1517                         # passed through this one yet - which means the counter
1518                         # value is 0 and can be ignored
1519                         if 0 != diff:
1520                             raise
1521
1522     def send_and_assert_no_replies(
1523         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1524     ):
1525         if stats_diff:
1526             stats_snapshot = self.snapshot_stats(stats_diff)
1527
1528         self.pg_send(intf, pkts)
1529
1530         try:
1531             if not timeout:
1532                 timeout = 1
1533             for i in self.pg_interfaces:
1534                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1535                 timeout = 0.1
1536         finally:
1537             if trace:
1538                 if msg:
1539                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1540                 self.logger.debug(self.vapi.cli("show trace"))
1541
1542         if stats_diff:
1543             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1544
1545     def send_and_expect(
1546         self,
1547         intf,
1548         pkts,
1549         output,
1550         n_rx=None,
1551         worker=None,
1552         trace=True,
1553         msg=None,
1554         stats_diff=None,
1555     ):
1556         if stats_diff:
1557             stats_snapshot = self.snapshot_stats(stats_diff)
1558
1559         if not n_rx:
1560             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1561         self.pg_send(intf, pkts, worker=worker, trace=trace)
1562         rx = output.get_capture(n_rx)
1563         if trace:
1564             if msg:
1565                 self.logger.debug(f"send_and_expect: {msg}")
1566             self.logger.debug(self.vapi.cli("show trace"))
1567
1568         if stats_diff:
1569             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1570
1571         return rx
1572
1573     def send_and_expect_load_balancing(
1574         self, input, pkts, outputs, worker=None, trace=True
1575     ):
1576         self.pg_send(input, pkts, worker=worker, trace=trace)
1577         rxs = []
1578         for oo in outputs:
1579             rx = oo._get_capture(1)
1580             self.assertNotEqual(0, len(rx))
1581             rxs.append(rx)
1582         if trace:
1583             self.logger.debug(self.vapi.cli("show trace"))
1584         return rxs
1585
1586     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1587         self.pg_send(intf, pkts, worker=worker, trace=trace)
1588         rx = output._get_capture(1)
1589         if trace:
1590             self.logger.debug(self.vapi.cli("show trace"))
1591         self.assertTrue(len(rx) > 0)
1592         self.assertTrue(len(rx) < len(pkts))
1593         return rx
1594
1595     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1596         if stats_diff:
1597             stats_snapshot = self.snapshot_stats(stats_diff)
1598
1599         self.pg_send(intf, pkts)
1600         rx = output.get_capture(len(pkts))
1601         outputs = [output]
1602         if not timeout:
1603             timeout = 1
1604         for i in self.pg_interfaces:
1605             if i not in outputs:
1606                 i.assert_nothing_captured(timeout=timeout)
1607                 timeout = 0.1
1608
1609         if stats_diff:
1610             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1611
1612         return rx
1613
1614
1615 def get_testcase_doc_name(test):
1616     return getdoc(test.__class__).splitlines()[0]
1617
1618
1619 def get_test_description(descriptions, test):
1620     short_description = test.shortDescription()
1621     if descriptions and short_description:
1622         return short_description
1623     else:
1624         return str(test)
1625
1626
1627 class TestCaseInfo(object):
1628     def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1629         self.logger = logger
1630         self.tempdir = tempdir
1631         self.vpp_pid = vpp_pid
1632         self.vpp_bin_path = vpp_bin_path
1633         self.core_crash_test = None
1634
1635
1636 class VppTestResult(unittest.TestResult):
1637     """
1638     @property result_string
1639      String variable to store the test case result string.
1640     @property errors
1641      List variable containing 2-tuples of TestCase instances and strings
1642      holding formatted tracebacks. Each tuple represents a test which
1643      raised an unexpected exception.
1644     @property failures
1645      List variable containing 2-tuples of TestCase instances and strings
1646      holding formatted tracebacks. Each tuple represents a test where
1647      a failure was explicitly signalled using the TestCase.assert*()
1648      methods.
1649     """
1650
1651     failed_test_cases_info = set()
1652     core_crash_test_cases_info = set()
1653     current_test_case_info = None
1654
1655     def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1656         """
1657         :param stream File descriptor to store where to report test results.
1658             Set to the standard error stream by default.
1659         :param descriptions Boolean variable to store information if to use
1660             test case descriptions.
1661         :param verbosity Integer variable to store required verbosity level.
1662         """
1663         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1664         self.stream = stream
1665         self.descriptions = descriptions
1666         self.verbosity = verbosity
1667         self.result_string = None
1668         self.runner = runner
1669         self.printed = []
1670
1671     def addSuccess(self, test):
1672         """
1673         Record a test succeeded result
1674
1675         :param test:
1676
1677         """
1678         if self.current_test_case_info:
1679             self.current_test_case_info.logger.debug(
1680                 "--- addSuccess() %s.%s(%s) called"
1681                 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1682             )
1683         unittest.TestResult.addSuccess(self, test)
1684         self.result_string = colorize("OK", GREEN)
1685
1686         self.send_result_through_pipe(test, PASS)
1687
1688     def addSkip(self, test, reason):
1689         """
1690         Record a test skipped.
1691
1692         :param test:
1693         :param reason:
1694
1695         """
1696         if self.current_test_case_info:
1697             self.current_test_case_info.logger.debug(
1698                 "--- addSkip() %s.%s(%s) called, reason is %s"
1699                 % (
1700                     test.__class__.__name__,
1701                     test._testMethodName,
1702                     test._testMethodDoc,
1703                     reason,
1704                 )
1705             )
1706         unittest.TestResult.addSkip(self, test, reason)
1707         self.result_string = colorize("SKIP", YELLOW)
1708
1709         if reason == "not enough cpus":
1710             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1711         else:
1712             self.send_result_through_pipe(test, SKIP)
1713
1714     def symlink_failed(self):
1715         if self.current_test_case_info:
1716             try:
1717                 failed_dir = config.failed_dir
1718                 link_path = os.path.join(
1719                     failed_dir,
1720                     "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1721                 )
1722
1723                 self.current_test_case_info.logger.debug(
1724                     "creating a link to the failed test"
1725                 )
1726                 self.current_test_case_info.logger.debug(
1727                     "os.symlink(%s, %s)"
1728                     % (self.current_test_case_info.tempdir, link_path)
1729                 )
1730                 if os.path.exists(link_path):
1731                     self.current_test_case_info.logger.debug("symlink already exists")
1732                 else:
1733                     os.symlink(self.current_test_case_info.tempdir, link_path)
1734
1735             except Exception as e:
1736                 self.current_test_case_info.logger.error(e)
1737
1738     def send_result_through_pipe(self, test, result):
1739         if hasattr(self, "test_framework_result_pipe"):
1740             pipe = self.test_framework_result_pipe
1741             if pipe:
1742                 pipe.send((test.id(), result))
1743
1744     def log_error(self, test, err, fn_name):
1745         if self.current_test_case_info:
1746             if isinstance(test, unittest.suite._ErrorHolder):
1747                 test_name = test.description
1748             else:
1749                 test_name = "%s.%s(%s)" % (
1750                     test.__class__.__name__,
1751                     test._testMethodName,
1752                     test._testMethodDoc,
1753                 )
1754             self.current_test_case_info.logger.debug(
1755                 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1756             )
1757             self.current_test_case_info.logger.debug(
1758                 "formatted exception is:\n%s" % "".join(format_exception(*err))
1759             )
1760
1761     def add_error(self, test, err, unittest_fn, error_type):
1762         if error_type == FAIL:
1763             self.log_error(test, err, "addFailure")
1764             error_type_str = colorize("FAIL", RED)
1765         elif error_type == ERROR:
1766             self.log_error(test, err, "addError")
1767             error_type_str = colorize("ERROR", RED)
1768         else:
1769             raise Exception(
1770                 "Error type %s cannot be used to record an "
1771                 "error or a failure" % error_type
1772             )
1773
1774         unittest_fn(self, test, err)
1775         if self.current_test_case_info:
1776             self.result_string = "%s [ temp dir used by test case: %s ]" % (
1777                 error_type_str,
1778                 self.current_test_case_info.tempdir,
1779             )
1780             self.symlink_failed()
1781             self.failed_test_cases_info.add(self.current_test_case_info)
1782             if is_core_present(self.current_test_case_info.tempdir):
1783                 if not self.current_test_case_info.core_crash_test:
1784                     if isinstance(test, unittest.suite._ErrorHolder):
1785                         test_name = str(test)
1786                     else:
1787                         test_name = "'{!s}' ({!s})".format(
1788                             get_testcase_doc_name(test), test.id()
1789                         )
1790                     self.current_test_case_info.core_crash_test = test_name
1791                 self.core_crash_test_cases_info.add(self.current_test_case_info)
1792         else:
1793             self.result_string = "%s [no temp dir]" % error_type_str
1794
1795         self.send_result_through_pipe(test, error_type)
1796
1797     def addFailure(self, test, err):
1798         """
1799         Record a test failed result
1800
1801         :param test:
1802         :param err: error message
1803
1804         """
1805         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1806
1807     def addError(self, test, err):
1808         """
1809         Record a test error result
1810
1811         :param test:
1812         :param err: error message
1813
1814         """
1815         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1816
1817     def getDescription(self, test):
1818         """
1819         Get test description
1820
1821         :param test:
1822         :returns: test description
1823
1824         """
1825         return get_test_description(self.descriptions, test)
1826
1827     def startTest(self, test):
1828         """
1829         Start a test
1830
1831         :param test:
1832
1833         """
1834
1835         def print_header(test):
1836             if test.__class__ in self.printed:
1837                 return
1838
1839             test_doc = getdoc(test)
1840             if not test_doc:
1841                 raise Exception("No doc string for test '%s'" % test.id())
1842
1843             test_title = test_doc.splitlines()[0].rstrip()
1844             test_title = colorize(test_title, GREEN)
1845             if test.is_tagged_run_solo():
1846                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1847
1848             # This block may overwrite the colorized title above,
1849             # but we want this to stand out and be fixed
1850             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1851                 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1852
1853             if test.has_tag(TestCaseTag.FIXME_ASAN):
1854                 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1855                 test.skip_fixme_asan()
1856
1857             if is_distro_ubuntu2204 == True and test.has_tag(
1858                 TestCaseTag.FIXME_UBUNTU2204
1859             ):
1860                 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1861                 test.skip_fixme_ubuntu2204()
1862
1863             if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1864                 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1865                 test.skip_fixme_debian11()
1866
1867             if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1868                 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1869                 test.skip_fixme_vpp_debug()
1870
1871             if hasattr(test, "vpp_worker_count"):
1872                 if test.vpp_worker_count == 0:
1873                     test_title += " [main thread only]"
1874                 elif test.vpp_worker_count == 1:
1875                     test_title += " [1 worker thread]"
1876                 else:
1877                     test_title += f" [{test.vpp_worker_count} worker threads]"
1878
1879             if test.__class__.skipped_due_to_cpu_lack:
1880                 test_title = colorize(
1881                     f"{test_title} [skipped - not enough cpus, "
1882                     f"required={test.__class__.get_cpus_required()}, "
1883                     f"available={max_vpp_cpus}]",
1884                     YELLOW,
1885                 )
1886
1887             print(double_line_delim)
1888             print(test_title)
1889             print(double_line_delim)
1890             self.printed.append(test.__class__)
1891
1892         print_header(test)
1893         self.start_test = time.time()
1894         unittest.TestResult.startTest(self, test)
1895         if self.verbosity > 0:
1896             self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1897             self.stream.writeln(single_line_delim)
1898
1899     def stopTest(self, test):
1900         """
1901         Called when the given test has been run
1902
1903         :param test:
1904
1905         """
1906         unittest.TestResult.stopTest(self, test)
1907
1908         if self.verbosity > 0:
1909             self.stream.writeln(single_line_delim)
1910             self.stream.writeln(
1911                 "%-73s%s" % (self.getDescription(test), self.result_string)
1912             )
1913             self.stream.writeln(single_line_delim)
1914         else:
1915             self.stream.writeln(
1916                 "%-68s %4.2f %s"
1917                 % (
1918                     self.getDescription(test),
1919                     time.time() - self.start_test,
1920                     self.result_string,
1921                 )
1922             )
1923
1924         self.send_result_through_pipe(test, TEST_RUN)
1925
1926     def printErrors(self):
1927         """
1928         Print errors from running the test case
1929         """
1930         if len(self.errors) > 0 or len(self.failures) > 0:
1931             self.stream.writeln()
1932             self.printErrorList("ERROR", self.errors)
1933             self.printErrorList("FAIL", self.failures)
1934
1935         # ^^ that is the last output from unittest before summary
1936         if not self.runner.print_summary:
1937             devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1938             self.stream = devnull
1939             self.runner.stream = devnull
1940
1941     def printErrorList(self, flavour, errors):
1942         """
1943         Print error list to the output stream together with error type
1944         and test case description.
1945
1946         :param flavour: error type
1947         :param errors: iterable errors
1948
1949         """
1950         for test, err in errors:
1951             self.stream.writeln(double_line_delim)
1952             self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1953             self.stream.writeln(single_line_delim)
1954             self.stream.writeln("%s" % err)
1955
1956
1957 class VppTestRunner(unittest.TextTestRunner):
1958     """
1959     A basic test runner implementation which prints results to standard error.
1960     """
1961
1962     @property
1963     def resultclass(self):
1964         """Class maintaining the results of the tests"""
1965         return VppTestResult
1966
1967     def __init__(
1968         self,
1969         keep_alive_pipe=None,
1970         descriptions=True,
1971         verbosity=1,
1972         result_pipe=None,
1973         failfast=False,
1974         buffer=False,
1975         resultclass=None,
1976         print_summary=True,
1977         **kwargs,
1978     ):
1979         # ignore stream setting here, use hard-coded stdout to be in sync
1980         # with prints from VppTestCase methods ...
1981         super(VppTestRunner, self).__init__(
1982             sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1983         )
1984         KeepAliveReporter.pipe = keep_alive_pipe
1985
1986         self.orig_stream = self.stream
1987         self.resultclass.test_framework_result_pipe = result_pipe
1988
1989         self.print_summary = print_summary
1990
1991     def _makeResult(self):
1992         return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1993
1994     def run(self, test):
1995         """
1996         Run the tests
1997
1998         :param test:
1999
2000         """
2001         faulthandler.enable()  # emit stack trace to stderr if killed by signal
2002
2003         result = super(VppTestRunner, self).run(test)
2004         if not self.print_summary:
2005             self.stream = self.orig_stream
2006             result.stream = self.orig_stream
2007         return result
2008
2009
2010 class Worker(Thread):
2011     def __init__(self, executable_args, logger, env=None, *args, **kwargs):
2012         super(Worker, self).__init__(*args, **kwargs)
2013         self.logger = logger
2014         self.args = executable_args
2015         if hasattr(self, "testcase") and self.testcase.debug_all:
2016             if self.testcase.debug_gdbserver:
2017                 self.args = [
2018                     "/usr/bin/gdbserver",
2019                     "localhost:{port}".format(port=self.testcase.gdbserver_port),
2020                 ] + args
2021             elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
2022                 self.args.append(self.wait_for_gdb)
2023         self.app_bin = executable_args[0]
2024         self.app_name = os.path.basename(self.app_bin)
2025         if hasattr(self, "role"):
2026             self.app_name += " {role}".format(role=self.role)
2027         self.process = None
2028         self.result = None
2029         env = {} if env is None else env
2030         self.env = copy.deepcopy(env)
2031
2032     def wait_for_enter(self):
2033         if not hasattr(self, "testcase"):
2034             return
2035         if self.testcase.debug_all and self.testcase.debug_gdbserver:
2036             print()
2037             print(double_line_delim)
2038             print(
2039                 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2040                     app=self.app_name, pid=self.process.pid
2041                 )
2042             )
2043         elif self.testcase.debug_all and self.testcase.debug_gdb:
2044             print()
2045             print(double_line_delim)
2046             print(
2047                 "Spawned '{app}' with PID: {pid}".format(
2048                     app=self.app_name, pid=self.process.pid
2049                 )
2050             )
2051         else:
2052             return
2053         print(single_line_delim)
2054         print("You can debug '{app}' using:".format(app=self.app_name))
2055         if self.testcase.debug_gdbserver:
2056             print(
2057                 "sudo gdb "
2058                 + self.app_bin
2059                 + " -ex 'target remote localhost:{port}'".format(
2060                     port=self.testcase.gdbserver_port
2061                 )
2062             )
2063             print(
2064                 "Now is the time to attach gdb by running the above "
2065                 "command, set up breakpoints etc., then resume from "
2066                 "within gdb by issuing the 'continue' command"
2067             )
2068             self.testcase.gdbserver_port += 1
2069         elif self.testcase.debug_gdb:
2070             print(
2071                 "sudo gdb "
2072                 + self.app_bin
2073                 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2074             )
2075             print(
2076                 "Now is the time to attach gdb by running the above "
2077                 "command and set up breakpoints etc., then resume from"
2078                 " within gdb by issuing the 'continue' command"
2079             )
2080         print(single_line_delim)
2081         input("Press ENTER to continue running the testcase...")
2082
2083     def run(self):
2084         executable = self.args[0]
2085         if not os.path.exists(executable) or not os.access(
2086             executable, os.F_OK | os.X_OK
2087         ):
2088             # Exit code that means some system file did not exist,
2089             # could not be opened, or had some other kind of error.
2090             self.result = os.EX_OSFILE
2091             raise EnvironmentError(
2092                 "executable '%s' is not found or executable." % executable
2093             )
2094         self.logger.debug(
2095             "Running executable '{app}': '{cmd}'".format(
2096                 app=self.app_name, cmd=" ".join(self.args)
2097             )
2098         )
2099         env = os.environ.copy()
2100         env.update(self.env)
2101         env["CK_LOG_FILE_NAME"] = "-"
2102         self.process = subprocess.Popen(
2103             ["stdbuf", "-o0", "-e0"] + self.args,
2104             shell=False,
2105             env=env,
2106             preexec_fn=os.setpgrp,
2107             stdout=subprocess.PIPE,
2108             stderr=subprocess.PIPE,
2109         )
2110         self.wait_for_enter()
2111         out, err = self.process.communicate()
2112         self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2113         self.logger.info("Return code is `%s'" % self.process.returncode)
2114         self.logger.info(single_line_delim)
2115         self.logger.info(
2116             "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2117         )
2118         self.logger.info(single_line_delim)
2119         self.logger.info(out.decode("utf-8"))
2120         self.logger.info(single_line_delim)
2121         self.logger.info(
2122             "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2123         )
2124         self.logger.info(single_line_delim)
2125         self.logger.info(err.decode("utf-8"))
2126         self.logger.info(single_line_delim)
2127         self.result = self.process.returncode
2128
2129
2130 if __name__ == "__main__":
2131     pass