2b197326532f3172d879ba68e2602d20d2ceba1e
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55
56
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())

# Integer result codes.
# NOTE(review): nothing in this part of the file consumes them; presumably the
# test runner / result reporting does - confirm before renumbering.
PASS = 0
FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
SKIP_CPU_SHORTAGE = 5
69
70
71 if config.debug_framework:
72     import debug_internal
73
74 """
75   Test framework module.
76
77   The module provides a set of tools for constructing and running tests and
78   representing the results.
79 """
80
81
class VppDiedError(Exception):
    """Raised to report that the VPP subprocess has died.

    Attributes:
        rv: return code passed by the caller (may be None).
        signal_name: "SIGxxx" name looked up for ``-rv``, or None when the
            lookup is not possible.
        testcase: test case identifier passed by the caller (may be None).
        method_name: test method name passed by the caller (may be None).
    """

    # Reverse map from signal number to signal name, built from every
    # "SIG*" (but not "SIG_*") entry of the signal module.
    signals_by_value = {
        number: name
        for name, number in signal.__dict__.items()
        if name.startswith("SIG") and not name.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        self.rv = rv
        self.testcase = testcase
        self.method_name = method_name

        # -rv is used as the signal number; a None rv (TypeError) or an
        # unknown number (KeyError) leaves the name unset.
        try:
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            self.signal_name = None

        where = ""
        if testcase is not None or method_name is not None:
            where = " while running %s.%s" % (testcase, method_name)

        if not self.rv:
            # No (or zero) return code: keep the message short.
            msg = "VPP subprocess died unexpectedly%s." % where
        else:
            shown_signal = "" if self.signal_name is None else self.signal_name
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                where,
                self.rv,
                " [%s]" % shown_signal,
            )

        super().__init__(msg)
117
118
class _PacketInfo(object):
    """Private record describing one generated packet.

    All fields start out with the class-level defaults below and are
    expected to be overwritten per instance.
    """

    #: Index of the packet.
    index = -1
    #: Index of the source packet generator interface of the packet.
    src = -1
    #: Index of the destination packet generator interface of the packet.
    dst = -1
    #: Expected ip version.
    ip = -1
    #: Expected upper protocol.
    proto = -1
    #: Copy of the former packet.
    data = None

    def __eq__(self, other):
        """Compare index, src, dst and data (ip and proto are not compared)."""
        # Every comparison is evaluated up front, then folded like a chain
        # of ``and``: the first falsy outcome wins, otherwise the last one.
        outcomes = [
            getattr(self, field) == getattr(other, field)
            for field in ("index", "src", "dst", "data")
        ]
        for outcome in outcomes:
            if not outcome:
                return outcome
        return outcomes[-1]
146
147
def _split_complete_lines(fragment, data):
    """Split freshly read bytes into complete lines plus a trailing fragment.

    :param fragment: unterminated text left over from the previous read
    :param data: non-empty bytes just read from the stream
    :returns: tuple (list of lines ending in a newline, new fragment); the
        new fragment is "" when the data ended exactly on a newline
    """
    pieces = data.decode("ascii", errors="backslashreplace").splitlines(True)
    # the left-over text belongs in front of the first piece only
    pieces[0] = "%s%s" % (fragment, pieces[0])
    if pieces[-1].endswith("\n"):
        # everything is complete - nothing must be carried over, otherwise
        # the old fragment would be prepended again on the next read
        return pieces, ""
    return pieces[:-1], pieces[-1]


def pump_output(testclass):
    """pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set. Complete lines
    are appended to ``testclass.vpp_stdout_deque`` / ``vpp_stderr_deque``
    and, unless ``config.cache_vpp_output`` is set, logged as well. Returns
    immediately if ``testclass`` has no ``vpp`` attribute.

    :param testclass: test case class owning the vpp process and the queues
    """
    if not hasattr(testclass, "vpp"):
        return
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        stdout_fd = testclass.vpp.stdout.fileno()
        stderr_fd = testclass.vpp.stderr.fileno()
        # the wakeup pipe is only there to interrupt select(); it is
        # intentionally never read - the stop flag terminates the loop
        readable = select.select(
            [stdout_fd, stderr_fd, testclass.pump_thread_wakeup_pipe[0]],
            [],
            [],
        )[0]
        if stdout_fd in readable:
            read = os.read(stdout_fd, 102400)
            if len(read) > 0:
                lines, stdout_fragment = _split_complete_lines(stdout_fragment, read)
                testclass.vpp_stdout_deque.extend(lines)
                if not config.cache_vpp_output:
                    for line in lines:
                        testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
        if stderr_fd in readable:
            read = os.read(stderr_fd, 102400)
            if len(read) > 0:
                lines, stderr_fragment = _split_complete_lines(stderr_fragment, read)
                testclass.vpp_stderr_deque.extend(lines)
                if not config.cache_vpp_output:
                    for line in lines:
                        testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
197
198
def _is_platform_aarch64():
    """Tell whether platform.machine() reports "aarch64"."""
    machine = platform.machine()
    return machine == "aarch64"


# Evaluated once, when the module is imported.
is_platform_aarch64 = _is_platform_aarch64()
204
205
def _is_distro_ubuntu2204(os_release_path="/etc/os-release"):
    """Tell whether the os-release file mentions "jammy" on any line.

    :param os_release_path: file to inspect (defaults to /etc/os-release)
    :returns: True if any line contains "jammy"; False otherwise, including
        when the file is missing or unreadable, so that importing this
        module does not fail on hosts without an os-release file
    """
    try:
        with open(os_release_path) as f:
            return any("jammy" in line for line in f)
    except OSError:
        return False


# Evaluated once, when the module is imported.
is_distro_ubuntu2204 = _is_distro_ubuntu2204()
215
216
def _is_distro_debian11(os_release_path="/etc/os-release"):
    """Tell whether the os-release file mentions "bullseye" on any line.

    :param os_release_path: file to inspect (defaults to /etc/os-release)
    :returns: True if any line contains "bullseye"; False otherwise,
        including when the file is missing or unreadable, so that importing
        this module does not fail on hosts without an os-release file
    """
    try:
        with open(os_release_path) as f:
            return any("bullseye" in line for line in f)
    except OSError:
        return False


# Evaluated once, when the module is imported.
is_distro_debian11 = _is_distro_debian11()
226
227
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    All instances share one attribute dictionary, so a pipe set through any
    instance is seen by every other instance.
    """

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        # Initialise the pipe only once: unconditionally resetting it here
        # would wipe a pipe that was already set via another instance.
        self._shared_state.setdefault("_pipe", None)

    @property
    def pipe(self):
        """The keep-alive pipe, or None if it has not been set."""
        return self._pipe

    @pipe.setter
    def pipe(self, pipe):
        """Set the keep-alive pipe; raises Exception if it is already set."""
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: test case class or test case instance
        :param desc: description; only used when ``test`` is a class - for
            an instance ``test.id()`` is sent instead
        """
        if not hasattr(test, "vpp") or self.pipe is None:
            # if not running forked..
            return

        if isclass(test):
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        else:
            desc = test.id()

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
263
264
class TestCaseTag(Enum):
    """Tags that can be attached to a test case class.

    The tags are stored in the class's ``test_tags`` list and queried with
    ``has_tag()``.
    """

    # marks the suites that must run at the end
    # using only a single test runner
    RUN_SOLO = 1
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    FIXME_ASAN = 3
    # marks suites broken on Ubuntu-22.04
    FIXME_UBUNTU2204 = 4
    # marks suites broken on Debian-11
    FIXME_DEBIAN11 = 5
    # marks suites broken on debug vpp image
    FIXME_VPP_DEBUG = 6
279
280
def create_tag_decorator(e):
    """Build a class decorator that records tag ``e`` in ``cls.test_tags``.

    :param e: the tag to attach to decorated classes
    :returns: decorator which adds ``e`` to the class's ``test_tags`` list
        and returns the same class
    """

    def decorator(cls):
        # Bind a fresh list on the decorated class instead of appending in
        # place: an in-place append would modify a list inherited from a
        # base class and thereby tag the base class (and its other
        # subclasses) as well. Tags inherited so far are kept.
        cls.test_tags = list(getattr(cls, "test_tags", ())) + [e]
        return cls

    return decorator
290
291
# Ready-made class decorators, one per TestCaseTag member.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
298
299
class DummyVpp:
    """Placeholder offering returncode, pid, poll() and terminate().

    It is what attach_vpp() stores in ``cls.vpp``; both methods do nothing.
    """

    # never reports an exit status
    returncode = None
    # fixed placeholder process id
    pid = 0xCAFEBAFE

    def poll(self):
        """Do nothing and return None."""
        return None

    def terminate(self):
        """Do nothing and return None."""
        return None
309
310
class CPUInterface(ABC):
    """Abstract base for classes that get a set of cpus assigned."""

    # cpus assigned through assign_cpus(); empty until then
    cpus = []
    skipped_due_to_cpu_lack = False

    @classmethod
    @abstractmethod
    def get_cpus_required(cls):
        """Return how many cpus the class needs; must be overridden."""

    @classmethod
    def assign_cpus(cls, cpus):
        """Store ``cpus`` on the class it is called on."""
        cls.cpus = cpus
323
324
325 @use_running
326 class VppTestCase(CPUInterface, unittest.TestCase):
327     """This subclass is a base class for VPP test cases that are implemented as
328     classes. It provides methods to create and run test case.
329     """
330
331     extra_vpp_statseg_config = ""
332     extra_vpp_punt_config = []
333     extra_vpp_plugin_config = []
334     logger = null_logger
335     vapi_response_timeout = 5
336     remove_configured_vpp_objects_on_tear_down = True
337
    @property
    def packet_infos(self):
        """Return the packet infos stored in ``self._packet_infos``.

        NOTE(review): the attribute is populated outside this part of the
        file; confirm its container type there.
        """
        return self._packet_infos
342
343     @classmethod
344     def get_packet_count_for_if_idx(cls, dst_if_index):
345         """Get the number of packet info for specified destination if index"""
346         if dst_if_index in cls._packet_count_for_dst_if_idx:
347             return cls._packet_count_for_dst_if_idx[dst_if_index]
348         else:
349             return 0
350
351     @classmethod
352     def has_tag(cls, tag):
353         """if the test case has a given tag - return true"""
354         try:
355             return tag in cls.test_tags
356         except AttributeError:
357             pass
358         return False
359
    @classmethod
    def is_tagged_run_solo(cls):
        """Return True if the test case class carries TestCaseTag.RUN_SOLO."""
        return cls.has_tag(TestCaseTag.RUN_SOLO)
364
365     @classmethod
366     def skip_fixme_asan(cls):
367         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368         if cls.has_tag(TestCaseTag.FIXME_ASAN):
369             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
372
373     @classmethod
374     def skip_fixme_ubuntu2204(cls):
375         """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376         if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377             cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
378
379     @classmethod
380     def skip_fixme_debian11(cls):
381         """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382         if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383             cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
384
385     @classmethod
386     def skip_fixme_vpp_debug(cls):
387         cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
388
    @classmethod
    def instance(cls):
        """Return the instance of this testcase

        The value is read from ``cls.test_instance``.
        NOTE(review): that attribute is assigned outside this part of the
        file - confirm where before relying on it being set.
        """
        return cls.test_instance
393
394     @classmethod
395     def set_debug_flags(cls, d):
396         cls.gdbserver_port = 7777
397         cls.debug_core = False
398         cls.debug_gdb = False
399         cls.debug_gdbserver = False
400         cls.debug_all = False
401         cls.debug_attach = False
402         if d is None:
403             return
404         dl = d.lower()
405         if dl == "core":
406             cls.debug_core = True
407         elif dl == "gdb" or dl == "gdb-all":
408             cls.debug_gdb = True
409         elif dl == "gdbserver" or dl == "gdbserver-all":
410             cls.debug_gdbserver = True
411         elif dl == "attach":
412             cls.debug_attach = True
413         else:
414             raise Exception("Unrecognized DEBUG option: '%s'" % d)
415         if dl == "gdb-all" or dl == "gdbserver-all":
416             cls.debug_all = True
417
418     @classmethod
419     def get_vpp_worker_count(cls):
420         if not hasattr(cls, "vpp_worker_count"):
421             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
422                 cls.vpp_worker_count = 0
423             else:
424                 cls.vpp_worker_count = config.vpp_worker_count
425         return cls.vpp_worker_count
426
427     @classmethod
428     def get_cpus_required(cls):
429         return 1 + cls.get_vpp_worker_count()
430
431     @classmethod
432     def setUpConstants(cls):
433         """Set-up the test case class based on environment variables"""
434         cls.step = config.step
435         cls.plugin_path = ":".join(config.vpp_plugin_dir)
436         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
438         debug_cli = ""
439         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440             debug_cli = "cli-listen localhost:5002"
441         size = re.search(r"\d+[gG]", config.coredump_size)
442         if size:
443             coredump_size = f"coredump-size {config.coredump_size}".lower()
444         else:
445             coredump_size = "coredump-size unlimited"
446         default_variant = config.variant
447         if default_variant is not None:
448             default_variant = "default { variant %s 100 }" % default_variant
449         else:
450             default_variant = ""
451
452         api_fuzzing = config.api_fuzz
453         if api_fuzzing is None:
454             api_fuzzing = "off"
455
456         cls.vpp_cmdline = [
457             config.vpp,
458             "unix",
459             "{",
460             "nodaemon",
461             debug_cli,
462             "full-coredump",
463             coredump_size,
464             "runtime-dir",
465             cls.tempdir,
466             "}",
467             "api-trace",
468             "{",
469             "on",
470             "}",
471             "api-segment",
472             "{",
473             "prefix",
474             cls.get_api_segment_prefix(),
475             "}",
476             "cpu",
477             "{",
478             "main-core",
479             str(cls.cpus[0]),
480         ]
481         if cls.extern_plugin_path not in (None, ""):
482             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483         if cls.get_vpp_worker_count():
484             cls.vpp_cmdline.extend(
485                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
486             )
487         cls.vpp_cmdline.extend(
488             [
489                 "}",
490                 "physmem",
491                 "{",
492                 "max-size",
493                 "32m",
494                 "}",
495                 "statseg",
496                 "{",
497                 "socket-name",
498                 cls.get_stats_sock_path(),
499                 cls.extra_vpp_statseg_config,
500                 "}",
501                 "socksvr",
502                 "{",
503                 "socket-name",
504                 cls.get_api_sock_path(),
505                 "}",
506                 "node { ",
507                 default_variant,
508                 "}",
509                 "api-fuzz {",
510                 api_fuzzing,
511                 "}",
512                 "plugins",
513                 "{",
514                 "plugin",
515                 "dpdk_plugin.so",
516                 "{",
517                 "disable",
518                 "}",
519                 "plugin",
520                 "rdma_plugin.so",
521                 "{",
522                 "disable",
523                 "}",
524                 "plugin",
525                 "lisp_unittest_plugin.so",
526                 "{",
527                 "enable",
528                 "}",
529                 "plugin",
530                 "unittest_plugin.so",
531                 "{",
532                 "enable",
533                 "}",
534             ]
535             + cls.extra_vpp_plugin_config
536             + [
537                 "}",
538             ]
539         )
540
541         if cls.extra_vpp_punt_config is not None:
542             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
543
544         if not cls.debug_attach:
545             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
547
548     @classmethod
549     def wait_for_enter(cls):
550         if cls.debug_gdbserver:
551             print(double_line_delim)
552             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
553         elif cls.debug_gdb:
554             print(double_line_delim)
555             print("Spawned VPP with PID: %d" % cls.vpp.pid)
556         else:
557             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
558             return
559         print(single_line_delim)
560         print("You can debug VPP using:")
561         if cls.debug_gdbserver:
562             print(
563                 f"sudo gdb {config.vpp} "
564                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
565             )
566             print(
567                 "Now is the time to attach gdb by running the above "
568                 "command, set up breakpoints etc., then resume VPP from "
569                 "within gdb by issuing the 'continue' command"
570             )
571             cls.gdbserver_port += 1
572         elif cls.debug_gdb:
573             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
574             print(
575                 "Now is the time to attach gdb by running the above "
576                 "command and set up breakpoints etc., then resume VPP from"
577                 " within gdb by issuing the 'continue' command"
578             )
579         print(single_line_delim)
580         input("Press ENTER to continue running the testcase...")
581
    @classmethod
    def attach_vpp(cls):
        """Store a DummyVpp placeholder in ``cls.vpp``; no process is started."""
        cls.vpp = DummyVpp()
585
586     @classmethod
587     def run_vpp(cls):
588         if (
589             is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590         ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
591             return
592         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593         cmdline = cls.vpp_cmdline
594
595         if cls.debug_gdbserver:
596             gdbserver = "/usr/bin/gdbserver"
597             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
598                 raise Exception(
599                     "gdbserver binary '%s' does not exist or is "
600                     "not executable" % gdbserver
601                 )
602
603             cmdline = [
604                 gdbserver,
605                 "localhost:{port}".format(port=cls.gdbserver_port),
606             ] + cls.vpp_cmdline
607             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
608
609         try:
610             cls.vpp = subprocess.Popen(
611                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
612             )
613         except subprocess.CalledProcessError as e:
614             cls.logger.critical(
615                 "Subprocess returned with non-0 return code: (%s)", e.returncode
616             )
617             raise
618         except OSError as e:
619             cls.logger.critical(
620                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
621             )
622             raise
623         except Exception as e:
624             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
625             raise
626
627         cls.wait_for_enter()
628
629     @classmethod
630     def wait_for_coredump(cls):
631         corefile = cls.tempdir + "/core"
632         if os.path.isfile(corefile):
633             cls.logger.error("Waiting for coredump to complete: %s", corefile)
634             curr_size = os.path.getsize(corefile)
635             deadline = time.time() + 60
636             ok = False
637             while time.time() < deadline:
638                 cls.sleep(1)
639                 size = curr_size
640                 curr_size = os.path.getsize(corefile)
641                 if size == curr_size:
642                     ok = True
643                     break
644             if not ok:
645                 cls.logger.error(
646                     "Timed out waiting for coredump to complete: %s", corefile
647                 )
648             else:
649                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
650
651     @classmethod
652     def get_stats_sock_path(cls):
653         return "%s/stats.sock" % cls.tempdir
654
655     @classmethod
656     def get_api_sock_path(cls):
657         return "%s/api.sock" % cls.tempdir
658
659     @classmethod
660     def get_api_segment_prefix(cls):
661         return os.path.basename(cls.tempdir)  # Only used for VAPI
662
663     @classmethod
664     def get_tempdir(cls):
665         if cls.debug_attach:
666             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
667         else:
668             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669             if config.wipe_tmp_dir:
670                 shutil.rmtree(tmpdir, ignore_errors=True)
671             os.mkdir(tmpdir)
672         return tmpdir
673
674     @classmethod
675     def create_file_handler(cls):
676         if config.log_dir is None:
677             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
678             return
679
680         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681         if config.wipe_tmp_dir:
682             shutil.rmtree(logdir, ignore_errors=True)
683         os.mkdir(logdir)
684         cls.file_handler = FileHandler(f"{logdir}/log.txt")
685
    @classmethod
    def setUpClass(cls):
        """
        Perform class setup before running the testcase

        Sets up logging and the temporary directory, builds the vpp command
        line, starts vpp (or attaches to a running one) and connects the
        vpp-api. If anything fails after that point, quit() is called
        before the exception is re-raised.
        """
        super(VppTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        )
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        # the working directory of the process becomes the tempdir
        os.chdir(cls.tempdir)
        cls.logger.info(
            "Temporary dir is %s, api socket is %s",
            cls.tempdir,
            cls.get_api_sock_path(),
        )
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.setUpConstants()
        cls.reset_packet_infos()
        cls._pcaps = []
        cls._old_pcaps = []
        cls.verbose = 0
        cls.vpp_dead = False
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        try:
            if cls.debug_attach:
                cls.attach_vpp()
            else:
                cls.run_vpp()
                # run_vpp() returns without setting cls.vpp for classes
                # tagged as broken on this distro - nothing more to set up
                if not hasattr(cls, "vpp"):
                    return
            cls.reporter.send_keep_alive(cls, "setUpClass")
            VppTestResult.current_test_case_info = TestCaseInfo(
                cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
            )
            cls.vpp_stdout_deque = deque()
            cls.vpp_stderr_deque = deque()
            # Pump thread in a non-debug-attached & not running-vpp
            if not cls.debug_attach and not hasattr(cls, "running_vpp"):
                cls.pump_thread_stop_flag = Event()
                # pipe used by quit() to wake the pump thread from select()
                cls.pump_thread_wakeup_pipe = os.pipe()
                cls.pump_thread = Thread(target=pump_output, args=(cls,))
                cls.pump_thread.daemon = True
                cls.pump_thread.start()
            if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
                # NOTE(review): 0 presumably disables the response timeout
                # while a debugger is in use - confirm in VppPapiProvider
                cls.vapi_response_timeout = 0
            cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            if cls.step:
                hook = hookmodule.StepHook(cls)
            else:
                hook = hookmodule.PollHook(cls)
            cls.vapi.register_hook(hook)
            cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            try:
                hook.poll_vpp()
            except VppDiedError:
                # remembered so that quit() logs vpp's stderr as critical
                cls.vpp_startup_failed = True
                cls.logger.critical(
                    "VPP died shortly after startup, check the"
                    " output to standard error for possible cause"
                )
                raise
            try:
                cls.vapi.connect()
            except (vpp_papi.VPPIOError, Exception) as e:
                cls.logger.debug("Exception connecting to vapi: %s" % e)
                cls.vapi.disconnect()

                if cls.debug_gdbserver:
                    print(
                        colorize(
                            "You're running VPP inside gdbserver but "
                            "VPP-API connection failed, did you forget "
                            "to 'continue' VPP from within gdb?",
                            RED,
                        )
                    )
                raise e
            if cls.debug_attach:
                # the worker count is taken from the first field of the
                # second-to-last line of the "show thread" output
                last_line = cls.vapi.cli("show thread").split("\n")[-2]
                cls.vpp_worker_count = int(last_line.split(" ")[0])
                print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
            cls.quit()
            raise e
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
            cls.quit()
            raise e
791
792     @classmethod
793     def _debug_quit(cls):
794         if cls.debug_gdbserver or cls.debug_gdb:
795             try:
796                 cls.vpp.poll()
797
798                 if cls.vpp.returncode is None:
799                     print()
800                     print(double_line_delim)
801                     print("VPP or GDB server is still running")
802                     print(single_line_delim)
803                     input(
804                         "When done debugging, press ENTER to kill the "
805                         "process and finish running the testcase..."
806                     )
807             except AttributeError:
808                 pass
809
810     @classmethod
811     def quit(cls):
812         """
813         Disconnect vpp-api, kill vpp and cleanup shared memory files
814         """
815         cls._debug_quit()
816         if hasattr(cls, "running_vpp"):
817             cls.vpp.quit_vpp()
818
819         # first signal that we want to stop the pump thread, then wake it up
820         if hasattr(cls, "pump_thread_stop_flag"):
821             cls.pump_thread_stop_flag.set()
822         if hasattr(cls, "pump_thread_wakeup_pipe"):
823             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
824         if hasattr(cls, "pump_thread"):
825             cls.logger.debug("Waiting for pump thread to stop")
826             cls.pump_thread.join()
827         if hasattr(cls, "vpp_stderr_reader_thread"):
828             cls.logger.debug("Waiting for stderr pump to stop")
829             cls.vpp_stderr_reader_thread.join()
830
831         if hasattr(cls, "vpp"):
832             if hasattr(cls, "vapi"):
833                 cls.logger.debug(cls.vapi.vpp.get_stats())
834                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
835                 cls.vapi.disconnect()
836                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
837                 del cls.vapi
838             cls.vpp.poll()
839             if not cls.debug_attach and cls.vpp.returncode is None:
840                 cls.wait_for_coredump()
841                 cls.logger.debug("Sending TERM to vpp")
842                 cls.vpp.terminate()
843                 cls.logger.debug("Waiting for vpp to die")
844                 try:
845                     outs, errs = cls.vpp.communicate(timeout=5)
846                 except subprocess.TimeoutExpired:
847                     cls.vpp.kill()
848                     outs, errs = cls.vpp.communicate()
849             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
850             if not cls.debug_attach and not hasattr(cls, "running_vpp"):
851                 cls.vpp.stdout.close()
852                 cls.vpp.stderr.close()
853             # If vpp is a dynamic attribute set by the func use_running,
854             # deletion will result in an AttributeError that we can
855             # safetly pass.
856             try:
857                 del cls.vpp
858             except AttributeError:
859                 pass
860
861         if cls.vpp_startup_failed:
862             stdout_log = cls.logger.info
863             stderr_log = cls.logger.critical
864         else:
865             stdout_log = cls.logger.info
866             stderr_log = cls.logger.info
867
868         if hasattr(cls, "vpp_stdout_deque"):
869             stdout_log(single_line_delim)
870             stdout_log("VPP output to stdout while running %s:", cls.__name__)
871             stdout_log(single_line_delim)
872             vpp_output = "".join(cls.vpp_stdout_deque)
873             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
874                 f.write(vpp_output)
875             stdout_log("\n%s", vpp_output)
876             stdout_log(single_line_delim)
877
878         if hasattr(cls, "vpp_stderr_deque"):
879             stderr_log(single_line_delim)
880             stderr_log("VPP output to stderr while running %s:", cls.__name__)
881             stderr_log(single_line_delim)
882             vpp_output = "".join(cls.vpp_stderr_deque)
883             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
884                 f.write(vpp_output)
885             stderr_log("\n%s", vpp_output)
886             stderr_log(single_line_delim)
887
888     @classmethod
889     def tearDownClass(cls):
890         """Perform final cleanup after running all tests in this test-case"""
891         cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892         if not hasattr(cls, "vpp"):
893             return
894         cls.reporter.send_keep_alive(cls, "tearDownClass")
895         cls.quit()
896         cls.file_handler.close()
897         cls.reset_packet_infos()
898         if config.debug_framework:
899             debug_internal.on_tear_down_class(cls)
900
901     def show_commands_at_teardown(self):
902         """Allow subclass specific teardown logging additions."""
903         self.logger.info("--- No test specific show commands provided. ---")
904
905     def tearDown(self):
906         """Show various debug prints after each test"""
907         self.logger.debug(
908             "--- tearDown() for %s.%s(%s) called ---"
909             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
910         )
911         if not hasattr(self, "vpp"):
912             return
913
914         try:
915             if not self.vpp_dead:
916                 self.logger.debug(self.vapi.cli("show trace max 1000"))
917                 self.logger.info(self.vapi.ppcli("show interface"))
918                 self.logger.info(self.vapi.ppcli("show hardware"))
919                 self.logger.info(self.statistics.set_errors_str())
920                 self.logger.info(self.vapi.ppcli("show run"))
921                 self.logger.info(self.vapi.ppcli("show log"))
922                 self.logger.info(self.vapi.ppcli("show bihash"))
923                 self.logger.info("Logging testcase specific show commands.")
924                 self.show_commands_at_teardown()
925                 if self.remove_configured_vpp_objects_on_tear_down:
926                     self.registry.remove_vpp_config(self.logger)
927             # Save/Dump VPP api trace log
928             m = self._testMethodName
929             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
930             tmp_api_trace = "/tmp/%s" % api_trace
931             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
932             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
933             self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
934             os.rename(tmp_api_trace, vpp_api_trace_log)
935         except VppTransportSocketIOError:
936             self.logger.debug(
937                 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
938             )
939             self.vpp_dead = True
940         else:
941             self.registry.unregister_all(self.logger)
942
943     def setUp(self):
944         """Clear trace before running each test"""
945         super(VppTestCase, self).setUp()
946         if not hasattr(self, "vpp"):
947             return
948         self.reporter.send_keep_alive(self)
949         if self.vpp_dead:
950             raise VppDiedError(
951                 rv=None,
952                 testcase=self.__class__.__name__,
953                 method_name=self._testMethodName,
954             )
955         self.sleep(0.1, "during setUp")
956         self.vpp_stdout_deque.append(
957             "--- test setUp() for %s.%s(%s) starts here ---\n"
958             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
959         )
960         self.vpp_stderr_deque.append(
961             "--- test setUp() for %s.%s(%s) starts here ---\n"
962             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
963         )
964         self.vapi.cli("clear trace")
965         # store the test instance inside the test class - so that objects
966         # holding the class can access instance methods (like assertEqual)
967         type(self).test_instance = self
968
969     @classmethod
970     def pg_enable_capture(cls, interfaces=None):
971         """
972         Enable capture on packet-generator interfaces
973
974         :param interfaces: iterable interface indexes (if None,
975                            use self.pg_interfaces)
976
977         """
978         if interfaces is None:
979             interfaces = cls.pg_interfaces
980         for i in interfaces:
981             i.enable_capture()
982
983     @classmethod
984     def register_pcap(cls, intf, worker):
985         """Register a pcap in the testclass"""
986         # add to the list of captures with current timestamp
987         cls._pcaps.append((intf, worker))
988
989     @classmethod
990     def get_vpp_time(cls):
991         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
992         # returns float("2.190522")
993         timestr = cls.vapi.cli("show clock")
994         head, sep, tail = timestr.partition(",")
995         head, sep, tail = head.partition("Time now")
996         return float(tail)
997
998     @classmethod
999     def sleep_on_vpp_time(cls, sec):
1000         """Sleep according to time in VPP world"""
1001         # On a busy system with many processes
1002         # we might end up with VPP time being slower than real world
1003         # So take that into account when waiting for VPP to do something
1004         start_time = cls.get_vpp_time()
1005         while cls.get_vpp_time() - start_time < sec:
1006             cls.sleep(0.1)
1007
1008     @classmethod
1009     def pg_start(cls, trace=True):
1010         """Enable the PG, wait till it is done, then clean up"""
1011         for (intf, worker) in cls._old_pcaps:
1012             intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1013         cls._old_pcaps = []
1014         if trace:
1015             cls.vapi.cli("clear trace")
1016             cls.vapi.cli("trace add pg-input 1000")
1017         cls.vapi.cli("packet-generator enable")
1018         # PG, when starts, runs to completion -
1019         # so let's avoid a race condition,
1020         # and wait a little till it's done.
1021         # Then clean it up  - and then be gone.
1022         deadline = time.time() + 300
1023         while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1024             cls.sleep(0.01)  # yield
1025             if time.time() > deadline:
1026                 cls.logger.error("Timeout waiting for pg to stop")
1027                 break
1028         for intf, worker in cls._pcaps:
1029             cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1030         cls._old_pcaps = cls._pcaps
1031         cls._pcaps = []
1032
1033     @classmethod
1034     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1035         """
1036         Create packet-generator interfaces.
1037
1038         :param interfaces: iterable indexes of the interfaces.
1039         :returns: List of created interfaces.
1040
1041         """
1042         result = []
1043         for i in interfaces:
1044             intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045             setattr(cls, intf.name, intf)
1046             result.append(intf)
1047         cls.pg_interfaces = result
1048         return result
1049
1050     @classmethod
1051     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1052         if not hasattr(cls, "vpp"):
1053             cls.pg_interfaces = []
1054             return cls.pg_interfaces
1055         pgmode = VppEnum.vl_api_pg_interface_mode_t
1056         return cls.create_pg_interfaces_internal(
1057             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1058         )
1059
1060     @classmethod
1061     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1062         if not hasattr(cls, "vpp"):
1063             cls.pg_interfaces = []
1064             return cls.pg_interfaces
1065         pgmode = VppEnum.vl_api_pg_interface_mode_t
1066         return cls.create_pg_interfaces_internal(
1067             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1068         )
1069
1070     @classmethod
1071     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1072         if not hasattr(cls, "vpp"):
1073             cls.pg_interfaces = []
1074             return cls.pg_interfaces
1075         pgmode = VppEnum.vl_api_pg_interface_mode_t
1076         return cls.create_pg_interfaces_internal(
1077             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1078         )
1079
1080     @classmethod
1081     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1082         if not hasattr(cls, "vpp"):
1083             cls.pg_interfaces = []
1084             return cls.pg_interfaces
1085         pgmode = VppEnum.vl_api_pg_interface_mode_t
1086         return cls.create_pg_interfaces_internal(
1087             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1088         )
1089
1090     @classmethod
1091     def create_loopback_interfaces(cls, count):
1092         """
1093         Create loopback interfaces.
1094
1095         :param count: number of interfaces created.
1096         :returns: List of created interfaces.
1097         """
1098         if not hasattr(cls, "vpp"):
1099             cls.lo_interfaces = []
1100             return cls.lo_interfaces
1101         result = [VppLoInterface(cls) for i in range(count)]
1102         for intf in result:
1103             setattr(cls, intf.name, intf)
1104         cls.lo_interfaces = result
1105         return result
1106
1107     @classmethod
1108     def create_bvi_interfaces(cls, count):
1109         """
1110         Create BVI interfaces.
1111
1112         :param count: number of interfaces created.
1113         :returns: List of created interfaces.
1114         """
1115         if not hasattr(cls, "vpp"):
1116             cls.bvi_interfaces = []
1117             return cls.bvi_interfaces
1118         result = [VppBviInterface(cls) for i in range(count)]
1119         for intf in result:
1120             setattr(cls, intf.name, intf)
1121         cls.bvi_interfaces = result
1122         return result
1123
1124     @staticmethod
1125     def extend_packet(packet, size, padding=" "):
1126         """
1127         Extend packet to given size by padding with spaces or custom padding
1128         NOTE: Currently works only when Raw layer is present.
1129
1130         :param packet: packet
1131         :param size: target size
1132         :param padding: padding used to extend the payload
1133
1134         """
1135         packet_len = len(packet) + 4
1136         extend = size - packet_len
1137         if extend > 0:
1138             num = (extend // len(padding)) + 1
1139             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1140
1141     @classmethod
1142     def reset_packet_infos(cls):
1143         """Reset the list of packet info objects and packet counts to zero"""
1144         cls._packet_infos = {}
1145         cls._packet_count_for_dst_if_idx = {}
1146
1147     @classmethod
1148     def create_packet_info(cls, src_if, dst_if):
1149         """
1150         Create packet info object containing the source and destination indexes
1151         and add it to the testcase's packet info list
1152
1153         :param VppInterface src_if: source interface
1154         :param VppInterface dst_if: destination interface
1155
1156         :returns: _PacketInfo object
1157
1158         """
1159         info = _PacketInfo()
1160         info.index = len(cls._packet_infos)
1161         info.src = src_if.sw_if_index
1162         info.dst = dst_if.sw_if_index
1163         if isinstance(dst_if, VppSubInterface):
1164             dst_idx = dst_if.parent.sw_if_index
1165         else:
1166             dst_idx = dst_if.sw_if_index
1167         if dst_idx in cls._packet_count_for_dst_if_idx:
1168             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1169         else:
1170             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1171         cls._packet_infos[info.index] = info
1172         return info
1173
1174     @staticmethod
1175     def info_to_payload(info):
1176         """
1177         Convert _PacketInfo object to packet payload
1178
1179         :param info: _PacketInfo object
1180
1181         :returns: string containing serialized data from packet info
1182         """
1183
1184         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1185         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1186
1187     @staticmethod
1188     def payload_to_info(payload, payload_field="load"):
1189         """
1190         Convert packet payload to _PacketInfo object
1191
1192         :param payload: packet payload
1193         :type payload:  <class 'scapy.packet.Raw'>
1194         :param payload_field: packet fieldname of payload "load" for
1195                 <class 'scapy.packet.Raw'>
1196         :type payload_field: str
1197         :returns: _PacketInfo object containing de-serialized data from payload
1198
1199         """
1200
1201         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1202         payload_b = getattr(payload, payload_field)[:18]
1203
1204         info = _PacketInfo()
1205         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1206
1207         # some SRv6 TCs depend on get an exception if bad values are detected
1208         if info.index > 0x4000:
1209             raise ValueError("Index value is invalid")
1210
1211         return info
1212
1213     def get_next_packet_info(self, info):
1214         """
1215         Iterate over the packet info list stored in the testcase
1216         Start iteration with first element if info is None
1217         Continue based on index in info if info is specified
1218
1219         :param info: info or None
1220         :returns: next info in list or None if no more infos
1221         """
1222         if info is None:
1223             next_index = 0
1224         else:
1225             next_index = info.index + 1
1226         if next_index == len(self._packet_infos):
1227             return None
1228         else:
1229             return self._packet_infos[next_index]
1230
1231     def get_next_packet_info_for_interface(self, src_index, info):
1232         """
1233         Search the packet info list for the next packet info with same source
1234         interface index
1235
1236         :param src_index: source interface index to search for
1237         :param info: packet info - where to start the search
1238         :returns: packet info or None
1239
1240         """
1241         while True:
1242             info = self.get_next_packet_info(info)
1243             if info is None:
1244                 return None
1245             if info.src == src_index:
1246                 return info
1247
1248     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1249         """
1250         Search the packet info list for the next packet info with same source
1251         and destination interface indexes
1252
1253         :param src_index: source interface index to search for
1254         :param dst_index: destination interface index to search for
1255         :param info: packet info - where to start the search
1256         :returns: packet info or None
1257
1258         """
1259         while True:
1260             info = self.get_next_packet_info_for_interface(src_index, info)
1261             if info is None:
1262                 return None
1263             if info.dst == dst_index:
1264                 return info
1265
1266     def assert_equal(self, real_value, expected_value, name_or_class=None):
1267         if name_or_class is None:
1268             self.assertEqual(real_value, expected_value)
1269             return
1270         try:
1271             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1272             msg = msg % (
1273                 getdoc(name_or_class).strip(),
1274                 real_value,
1275                 str(name_or_class(real_value)),
1276                 expected_value,
1277                 str(name_or_class(expected_value)),
1278             )
1279         except Exception:
1280             msg = "Invalid %s: %s does not match expected value %s" % (
1281                 name_or_class,
1282                 real_value,
1283                 expected_value,
1284             )
1285
1286         self.assertEqual(real_value, expected_value, msg)
1287
1288     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1289         if name is None:
1290             msg = None
1291         else:
1292             msg = "Invalid %s: %s out of range <%s,%s>" % (
1293                 name,
1294                 real_value,
1295                 expected_min,
1296                 expected_max,
1297             )
1298         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1299
1300     def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1301         received = packet.__class__(scapy.compat.raw(packet))
1302         udp_layers = ["UDP", "UDPerror"]
1303         checksum_fields = ["cksum", "chksum"]
1304         checksums = []
1305         counter = 0
1306         temp = received.__class__(scapy.compat.raw(received))
1307         while True:
1308             layer = temp.getlayer(counter)
1309             if layer:
1310                 layer = layer.copy()
1311                 layer.remove_payload()
1312                 for cf in checksum_fields:
1313                     if hasattr(layer, cf):
1314                         if (
1315                             ignore_zero_udp_checksums
1316                             and 0 == getattr(layer, cf)
1317                             and layer.name in udp_layers
1318                         ):
1319                             continue
1320                         delattr(temp.getlayer(counter), cf)
1321                         checksums.append((counter, cf))
1322             else:
1323                 break
1324             counter = counter + 1
1325         if 0 == len(checksums):
1326             return
1327         temp = temp.__class__(scapy.compat.raw(temp))
1328         for layer, cf in checksums:
1329             calc_sum = getattr(temp[layer], cf)
1330             self.assert_equal(
1331                 getattr(received[layer], cf),
1332                 calc_sum,
1333                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1334             )
1335             self.logger.debug(
1336                 "Checksum field `%s` on `%s` layer has correct value `%s`"
1337                 % (cf, temp[layer].name, calc_sum)
1338             )
1339
1340     def assert_checksum_valid(
1341         self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1342     ):
1343         """Check checksum of received packet on given layer"""
1344         received_packet_checksum = getattr(received_packet[layer], field_name)
1345         if ignore_zero_checksum and 0 == received_packet_checksum:
1346             return
1347         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1348         delattr(recalculated[layer], field_name)
1349         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1350         self.assert_equal(
1351             received_packet_checksum,
1352             getattr(recalculated[layer], field_name),
1353             "packet checksum on layer: %s" % layer,
1354         )
1355
1356     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1357         self.assert_checksum_valid(
1358             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1359         )
1360
1361     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1362         self.assert_checksum_valid(
1363             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1364         )
1365
1366     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1367         self.assert_checksum_valid(
1368             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1369         )
1370
1371     def assert_embedded_icmp_checksum_valid(self, received_packet):
1372         if received_packet.haslayer(IPerror):
1373             self.assert_checksum_valid(received_packet, "IPerror")
1374         if received_packet.haslayer(TCPerror):
1375             self.assert_checksum_valid(received_packet, "TCPerror")
1376         if received_packet.haslayer(UDPerror):
1377             self.assert_checksum_valid(
1378                 received_packet, "UDPerror", ignore_zero_checksum=True
1379             )
1380         if received_packet.haslayer(ICMPerror):
1381             self.assert_checksum_valid(received_packet, "ICMPerror")
1382
1383     def assert_icmp_checksum_valid(self, received_packet):
1384         self.assert_checksum_valid(received_packet, "ICMP")
1385         self.assert_embedded_icmp_checksum_valid(received_packet)
1386
1387     def assert_icmpv6_checksum_valid(self, pkt):
1388         if pkt.haslayer(ICMPv6DestUnreach):
1389             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1390             self.assert_embedded_icmp_checksum_valid(pkt)
1391         if pkt.haslayer(ICMPv6EchoRequest):
1392             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1393         if pkt.haslayer(ICMPv6EchoReply):
1394             self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1395
1396     def get_counter(self, counter):
1397         if counter.startswith("/"):
1398             counter_value = self.statistics.get_counter(counter)
1399         else:
1400             counters = self.vapi.cli("sh errors").split("\n")
1401             counter_value = 0
1402             for i in range(1, len(counters) - 1):
1403                 results = counters[i].split()
1404                 if results[1] == counter:
1405                     counter_value = int(results[0])
1406                     break
1407         return counter_value
1408
1409     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1410         c = self.get_counter(counter)
1411         if thread is not None:
1412             c = c[thread][index]
1413         else:
1414             c = sum(x[index] for x in c)
1415         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1416
1417     def assert_packet_counter_equal(self, counter, expected_value):
1418         counter_value = self.get_counter(counter)
1419         self.assert_equal(
1420             counter_value, expected_value, "packet counter `%s'" % counter
1421         )
1422
1423     def assert_error_counter_equal(self, counter, expected_value):
1424         counter_value = self.statistics[counter].sum()
1425         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1426
1427     @classmethod
1428     def sleep(cls, timeout, remark=None):
1429
1430         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1431         #  * by Guido, only the main thread can be interrupted.
1432         # */
1433         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1434         if timeout == 0:
1435             # yield quantum
1436             if hasattr(os, "sched_yield"):
1437                 os.sched_yield()
1438             else:
1439                 time.sleep(0)
1440             return
1441
1442         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1443         before = time.time()
1444         time.sleep(timeout)
1445         after = time.time()
1446         if after - before > 2 * timeout:
1447             cls.logger.error(
1448                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1449                 after - before,
1450                 timeout,
1451             )
1452
1453         cls.logger.debug(
1454             "Finished sleep (%s) - slept %es (wanted %es)",
1455             remark,
1456             after - before,
1457             timeout,
1458         )
1459
1460     def virtual_sleep(self, timeout, remark=None):
1461         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1462         self.vapi.cli("set clock adjust %s" % timeout)
1463
1464     def pg_send(self, intf, pkts, worker=None, trace=True):
1465         intf.add_stream(pkts, worker=worker)
1466         self.pg_enable_capture(self.pg_interfaces)
1467         self.pg_start(trace=trace)
1468
1469     def snapshot_stats(self, stats_diff):
1470         """Return snapshot of interesting stats based on diff dictionary."""
1471         stats_snapshot = {}
1472         for sw_if_index in stats_diff:
1473             for counter in stats_diff[sw_if_index]:
1474                 stats_snapshot[counter] = self.statistics[counter]
1475         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1476         return stats_snapshot
1477
1478     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1479         """Assert appropriate difference between current stats and snapshot."""
1480         for sw_if_index in stats_diff:
1481             for cntr, diff in stats_diff[sw_if_index].items():
1482                 if sw_if_index == "err":
1483                     self.assert_equal(
1484                         self.statistics[cntr].sum(),
1485                         stats_snapshot[cntr].sum() + diff,
1486                         f"'{cntr}' counter value (previous value: "
1487                         f"{stats_snapshot[cntr].sum()}, "
1488                         f"expected diff: {diff})",
1489                     )
1490                 else:
1491                     try:
1492                         self.assert_equal(
1493                             self.statistics[cntr][:, sw_if_index].sum(),
1494                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1495                             f"'{cntr}' counter value (previous value: "
1496                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1497                             f"expected diff: {diff})",
1498                         )
1499                     except IndexError:
1500                         # if diff is 0, then this most probably a case where
1501                         # test declares multiple interfaces but traffic hasn't
1502                         # passed through this one yet - which means the counter
1503                         # value is 0 and can be ignored
1504                         if 0 != diff:
1505                             raise
1506
1507     def send_and_assert_no_replies(
1508         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1509     ):
1510         if stats_diff:
1511             stats_snapshot = self.snapshot_stats(stats_diff)
1512
1513         self.pg_send(intf, pkts)
1514
1515         try:
1516             if not timeout:
1517                 timeout = 1
1518             for i in self.pg_interfaces:
1519                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1520                 timeout = 0.1
1521         finally:
1522             if trace:
1523                 if msg:
1524                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1525                 self.logger.debug(self.vapi.cli("show trace"))
1526
1527         if stats_diff:
1528             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1529
1530     def send_and_expect(
1531         self,
1532         intf,
1533         pkts,
1534         output,
1535         n_rx=None,
1536         worker=None,
1537         trace=True,
1538         msg=None,
1539         stats_diff=None,
1540     ):
1541         if stats_diff:
1542             stats_snapshot = self.snapshot_stats(stats_diff)
1543
1544         if not n_rx:
1545             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1546         self.pg_send(intf, pkts, worker=worker, trace=trace)
1547         rx = output.get_capture(n_rx)
1548         if trace:
1549             if msg:
1550                 self.logger.debug(f"send_and_expect: {msg}")
1551             self.logger.debug(self.vapi.cli("show trace"))
1552
1553         if stats_diff:
1554             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1555
1556         return rx
1557
1558     def send_and_expect_load_balancing(
1559         self, input, pkts, outputs, worker=None, trace=True
1560     ):
1561         self.pg_send(input, pkts, worker=worker, trace=trace)
1562         rxs = []
1563         for oo in outputs:
1564             rx = oo._get_capture(1)
1565             self.assertNotEqual(0, len(rx))
1566             rxs.append(rx)
1567         if trace:
1568             self.logger.debug(self.vapi.cli("show trace"))
1569         return rxs
1570
1571     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1572         self.pg_send(intf, pkts, worker=worker, trace=trace)
1573         rx = output._get_capture(1)
1574         if trace:
1575             self.logger.debug(self.vapi.cli("show trace"))
1576         self.assertTrue(len(rx) > 0)
1577         self.assertTrue(len(rx) < len(pkts))
1578         return rx
1579
1580     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1581         if stats_diff:
1582             stats_snapshot = self.snapshot_stats(stats_diff)
1583
1584         self.pg_send(intf, pkts)
1585         rx = output.get_capture(len(pkts))
1586         outputs = [output]
1587         if not timeout:
1588             timeout = 1
1589         for i in self.pg_interfaces:
1590             if i not in outputs:
1591                 i.assert_nothing_captured(timeout=timeout)
1592                 timeout = 0.1
1593
1594         if stats_diff:
1595             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1596
1597         return rx
1598
1599
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class.

    :param test: test instance
    :returns: first docstring line, or the class name if the class has no
              (or only an empty) docstring
    """
    doc = getdoc(test.__class__)
    if not doc:
        # getdoc() gives None/"" for undocumented classes; fall back to
        # the class name instead of failing
        return test.__class__.__name__
    return doc.splitlines()[0]
1602
1603
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test.

    :param descriptions: if true, the short description is preferred
    :param test: test instance
    :returns: test.shortDescription() when descriptions are enabled and it
              is non-empty, str(test) otherwise
    """
    summary = test.shortDescription()
    return summary if descriptions and summary else str(test)
1610
1611
class TestCaseInfo(object):
    """Plain record of per test-case data (logger, tempdir, VPP details)."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: stored as self.logger
        :param tempdir: stored as self.tempdir
        :param vpp_pid: stored as self.vpp_pid
        :param vpp_bin_path: stored as self.vpp_bin_path
        """
        # nothing is recorded as the core crash test at creation time
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1618         self.core_crash_test = None
1619
1620
class VppTestResult(unittest.TestResult):
    """
    Result collector returned by VppTestRunner.resultclass.

    On top of the bookkeeping done by unittest.TestResult it prints one
    header per test class and one status line per test, records the
    current test case info of failed tests (and of those whose temp dir
    contains a core file) and forwards every outcome through the result
    pipe when one is configured.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class attributes, only ever mutated in place (see add_error), so the
    # same two sets are shared by every instance.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # Never assigned inside this class; every use below is guarded against
    # None. Presumably a TestCaseInfo (logger, tempdir and core_crash_test
    # are the attributes read here) - confirm against the code that sets it.
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
        """
        :param stream: stream the test results are written to
        :param descriptions: whether test case descriptions should be used
        :param verbosity: verbosity level; compared against 0 when a test
            starts and stops
        :param runner: runner owning this result; its ``print_summary``
            and ``stream`` attributes are used by printErrors()
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner
        self.printed = []

    @staticmethod
    def _method_label(test):
        """Format a test as ``Class.method(method docstring)`` for logs."""
        return "%s.%s(%s)" % (
            test.__class__.__name__,
            test._testMethodName,
            test._testMethodDoc,
        )

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                f"--- addSuccess() {self._method_label(test)} called"
            )
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        The reason "not enough cpus" is reported through the pipe as
        SKIP_CPU_SHORTAGE, every other reason as SKIP.

        :param test:
        :param reason:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                f"--- addSkip() {self._method_label(test)} called, "
                f"reason is {reason}"
            )
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        if reason == "not enough cpus":
            self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
        else:
            self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into config.failed_dir
        as ``<basename of temp dir>-FAILED``.

        Best effort: does nothing without current test case info, and any
        exception is logged as an error instead of being raised.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = config.failed_dir
            link_path = os.path.join(
                failed_dir, "%s-FAILED" % os.path.basename(info.tempdir)
            )

            info.logger.debug("creating a link to the failed test")
            info.logger.debug("os.symlink(%s, %s)" % (info.tempdir, link_path))
            if os.path.exists(link_path):
                info.logger.debug("symlink already exists")
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            # deliberate: failing to create the convenience link must not
            # disturb the recording of the test result
            info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send ``(test.id(), result)`` through ``test_framework_result_pipe``
        if that attribute exists and is truthy.

        :param test: object providing ``id()``
        :param result: result code to send
        """
        pipe = getattr(self, "test_framework_result_pipe", None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write an error or failure, including the formatted exception, to
        the debug log of the current test case (if there is one).

        :param test: test case or unittest.suite._ErrorHolder
        :param err: exception info, unpacked into format_exception()
        :param fn_name: name of the reporting function, for the log line
        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = self._method_label(test)
        info.logger.debug(f"--- {fn_name}() {test_name} called, err is {err}")
        info.logger.debug(
            "formatted exception is:\n%s" % "".join(format_exception(*err))
        )

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test case or unittest.suite._ErrorHolder
        :param err: exception info
        :param unittest_fn: unbound unittest.TestResult method doing the
            actual recording
        :param error_type: FAIL or ERROR
        :raises Exception: for any other error_type
        """
        if error_type == FAIL:
            self.log_error(test, err, "addFailure")
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, "addError")
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception(
                "Error type %s cannot be used to record an "
                "error or a failure" % error_type
            )

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % (
                error_type_str,
                info.tempdir,
            )
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # keep the name of the first test blamed for the core
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id()
                        )
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = "%s [no temp dir]" % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def _print_class_header(self, test):
        """
        Print the title banner of the test's class, once per class.

        The title is the first line of the test's docstring, decorated
        according to the tags of the test. Some tags additionally call the
        matching ``skip_fixme_*`` method of the test.

        :param test: test case about to be started
        :raises Exception: if the test has no docstring
        """
        if test.__class__ in self.printed:
            return

        test_doc = getdoc(test)
        if not test_doc:
            raise Exception("No doc string for test '%s'" % test.id())

        test_title = test_doc.splitlines()[0].rstrip()
        test_title = colorize(test_title, GREEN)
        if test.is_tagged_run_solo():
            test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

        # This block may overwrite the colorized title above,
        # but we want this to stand out and be fixed
        if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
            test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)

        if test.has_tag(TestCaseTag.FIXME_ASAN):
            test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
            test.skip_fixme_asan()

        # NOTE(review): the distro flags are defined outside this class and
        # are assumed to be plain booleans, hence no "== True" comparison.
        if is_distro_ubuntu2204 and test.has_tag(TestCaseTag.FIXME_UBUNTU2204):
            test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
            test.skip_fixme_ubuntu2204()

        if is_distro_debian11 and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
            test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
            test.skip_fixme_debian11()

        if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
            test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
            test.skip_fixme_vpp_debug()

        if hasattr(test, "vpp_worker_count"):
            if test.vpp_worker_count == 0:
                test_title += " [main thread only]"
            elif test.vpp_worker_count == 1:
                test_title += " [1 worker thread]"
            else:
                test_title += f" [{test.vpp_worker_count} worker threads]"

        if test.__class__.skipped_due_to_cpu_lack:
            test_title = colorize(
                f"{test_title} [skipped - not enough cpus, "
                f"required={test.__class__.get_cpus_required()}, "
                f"available={max_vpp_cpus}]",
                YELLOW,
            )

        print(double_line_delim)
        print(test_title)
        print(double_line_delim)
        self.printed.append(test.__class__)

    def startTest(self, test):
        """
        Start a test

        :param test:

        """
        self._print_class_header(test)
        # read back by stopTest() to report the duration
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln("Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test:

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(
                "%-73s%s" % (self.getDescription(test), self.result_string)
            )
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(
                "%-68s %4.2f %s"
                % (
                    self.getDescription(test),
                    time.time() - self.start_test,
                    self.result_string,
                )
            )

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList("ERROR", self.errors)
            self.printErrorList("FAIL", self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # everything written from now on is discarded
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
1940
1941
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output and collects them in a VppTestResult.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        """
        :param keep_alive_pipe: stored as ``KeepAliveReporter.pipe``
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            ``test_framework_result_pipe``
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner.
            NOTE(review): ``resultclass`` is a read-only property here, so
            a non-None value makes the base constructor fail when it tries
            to assign it - leave it at None.
        :param print_summary: when false, the stream is restored after
            run() because the result object replaces it to silence the
            summary
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
        )
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        # set on the class itself, i.e. shared by all result objects
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test:
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # VppTestResult.printErrors() swapped in a devnull stream to
            # swallow the summary; put the real stream back and close the
            # throw-away one so its file handle is not leaked.
            discarded = self.stream
            self.stream = self.orig_stream
            result.stream = self.orig_stream
            if discarded is not self.orig_stream:
                discarded.close()
        return result
1993
1994
class Worker(Thread):
    """
    Thread which runs an external executable and logs its output.

    After run() has finished, ``result`` holds the return code of the
    process (or ``os.EX_OSFILE`` if the executable could not be run).

    Subclasses may define these attributes before calling ``__init__``:
    ``testcase`` (enables the gdb/gdbserver handling), ``wait_for_gdb``
    (extra argument appended in gdb mode) and ``role`` (appended to the
    application name used in messages).
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line as a sequence; the first item
            is the executable
        :param logger: logger used by run()
        :param env: extra environment variables for the process; deep
            copied, so later changes by the caller have no effect
        :param args: forwarded to threading.Thread
        :param kwargs: forwarded to threading.Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # private copy: the gdb handling below must not modify the list
        # owned by the caller
        self.args = list(executable_args)
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # gdbserver wraps the original command line (this used to
                # add the *args tuple, which raised TypeError)
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        In gdb/gdbserver debug mode, print how to attach a debugger to the
        spawned process and block until ENTER is pressed.

        Returns immediately when there is no ``testcase`` attribute or
        when neither debug mode is active. In gdbserver mode the
        ``gdbserver_port`` of the testcase is incremented afterwards.
        """
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable through ``stdbuf -o0 -e0``, wait for it to
        finish and log its return code, stdout and stderr.

        :raises EnvironmentError: if the executable does not exist or is
            not executable; ``result`` is set to ``os.EX_OSFILE`` first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        # errors="replace": output which is not valid UTF-8 must not abort
        # the thread before the result is recorded
        for stream_name, data in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info(
                "Executable `{app}' wrote to {stream}:".format(
                    app=self.app_name, stream=stream_name
                )
            )
            self.logger.info(single_line_delim)
            self.logger.info(data.decode("utf-8", errors="replace"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2113
2114
if __name__ == "__main__":
    # Nothing to do: executing this file directly is a no-op.
    pass