tests: skip tests failing on ubuntu 22.04
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54
55
56 logger = logging.getLogger(__name__)
57
58 # Set up an empty logger for the testcase that can be overridden as necessary
59 null_logger = logging.getLogger("VppTestCase")
60 null_logger.addHandler(logging.NullHandler())
61
62 PASS = 0
63 FAIL = 1
64 ERROR = 2
65 SKIP = 3
66 TEST_RUN = 4
67 SKIP_CPU_SHORTAGE = 5
68
69
70 if config.debug_framework:
71     import debug_internal
72
73 """
74   Test framework module.
75
76   The module provides a set of tools for constructing and running tests and
77   representing the results.
78 """
79
80
81 class VppDiedError(Exception):
82     """exception for reporting that the subprocess has died."""
83
84     signals_by_value = {
85         v: k
86         for k, v in signal.__dict__.items()
87         if k.startswith("SIG") and not k.startswith("SIG_")
88     }
89
90     def __init__(self, rv=None, testcase=None, method_name=None):
91         self.rv = rv
92         self.signal_name = None
93         self.testcase = testcase
94         self.method_name = method_name
95
96         try:
97             self.signal_name = VppDiedError.signals_by_value[-rv]
98         except (KeyError, TypeError):
99             pass
100
101         if testcase is None and method_name is None:
102             in_msg = ""
103         else:
104             in_msg = " while running %s.%s" % (testcase, method_name)
105
106         if self.rv:
107             msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
108                 in_msg,
109                 self.rv,
110                 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
111             )
112         else:
113             msg = "VPP subprocess died unexpectedly%s." % in_msg
114
115         super(VppDiedError, self).__init__(msg)
116
117
118 class _PacketInfo(object):
119     """Private class to create packet info object.
120
121     Help process information about the next packet.
122     Set variables to default values.
123     """
124
125     #: Store the index of the packet.
126     index = -1
127     #: Store the index of the source packet generator interface of the packet.
128     src = -1
129     #: Store the index of the destination packet generator interface
130     #: of the packet.
131     dst = -1
132     #: Store expected ip version
133     ip = -1
134     #: Store expected upper protocol
135     proto = -1
136     #: Store the copy of the former packet.
137     data = None
138
139     def __eq__(self, other):
140         index = self.index == other.index
141         src = self.src == other.src
142         dst = self.dst == other.dst
143         data = self.data == other.data
144         return index and src and dst and data
145
146
147 def pump_output(testclass):
148     """pump output from vpp stdout/stderr to proper queues"""
149     stdout_fragment = ""
150     stderr_fragment = ""
151     while not testclass.pump_thread_stop_flag.is_set():
152         readable = select.select(
153             [
154                 testclass.vpp.stdout.fileno(),
155                 testclass.vpp.stderr.fileno(),
156                 testclass.pump_thread_wakeup_pipe[0],
157             ],
158             [],
159             [],
160         )[0]
161         if testclass.vpp.stdout.fileno() in readable:
162             read = os.read(testclass.vpp.stdout.fileno(), 102400)
163             if len(read) > 0:
164                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
165                 if len(stdout_fragment) > 0:
166                     split[0] = "%s%s" % (stdout_fragment, split[0])
167                 if len(split) > 0 and split[-1].endswith("\n"):
168                     limit = None
169                 else:
170                     limit = -1
171                     stdout_fragment = split[-1]
172                 testclass.vpp_stdout_deque.extend(split[:limit])
173                 if not config.cache_vpp_output:
174                     for line in split[:limit]:
175                         testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
176         if testclass.vpp.stderr.fileno() in readable:
177             read = os.read(testclass.vpp.stderr.fileno(), 102400)
178             if len(read) > 0:
179                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
180                 if len(stderr_fragment) > 0:
181                     split[0] = "%s%s" % (stderr_fragment, split[0])
182                 if len(split) > 0 and split[-1].endswith("\n"):
183                     limit = None
184                 else:
185                     limit = -1
186                     stderr_fragment = split[-1]
187
188                 testclass.vpp_stderr_deque.extend(split[:limit])
189                 if not config.cache_vpp_output:
190                     for line in split[:limit]:
191                         testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
192                         # ignoring the dummy pipe here intentionally - the
193                         # flag will take care of properly terminating the loop
194
195
196 def _is_platform_aarch64():
197     return platform.machine() == "aarch64"
198
199
200 is_platform_aarch64 = _is_platform_aarch64()
201
202
203 def _is_distro_ubuntu2204():
204     with open("/etc/os-release") as f:
205         for line in f.readlines():
206             if "jammy" in line:
207                 return True
208     return False
209
210
211 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
212
213
214 class KeepAliveReporter(object):
215     """
216     Singleton object which reports test start to parent process
217     """
218
219     _shared_state = {}
220
221     def __init__(self):
222         self.__dict__ = self._shared_state
223         self._pipe = None
224
225     @property
226     def pipe(self):
227         return self._pipe
228
229     @pipe.setter
230     def pipe(self, pipe):
231         if self._pipe is not None:
232             raise Exception("Internal error - pipe should only be set once.")
233         self._pipe = pipe
234
235     def send_keep_alive(self, test, desc=None):
236         """
237         Write current test tmpdir & desc to keep-alive pipe to signal liveness
238         """
239         if self.pipe is None:
240             # if not running forked..
241             return
242
243         if isclass(test):
244             desc = "%s (%s)" % (desc, unittest.util.strclass(test))
245         else:
246             desc = test.id()
247
248         self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
249
250
251 class TestCaseTag(Enum):
252     # marks the suites that must run at the end
253     # using only a single test runner
254     RUN_SOLO = 1
255     # marks the suites broken on VPP multi-worker
256     FIXME_VPP_WORKERS = 2
257     # marks the suites broken when ASan is enabled
258     FIXME_ASAN = 3
259     # marks suites broken on Ubuntu-22.04
260     FIXME_UBUNTU2204 = 4
261
262
263 def create_tag_decorator(e):
264     def decorator(cls):
265         try:
266             cls.test_tags.append(e)
267         except AttributeError:
268             cls.test_tags = [e]
269         return cls
270
271     return decorator
272
273
274 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
275 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
276 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
277 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
278
279
280 class DummyVpp:
281     returncode = None
282     pid = 0xCAFEBAFE
283
284     def poll(self):
285         pass
286
287     def terminate(self):
288         pass
289
290
291 class CPUInterface(ABC):
292     cpus = []
293     skipped_due_to_cpu_lack = False
294
295     @classmethod
296     @abstractmethod
297     def get_cpus_required(cls):
298         pass
299
300     @classmethod
301     def assign_cpus(cls, cpus):
302         cls.cpus = cpus
303
304
305 class VppTestCase(CPUInterface, unittest.TestCase):
306     """This subclass is a base class for VPP test cases that are implemented as
307     classes. It provides methods to create and run test case.
308     """
309
310     extra_vpp_statseg_config = ""
311     extra_vpp_punt_config = []
312     extra_vpp_plugin_config = []
313     logger = null_logger
314     vapi_response_timeout = 5
315     remove_configured_vpp_objects_on_tear_down = True
316
317     @property
318     def packet_infos(self):
319         """List of packet infos"""
320         return self._packet_infos
321
322     @classmethod
323     def get_packet_count_for_if_idx(cls, dst_if_index):
324         """Get the number of packet info for specified destination if index"""
325         if dst_if_index in cls._packet_count_for_dst_if_idx:
326             return cls._packet_count_for_dst_if_idx[dst_if_index]
327         else:
328             return 0
329
330     @classmethod
331     def has_tag(cls, tag):
332         """if the test case has a given tag - return true"""
333         try:
334             return tag in cls.test_tags
335         except AttributeError:
336             pass
337         return False
338
339     @classmethod
340     def is_tagged_run_solo(cls):
341         """if the test case class is timing-sensitive - return true"""
342         return cls.has_tag(TestCaseTag.RUN_SOLO)
343
344     @classmethod
345     def skip_fixme_asan(cls):
346         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
347         if cls.has_tag(TestCaseTag.FIXME_ASAN):
348             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
349             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
350                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
351
352     @classmethod
353     def skip_fixme_ubuntu2204(cls):
354         """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
355         if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
356             cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
357
358     @classmethod
359     def instance(cls):
360         """Return the instance of this testcase"""
361         return cls.test_instance
362
363     @classmethod
364     def set_debug_flags(cls, d):
365         cls.gdbserver_port = 7777
366         cls.debug_core = False
367         cls.debug_gdb = False
368         cls.debug_gdbserver = False
369         cls.debug_all = False
370         cls.debug_attach = False
371         if d is None:
372             return
373         dl = d.lower()
374         if dl == "core":
375             cls.debug_core = True
376         elif dl == "gdb" or dl == "gdb-all":
377             cls.debug_gdb = True
378         elif dl == "gdbserver" or dl == "gdbserver-all":
379             cls.debug_gdbserver = True
380         elif dl == "attach":
381             cls.debug_attach = True
382         else:
383             raise Exception("Unrecognized DEBUG option: '%s'" % d)
384         if dl == "gdb-all" or dl == "gdbserver-all":
385             cls.debug_all = True
386
387     @classmethod
388     def get_vpp_worker_count(cls):
389         if not hasattr(cls, "vpp_worker_count"):
390             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
391                 cls.vpp_worker_count = 0
392             else:
393                 cls.vpp_worker_count = config.vpp_worker_count
394         return cls.vpp_worker_count
395
396     @classmethod
397     def get_cpus_required(cls):
398         return 1 + cls.get_vpp_worker_count()
399
400     @classmethod
401     def setUpConstants(cls):
402         """Set-up the test case class based on environment variables"""
403         cls.step = config.step
404         cls.plugin_path = ":".join(config.vpp_plugin_dir)
405         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
406         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
407         debug_cli = ""
408         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
409             debug_cli = "cli-listen localhost:5002"
410         size = re.search(r"\d+[gG]", config.coredump_size)
411         if size:
412             coredump_size = f"coredump-size {config.coredump_size}".lower()
413         else:
414             coredump_size = "coredump-size unlimited"
415         default_variant = config.variant
416         if default_variant is not None:
417             default_variant = "default { variant %s 100 }" % default_variant
418         else:
419             default_variant = ""
420
421         api_fuzzing = config.api_fuzz
422         if api_fuzzing is None:
423             api_fuzzing = "off"
424
425         cls.vpp_cmdline = [
426             config.vpp,
427             "unix",
428             "{",
429             "nodaemon",
430             debug_cli,
431             "full-coredump",
432             coredump_size,
433             "runtime-dir",
434             cls.tempdir,
435             "}",
436             "api-trace",
437             "{",
438             "on",
439             "}",
440             "api-segment",
441             "{",
442             "prefix",
443             cls.get_api_segment_prefix(),
444             "}",
445             "cpu",
446             "{",
447             "main-core",
448             str(cls.cpus[0]),
449         ]
450         if cls.extern_plugin_path not in (None, ""):
451             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
452         if cls.get_vpp_worker_count():
453             cls.vpp_cmdline.extend(
454                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
455             )
456         cls.vpp_cmdline.extend(
457             [
458                 "}",
459                 "physmem",
460                 "{",
461                 "max-size",
462                 "32m",
463                 "}",
464                 "statseg",
465                 "{",
466                 "socket-name",
467                 cls.get_stats_sock_path(),
468                 cls.extra_vpp_statseg_config,
469                 "}",
470                 "socksvr",
471                 "{",
472                 "socket-name",
473                 cls.get_api_sock_path(),
474                 "}",
475                 "node { ",
476                 default_variant,
477                 "}",
478                 "api-fuzz {",
479                 api_fuzzing,
480                 "}",
481                 "plugins",
482                 "{",
483                 "plugin",
484                 "dpdk_plugin.so",
485                 "{",
486                 "disable",
487                 "}",
488                 "plugin",
489                 "rdma_plugin.so",
490                 "{",
491                 "disable",
492                 "}",
493                 "plugin",
494                 "lisp_unittest_plugin.so",
495                 "{",
496                 "enable",
497                 "}",
498                 "plugin",
499                 "unittest_plugin.so",
500                 "{",
501                 "enable",
502                 "}",
503             ]
504             + cls.extra_vpp_plugin_config
505             + [
506                 "}",
507             ]
508         )
509
510         if cls.extra_vpp_punt_config is not None:
511             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
512
513         if not cls.debug_attach:
514             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
515             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
516
517     @classmethod
518     def wait_for_enter(cls):
519         if cls.debug_gdbserver:
520             print(double_line_delim)
521             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
522         elif cls.debug_gdb:
523             print(double_line_delim)
524             print("Spawned VPP with PID: %d" % cls.vpp.pid)
525         else:
526             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
527             return
528         print(single_line_delim)
529         print("You can debug VPP using:")
530         if cls.debug_gdbserver:
531             print(
532                 f"sudo gdb {config.vpp} "
533                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
534             )
535             print(
536                 "Now is the time to attach gdb by running the above "
537                 "command, set up breakpoints etc., then resume VPP from "
538                 "within gdb by issuing the 'continue' command"
539             )
540             cls.gdbserver_port += 1
541         elif cls.debug_gdb:
542             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
543             print(
544                 "Now is the time to attach gdb by running the above "
545                 "command and set up breakpoints etc., then resume VPP from"
546                 " within gdb by issuing the 'continue' command"
547             )
548         print(single_line_delim)
549         input("Press ENTER to continue running the testcase...")
550
551     @classmethod
552     def attach_vpp(cls):
553         cls.vpp = DummyVpp()
554
555     @classmethod
556     def run_vpp(cls):
557         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
558         cmdline = cls.vpp_cmdline
559
560         if cls.debug_gdbserver:
561             gdbserver = "/usr/bin/gdbserver"
562             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
563                 raise Exception(
564                     "gdbserver binary '%s' does not exist or is "
565                     "not executable" % gdbserver
566                 )
567
568             cmdline = [
569                 gdbserver,
570                 "localhost:{port}".format(port=cls.gdbserver_port),
571             ] + cls.vpp_cmdline
572             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
573
574         try:
575             cls.vpp = subprocess.Popen(
576                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
577             )
578         except subprocess.CalledProcessError as e:
579             cls.logger.critical(
580                 "Subprocess returned with non-0 return code: (%s)", e.returncode
581             )
582             raise
583         except OSError as e:
584             cls.logger.critical(
585                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
586             )
587             raise
588         except Exception as e:
589             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
590             raise
591
592         cls.wait_for_enter()
593
594     @classmethod
595     def wait_for_coredump(cls):
596         corefile = cls.tempdir + "/core"
597         if os.path.isfile(corefile):
598             cls.logger.error("Waiting for coredump to complete: %s", corefile)
599             curr_size = os.path.getsize(corefile)
600             deadline = time.time() + 60
601             ok = False
602             while time.time() < deadline:
603                 cls.sleep(1)
604                 size = curr_size
605                 curr_size = os.path.getsize(corefile)
606                 if size == curr_size:
607                     ok = True
608                     break
609             if not ok:
610                 cls.logger.error(
611                     "Timed out waiting for coredump to complete: %s", corefile
612                 )
613             else:
614                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
615
616     @classmethod
617     def get_stats_sock_path(cls):
618         return "%s/stats.sock" % cls.tempdir
619
620     @classmethod
621     def get_api_sock_path(cls):
622         return "%s/api.sock" % cls.tempdir
623
624     @classmethod
625     def get_api_segment_prefix(cls):
626         return os.path.basename(cls.tempdir)  # Only used for VAPI
627
628     @classmethod
629     def get_tempdir(cls):
630         if cls.debug_attach:
631             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
632         else:
633             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
634             if config.wipe_tmp_dir:
635                 shutil.rmtree(tmpdir, ignore_errors=True)
636             os.mkdir(tmpdir)
637         return tmpdir
638
639     @classmethod
640     def create_file_handler(cls):
641         if config.log_dir is None:
642             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
643             return
644
645         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
646         if config.wipe_tmp_dir:
647             shutil.rmtree(logdir, ignore_errors=True)
648         os.mkdir(logdir)
649         cls.file_handler = FileHandler(f"{logdir}/log.txt")
650
651     @classmethod
652     def setUpClass(cls):
653         """
654         Perform class setup before running the testcase
655         Remove shared memory files, start vpp and connect the vpp-api
656         """
657         super(VppTestCase, cls).setUpClass()
658         cls.logger = get_logger(cls.__name__)
659         random.seed(config.rnd_seed)
660         if hasattr(cls, "parallel_handler"):
661             cls.logger.addHandler(cls.parallel_handler)
662             cls.logger.propagate = False
663         cls.set_debug_flags(config.debug)
664         cls.tempdir = cls.get_tempdir()
665         cls.create_file_handler()
666         cls.file_handler.setFormatter(
667             Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
668         )
669         cls.file_handler.setLevel(DEBUG)
670         cls.logger.addHandler(cls.file_handler)
671         cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
672         os.chdir(cls.tempdir)
673         cls.logger.info(
674             "Temporary dir is %s, api socket is %s",
675             cls.tempdir,
676             cls.get_api_sock_path(),
677         )
678         cls.logger.debug("Random seed is %s", config.rnd_seed)
679         cls.setUpConstants()
680         cls.reset_packet_infos()
681         cls._pcaps = []
682         cls._old_pcaps = []
683         cls.verbose = 0
684         cls.vpp_dead = False
685         cls.registry = VppObjectRegistry()
686         cls.vpp_startup_failed = False
687         cls.reporter = KeepAliveReporter()
688         # need to catch exceptions here because if we raise, then the cleanup
689         # doesn't get called and we might end with a zombie vpp
690         try:
691             if cls.debug_attach:
692                 cls.attach_vpp()
693             else:
694                 cls.run_vpp()
695             cls.reporter.send_keep_alive(cls, "setUpClass")
696             VppTestResult.current_test_case_info = TestCaseInfo(
697                 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
698             )
699             cls.vpp_stdout_deque = deque()
700             cls.vpp_stderr_deque = deque()
701             if not cls.debug_attach:
702                 cls.pump_thread_stop_flag = Event()
703                 cls.pump_thread_wakeup_pipe = os.pipe()
704                 cls.pump_thread = Thread(target=pump_output, args=(cls,))
705                 cls.pump_thread.daemon = True
706                 cls.pump_thread.start()
707             if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
708                 cls.vapi_response_timeout = 0
709             cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
710             if cls.step:
711                 hook = hookmodule.StepHook(cls)
712             else:
713                 hook = hookmodule.PollHook(cls)
714             cls.vapi.register_hook(hook)
715             cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
716             try:
717                 hook.poll_vpp()
718             except VppDiedError:
719                 cls.vpp_startup_failed = True
720                 cls.logger.critical(
721                     "VPP died shortly after startup, check the"
722                     " output to standard error for possible cause"
723                 )
724                 raise
725             try:
726                 cls.vapi.connect()
727             except (vpp_papi.VPPIOError, Exception) as e:
728                 cls.logger.debug("Exception connecting to vapi: %s" % e)
729                 cls.vapi.disconnect()
730
731                 if cls.debug_gdbserver:
732                     print(
733                         colorize(
734                             "You're running VPP inside gdbserver but "
735                             "VPP-API connection failed, did you forget "
736                             "to 'continue' VPP from within gdb?",
737                             RED,
738                         )
739                     )
740                 raise e
741             if cls.debug_attach:
742                 last_line = cls.vapi.cli("show thread").split("\n")[-2]
743                 cls.vpp_worker_count = int(last_line.split(" ")[0])
744                 print("Detected VPP with %s workers." % cls.vpp_worker_count)
745         except vpp_papi.VPPRuntimeError as e:
746             cls.logger.debug("%s" % e)
747             cls.quit()
748             raise e
749         except Exception as e:
750             cls.logger.debug("Exception connecting to VPP: %s" % e)
751             cls.quit()
752             raise e
753
754     @classmethod
755     def _debug_quit(cls):
756         if cls.debug_gdbserver or cls.debug_gdb:
757             try:
758                 cls.vpp.poll()
759
760                 if cls.vpp.returncode is None:
761                     print()
762                     print(double_line_delim)
763                     print("VPP or GDB server is still running")
764                     print(single_line_delim)
765                     input(
766                         "When done debugging, press ENTER to kill the "
767                         "process and finish running the testcase..."
768                     )
769             except AttributeError:
770                 pass
771
772     @classmethod
773     def quit(cls):
774         """
775         Disconnect vpp-api, kill vpp and cleanup shared memory files
776         """
777         cls._debug_quit()
778
779         # first signal that we want to stop the pump thread, then wake it up
780         if hasattr(cls, "pump_thread_stop_flag"):
781             cls.pump_thread_stop_flag.set()
782         if hasattr(cls, "pump_thread_wakeup_pipe"):
783             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
784         if hasattr(cls, "pump_thread"):
785             cls.logger.debug("Waiting for pump thread to stop")
786             cls.pump_thread.join()
787         if hasattr(cls, "vpp_stderr_reader_thread"):
788             cls.logger.debug("Waiting for stderr pump to stop")
789             cls.vpp_stderr_reader_thread.join()
790
791         if hasattr(cls, "vpp"):
792             if hasattr(cls, "vapi"):
793                 cls.logger.debug(cls.vapi.vpp.get_stats())
794                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
795                 cls.vapi.disconnect()
796                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
797                 del cls.vapi
798             cls.vpp.poll()
799             if not cls.debug_attach and cls.vpp.returncode is None:
800                 cls.wait_for_coredump()
801                 cls.logger.debug("Sending TERM to vpp")
802                 cls.vpp.terminate()
803                 cls.logger.debug("Waiting for vpp to die")
804                 try:
805                     outs, errs = cls.vpp.communicate(timeout=5)
806                 except subprocess.TimeoutExpired:
807                     cls.vpp.kill()
808                     outs, errs = cls.vpp.communicate()
809             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
810             if not cls.debug_attach:
811                 cls.vpp.stdout.close()
812                 cls.vpp.stderr.close()
813             del cls.vpp
814
815         if cls.vpp_startup_failed:
816             stdout_log = cls.logger.info
817             stderr_log = cls.logger.critical
818         else:
819             stdout_log = cls.logger.info
820             stderr_log = cls.logger.info
821
822         if hasattr(cls, "vpp_stdout_deque"):
823             stdout_log(single_line_delim)
824             stdout_log("VPP output to stdout while running %s:", cls.__name__)
825             stdout_log(single_line_delim)
826             vpp_output = "".join(cls.vpp_stdout_deque)
827             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
828                 f.write(vpp_output)
829             stdout_log("\n%s", vpp_output)
830             stdout_log(single_line_delim)
831
832         if hasattr(cls, "vpp_stderr_deque"):
833             stderr_log(single_line_delim)
834             stderr_log("VPP output to stderr while running %s:", cls.__name__)
835             stderr_log(single_line_delim)
836             vpp_output = "".join(cls.vpp_stderr_deque)
837             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
838                 f.write(vpp_output)
839             stderr_log("\n%s", vpp_output)
840             stderr_log(single_line_delim)
841
842     @classmethod
843     def tearDownClass(cls):
844         """Perform final cleanup after running all tests in this test-case"""
845         cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
846         cls.reporter.send_keep_alive(cls, "tearDownClass")
847         cls.quit()
848         cls.file_handler.close()
849         cls.reset_packet_infos()
850         if config.debug_framework:
851             debug_internal.on_tear_down_class(cls)
852
853     def show_commands_at_teardown(self):
854         """Allow subclass specific teardown logging additions."""
855         self.logger.info("--- No test specific show commands provided. ---")
856
857     def tearDown(self):
858         """Show various debug prints after each test"""
859         self.logger.debug(
860             "--- tearDown() for %s.%s(%s) called ---"
861             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
862         )
863
864         try:
865             if not self.vpp_dead:
866                 self.logger.debug(self.vapi.cli("show trace max 1000"))
867                 self.logger.info(self.vapi.ppcli("show interface"))
868                 self.logger.info(self.vapi.ppcli("show hardware"))
869                 self.logger.info(self.statistics.set_errors_str())
870                 self.logger.info(self.vapi.ppcli("show run"))
871                 self.logger.info(self.vapi.ppcli("show log"))
872                 self.logger.info(self.vapi.ppcli("show bihash"))
873                 self.logger.info("Logging testcase specific show commands.")
874                 self.show_commands_at_teardown()
875                 if self.remove_configured_vpp_objects_on_tear_down:
876                     self.registry.remove_vpp_config(self.logger)
877             # Save/Dump VPP api trace log
878             m = self._testMethodName
879             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
880             tmp_api_trace = "/tmp/%s" % api_trace
881             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
882             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
883             self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
884             os.rename(tmp_api_trace, vpp_api_trace_log)
885         except VppTransportSocketIOError:
886             self.logger.debug(
887                 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
888             )
889             self.vpp_dead = True
890         else:
891             self.registry.unregister_all(self.logger)
892
893     def setUp(self):
894         """Clear trace before running each test"""
895         super(VppTestCase, self).setUp()
896         self.reporter.send_keep_alive(self)
897         if self.vpp_dead:
898             raise VppDiedError(
899                 rv=None,
900                 testcase=self.__class__.__name__,
901                 method_name=self._testMethodName,
902             )
903         self.sleep(0.1, "during setUp")
904         self.vpp_stdout_deque.append(
905             "--- test setUp() for %s.%s(%s) starts here ---\n"
906             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
907         )
908         self.vpp_stderr_deque.append(
909             "--- test setUp() for %s.%s(%s) starts here ---\n"
910             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
911         )
912         self.vapi.cli("clear trace")
913         # store the test instance inside the test class - so that objects
914         # holding the class can access instance methods (like assertEqual)
915         type(self).test_instance = self
916
917     @classmethod
918     def pg_enable_capture(cls, interfaces=None):
919         """
920         Enable capture on packet-generator interfaces
921
922         :param interfaces: iterable interface indexes (if None,
923                            use self.pg_interfaces)
924
925         """
926         if interfaces is None:
927             interfaces = cls.pg_interfaces
928         for i in interfaces:
929             i.enable_capture()
930
931     @classmethod
932     def register_pcap(cls, intf, worker):
933         """Register a pcap in the testclass"""
934         # add to the list of captures with current timestamp
935         cls._pcaps.append((intf, worker))
936
937     @classmethod
938     def get_vpp_time(cls):
939         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
940         # returns float("2.190522")
941         timestr = cls.vapi.cli("show clock")
942         head, sep, tail = timestr.partition(",")
943         head, sep, tail = head.partition("Time now")
944         return float(tail)
945
946     @classmethod
947     def sleep_on_vpp_time(cls, sec):
948         """Sleep according to time in VPP world"""
949         # On a busy system with many processes
950         # we might end up with VPP time being slower than real world
951         # So take that into account when waiting for VPP to do something
952         start_time = cls.get_vpp_time()
953         while cls.get_vpp_time() - start_time < sec:
954             cls.sleep(0.1)
955
956     @classmethod
957     def pg_start(cls, trace=True):
958         """Enable the PG, wait till it is done, then clean up"""
959         for (intf, worker) in cls._old_pcaps:
960             intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
961         cls._old_pcaps = []
962         if trace:
963             cls.vapi.cli("clear trace")
964             cls.vapi.cli("trace add pg-input 1000")
965         cls.vapi.cli("packet-generator enable")
966         # PG, when starts, runs to completion -
967         # so let's avoid a race condition,
968         # and wait a little till it's done.
969         # Then clean it up  - and then be gone.
970         deadline = time.time() + 300
971         while cls.vapi.cli("show packet-generator").find("Yes") != -1:
972             cls.sleep(0.01)  # yield
973             if time.time() > deadline:
974                 cls.logger.error("Timeout waiting for pg to stop")
975                 break
976         for intf, worker in cls._pcaps:
977             cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
978         cls._old_pcaps = cls._pcaps
979         cls._pcaps = []
980
981     @classmethod
982     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
983         """
984         Create packet-generator interfaces.
985
986         :param interfaces: iterable indexes of the interfaces.
987         :returns: List of created interfaces.
988
989         """
990         result = []
991         for i in interfaces:
992             intf = VppPGInterface(cls, i, gso, gso_size, mode)
993             setattr(cls, intf.name, intf)
994             result.append(intf)
995         cls.pg_interfaces = result
996         return result
997
998     @classmethod
999     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1000         pgmode = VppEnum.vl_api_pg_interface_mode_t
1001         return cls.create_pg_interfaces_internal(
1002             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1003         )
1004
1005     @classmethod
1006     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1007         pgmode = VppEnum.vl_api_pg_interface_mode_t
1008         return cls.create_pg_interfaces_internal(
1009             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1010         )
1011
1012     @classmethod
1013     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1014         pgmode = VppEnum.vl_api_pg_interface_mode_t
1015         return cls.create_pg_interfaces_internal(
1016             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1017         )
1018
1019     @classmethod
1020     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1021         pgmode = VppEnum.vl_api_pg_interface_mode_t
1022         return cls.create_pg_interfaces_internal(
1023             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1024         )
1025
1026     @classmethod
1027     def create_loopback_interfaces(cls, count):
1028         """
1029         Create loopback interfaces.
1030
1031         :param count: number of interfaces created.
1032         :returns: List of created interfaces.
1033         """
1034         result = [VppLoInterface(cls) for i in range(count)]
1035         for intf in result:
1036             setattr(cls, intf.name, intf)
1037         cls.lo_interfaces = result
1038         return result
1039
1040     @classmethod
1041     def create_bvi_interfaces(cls, count):
1042         """
1043         Create BVI interfaces.
1044
1045         :param count: number of interfaces created.
1046         :returns: List of created interfaces.
1047         """
1048         result = [VppBviInterface(cls) for i in range(count)]
1049         for intf in result:
1050             setattr(cls, intf.name, intf)
1051         cls.bvi_interfaces = result
1052         return result
1053
1054     @staticmethod
1055     def extend_packet(packet, size, padding=" "):
1056         """
1057         Extend packet to given size by padding with spaces or custom padding
1058         NOTE: Currently works only when Raw layer is present.
1059
1060         :param packet: packet
1061         :param size: target size
1062         :param padding: padding used to extend the payload
1063
1064         """
1065         packet_len = len(packet) + 4
1066         extend = size - packet_len
1067         if extend > 0:
1068             num = (extend // len(padding)) + 1
1069             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1070
1071     @classmethod
1072     def reset_packet_infos(cls):
1073         """Reset the list of packet info objects and packet counts to zero"""
1074         cls._packet_infos = {}
1075         cls._packet_count_for_dst_if_idx = {}
1076
1077     @classmethod
1078     def create_packet_info(cls, src_if, dst_if):
1079         """
1080         Create packet info object containing the source and destination indexes
1081         and add it to the testcase's packet info list
1082
1083         :param VppInterface src_if: source interface
1084         :param VppInterface dst_if: destination interface
1085
1086         :returns: _PacketInfo object
1087
1088         """
1089         info = _PacketInfo()
1090         info.index = len(cls._packet_infos)
1091         info.src = src_if.sw_if_index
1092         info.dst = dst_if.sw_if_index
1093         if isinstance(dst_if, VppSubInterface):
1094             dst_idx = dst_if.parent.sw_if_index
1095         else:
1096             dst_idx = dst_if.sw_if_index
1097         if dst_idx in cls._packet_count_for_dst_if_idx:
1098             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1099         else:
1100             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1101         cls._packet_infos[info.index] = info
1102         return info
1103
1104     @staticmethod
1105     def info_to_payload(info):
1106         """
1107         Convert _PacketInfo object to packet payload
1108
1109         :param info: _PacketInfo object
1110
1111         :returns: string containing serialized data from packet info
1112         """
1113
1114         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1115         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1116
1117     @staticmethod
1118     def payload_to_info(payload, payload_field="load"):
1119         """
1120         Convert packet payload to _PacketInfo object
1121
1122         :param payload: packet payload
1123         :type payload:  <class 'scapy.packet.Raw'>
1124         :param payload_field: packet fieldname of payload "load" for
1125                 <class 'scapy.packet.Raw'>
1126         :type payload_field: str
1127         :returns: _PacketInfo object containing de-serialized data from payload
1128
1129         """
1130
1131         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1132         payload_b = getattr(payload, payload_field)[:18]
1133
1134         info = _PacketInfo()
1135         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1136
1137         # some SRv6 TCs depend on get an exception if bad values are detected
1138         if info.index > 0x4000:
1139             raise ValueError("Index value is invalid")
1140
1141         return info
1142
1143     def get_next_packet_info(self, info):
1144         """
1145         Iterate over the packet info list stored in the testcase
1146         Start iteration with first element if info is None
1147         Continue based on index in info if info is specified
1148
1149         :param info: info or None
1150         :returns: next info in list or None if no more infos
1151         """
1152         if info is None:
1153             next_index = 0
1154         else:
1155             next_index = info.index + 1
1156         if next_index == len(self._packet_infos):
1157             return None
1158         else:
1159             return self._packet_infos[next_index]
1160
1161     def get_next_packet_info_for_interface(self, src_index, info):
1162         """
1163         Search the packet info list for the next packet info with same source
1164         interface index
1165
1166         :param src_index: source interface index to search for
1167         :param info: packet info - where to start the search
1168         :returns: packet info or None
1169
1170         """
1171         while True:
1172             info = self.get_next_packet_info(info)
1173             if info is None:
1174                 return None
1175             if info.src == src_index:
1176                 return info
1177
1178     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1179         """
1180         Search the packet info list for the next packet info with same source
1181         and destination interface indexes
1182
1183         :param src_index: source interface index to search for
1184         :param dst_index: destination interface index to search for
1185         :param info: packet info - where to start the search
1186         :returns: packet info or None
1187
1188         """
1189         while True:
1190             info = self.get_next_packet_info_for_interface(src_index, info)
1191             if info is None:
1192                 return None
1193             if info.dst == dst_index:
1194                 return info
1195
1196     def assert_equal(self, real_value, expected_value, name_or_class=None):
1197         if name_or_class is None:
1198             self.assertEqual(real_value, expected_value)
1199             return
1200         try:
1201             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1202             msg = msg % (
1203                 getdoc(name_or_class).strip(),
1204                 real_value,
1205                 str(name_or_class(real_value)),
1206                 expected_value,
1207                 str(name_or_class(expected_value)),
1208             )
1209         except Exception:
1210             msg = "Invalid %s: %s does not match expected value %s" % (
1211                 name_or_class,
1212                 real_value,
1213                 expected_value,
1214             )
1215
1216         self.assertEqual(real_value, expected_value, msg)
1217
1218     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1219         if name is None:
1220             msg = None
1221         else:
1222             msg = "Invalid %s: %s out of range <%s,%s>" % (
1223                 name,
1224                 real_value,
1225                 expected_min,
1226                 expected_max,
1227             )
1228         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1229
1230     def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1231         received = packet.__class__(scapy.compat.raw(packet))
1232         udp_layers = ["UDP", "UDPerror"]
1233         checksum_fields = ["cksum", "chksum"]
1234         checksums = []
1235         counter = 0
1236         temp = received.__class__(scapy.compat.raw(received))
1237         while True:
1238             layer = temp.getlayer(counter)
1239             if layer:
1240                 layer = layer.copy()
1241                 layer.remove_payload()
1242                 for cf in checksum_fields:
1243                     if hasattr(layer, cf):
1244                         if (
1245                             ignore_zero_udp_checksums
1246                             and 0 == getattr(layer, cf)
1247                             and layer.name in udp_layers
1248                         ):
1249                             continue
1250                         delattr(temp.getlayer(counter), cf)
1251                         checksums.append((counter, cf))
1252             else:
1253                 break
1254             counter = counter + 1
1255         if 0 == len(checksums):
1256             return
1257         temp = temp.__class__(scapy.compat.raw(temp))
1258         for layer, cf in checksums:
1259             calc_sum = getattr(temp[layer], cf)
1260             self.assert_equal(
1261                 getattr(received[layer], cf),
1262                 calc_sum,
1263                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1264             )
1265             self.logger.debug(
1266                 "Checksum field `%s` on `%s` layer has correct value `%s`"
1267                 % (cf, temp[layer].name, calc_sum)
1268             )
1269
1270     def assert_checksum_valid(
1271         self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1272     ):
1273         """Check checksum of received packet on given layer"""
1274         received_packet_checksum = getattr(received_packet[layer], field_name)
1275         if ignore_zero_checksum and 0 == received_packet_checksum:
1276             return
1277         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1278         delattr(recalculated[layer], field_name)
1279         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1280         self.assert_equal(
1281             received_packet_checksum,
1282             getattr(recalculated[layer], field_name),
1283             "packet checksum on layer: %s" % layer,
1284         )
1285
1286     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1287         self.assert_checksum_valid(
1288             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1289         )
1290
1291     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1292         self.assert_checksum_valid(
1293             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1294         )
1295
1296     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1297         self.assert_checksum_valid(
1298             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1299         )
1300
1301     def assert_embedded_icmp_checksum_valid(self, received_packet):
1302         if received_packet.haslayer(IPerror):
1303             self.assert_checksum_valid(received_packet, "IPerror")
1304         if received_packet.haslayer(TCPerror):
1305             self.assert_checksum_valid(received_packet, "TCPerror")
1306         if received_packet.haslayer(UDPerror):
1307             self.assert_checksum_valid(
1308                 received_packet, "UDPerror", ignore_zero_checksum=True
1309             )
1310         if received_packet.haslayer(ICMPerror):
1311             self.assert_checksum_valid(received_packet, "ICMPerror")
1312
1313     def assert_icmp_checksum_valid(self, received_packet):
1314         self.assert_checksum_valid(received_packet, "ICMP")
1315         self.assert_embedded_icmp_checksum_valid(received_packet)
1316
1317     def assert_icmpv6_checksum_valid(self, pkt):
1318         if pkt.haslayer(ICMPv6DestUnreach):
1319             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1320             self.assert_embedded_icmp_checksum_valid(pkt)
1321         if pkt.haslayer(ICMPv6EchoRequest):
1322             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1323         if pkt.haslayer(ICMPv6EchoReply):
1324             self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1325
1326     def get_counter(self, counter):
1327         if counter.startswith("/"):
1328             counter_value = self.statistics.get_counter(counter)
1329         else:
1330             counters = self.vapi.cli("sh errors").split("\n")
1331             counter_value = 0
1332             for i in range(1, len(counters) - 1):
1333                 results = counters[i].split()
1334                 if results[1] == counter:
1335                     counter_value = int(results[0])
1336                     break
1337         return counter_value
1338
1339     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1340         c = self.get_counter(counter)
1341         if thread is not None:
1342             c = c[thread][index]
1343         else:
1344             c = sum(x[index] for x in c)
1345         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1346
1347     def assert_packet_counter_equal(self, counter, expected_value):
1348         counter_value = self.get_counter(counter)
1349         self.assert_equal(
1350             counter_value, expected_value, "packet counter `%s'" % counter
1351         )
1352
1353     def assert_error_counter_equal(self, counter, expected_value):
1354         counter_value = self.statistics[counter].sum()
1355         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1356
1357     @classmethod
1358     def sleep(cls, timeout, remark=None):
1359
1360         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1361         #  * by Guido, only the main thread can be interrupted.
1362         # */
1363         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1364         if timeout == 0:
1365             # yield quantum
1366             if hasattr(os, "sched_yield"):
1367                 os.sched_yield()
1368             else:
1369                 time.sleep(0)
1370             return
1371
1372         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1373         before = time.time()
1374         time.sleep(timeout)
1375         after = time.time()
1376         if after - before > 2 * timeout:
1377             cls.logger.error(
1378                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1379                 after - before,
1380                 timeout,
1381             )
1382
1383         cls.logger.debug(
1384             "Finished sleep (%s) - slept %es (wanted %es)",
1385             remark,
1386             after - before,
1387             timeout,
1388         )
1389
1390     def virtual_sleep(self, timeout, remark=None):
1391         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1392         self.vapi.cli("set clock adjust %s" % timeout)
1393
1394     def pg_send(self, intf, pkts, worker=None, trace=True):
1395         intf.add_stream(pkts, worker=worker)
1396         self.pg_enable_capture(self.pg_interfaces)
1397         self.pg_start(trace=trace)
1398
1399     def snapshot_stats(self, stats_diff):
1400         """Return snapshot of interesting stats based on diff dictionary."""
1401         stats_snapshot = {}
1402         for sw_if_index in stats_diff:
1403             for counter in stats_diff[sw_if_index]:
1404                 stats_snapshot[counter] = self.statistics[counter]
1405         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1406         return stats_snapshot
1407
1408     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1409         """Assert appropriate difference between current stats and snapshot."""
1410         for sw_if_index in stats_diff:
1411             for cntr, diff in stats_diff[sw_if_index].items():
1412                 if sw_if_index == "err":
1413                     self.assert_equal(
1414                         self.statistics[cntr].sum(),
1415                         stats_snapshot[cntr].sum() + diff,
1416                         f"'{cntr}' counter value (previous value: "
1417                         f"{stats_snapshot[cntr].sum()}, "
1418                         f"expected diff: {diff})",
1419                     )
1420                 else:
1421                     try:
1422                         self.assert_equal(
1423                             self.statistics[cntr][:, sw_if_index].sum(),
1424                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1425                             f"'{cntr}' counter value (previous value: "
1426                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1427                             f"expected diff: {diff})",
1428                         )
1429                     except IndexError:
1430                         # if diff is 0, then this most probably a case where
1431                         # test declares multiple interfaces but traffic hasn't
1432                         # passed through this one yet - which means the counter
1433                         # value is 0 and can be ignored
1434                         if 0 != diff:
1435                             raise
1436
1437     def send_and_assert_no_replies(
1438         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1439     ):
1440         if stats_diff:
1441             stats_snapshot = self.snapshot_stats(stats_diff)
1442
1443         self.pg_send(intf, pkts)
1444
1445         try:
1446             if not timeout:
1447                 timeout = 1
1448             for i in self.pg_interfaces:
1449                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1450                 timeout = 0.1
1451         finally:
1452             if trace:
1453                 if msg:
1454                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1455                 self.logger.debug(self.vapi.cli("show trace"))
1456
1457         if stats_diff:
1458             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1459
1460     def send_and_expect(
1461         self,
1462         intf,
1463         pkts,
1464         output,
1465         n_rx=None,
1466         worker=None,
1467         trace=True,
1468         msg=None,
1469         stats_diff=None,
1470     ):
1471         if stats_diff:
1472             stats_snapshot = self.snapshot_stats(stats_diff)
1473
1474         if not n_rx:
1475             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1476         self.pg_send(intf, pkts, worker=worker, trace=trace)
1477         rx = output.get_capture(n_rx)
1478         if trace:
1479             if msg:
1480                 self.logger.debug(f"send_and_expect: {msg}")
1481             self.logger.debug(self.vapi.cli("show trace"))
1482
1483         if stats_diff:
1484             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1485
1486         return rx
1487
1488     def send_and_expect_load_balancing(
1489         self, input, pkts, outputs, worker=None, trace=True
1490     ):
1491         self.pg_send(input, pkts, worker=worker, trace=trace)
1492         rxs = []
1493         for oo in outputs:
1494             rx = oo._get_capture(1)
1495             self.assertNotEqual(0, len(rx))
1496             rxs.append(rx)
1497         if trace:
1498             self.logger.debug(self.vapi.cli("show trace"))
1499         return rxs
1500
1501     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1502         self.pg_send(intf, pkts, worker=worker, trace=trace)
1503         rx = output._get_capture(1)
1504         if trace:
1505             self.logger.debug(self.vapi.cli("show trace"))
1506         self.assertTrue(len(rx) > 0)
1507         self.assertTrue(len(rx) < len(pkts))
1508         return rx
1509
1510     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1511         if stats_diff:
1512             stats_snapshot = self.snapshot_stats(stats_diff)
1513
1514         self.pg_send(intf, pkts)
1515         rx = output.get_capture(len(pkts))
1516         outputs = [output]
1517         if not timeout:
1518             timeout = 1
1519         for i in self.pg_interfaces:
1520             if i not in outputs:
1521                 i.assert_nothing_captured(timeout=timeout)
1522                 timeout = 0.1
1523
1524         if stats_diff:
1525             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1526
1527         return rx
1528
1529
1530 def get_testcase_doc_name(test):
1531     return getdoc(test.__class__).splitlines()[0]
1532
1533
1534 def get_test_description(descriptions, test):
1535     short_description = test.shortDescription()
1536     if descriptions and short_description:
1537         return short_description
1538     else:
1539         return str(test)
1540
1541
1542 class TestCaseInfo(object):
1543     def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1544         self.logger = logger
1545         self.tempdir = tempdir
1546         self.vpp_pid = vpp_pid
1547         self.vpp_bin_path = vpp_bin_path
1548         self.core_crash_test = None
1549
1550
1551 class VppTestResult(unittest.TestResult):
1552     """
1553     @property result_string
1554      String variable to store the test case result string.
1555     @property errors
1556      List variable containing 2-tuples of TestCase instances and strings
1557      holding formatted tracebacks. Each tuple represents a test which
1558      raised an unexpected exception.
1559     @property failures
1560      List variable containing 2-tuples of TestCase instances and strings
1561      holding formatted tracebacks. Each tuple represents a test where
1562      a failure was explicitly signalled using the TestCase.assert*()
1563      methods.
1564     """
1565
1566     failed_test_cases_info = set()
1567     core_crash_test_cases_info = set()
1568     current_test_case_info = None
1569
1570     def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1571         """
1572         :param stream File descriptor to store where to report test results.
1573             Set to the standard error stream by default.
1574         :param descriptions Boolean variable to store information if to use
1575             test case descriptions.
1576         :param verbosity Integer variable to store required verbosity level.
1577         """
1578         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1579         self.stream = stream
1580         self.descriptions = descriptions
1581         self.verbosity = verbosity
1582         self.result_string = None
1583         self.runner = runner
1584         self.printed = []
1585
1586     def addSuccess(self, test):
1587         """
1588         Record a test succeeded result
1589
1590         :param test:
1591
1592         """
1593         if self.current_test_case_info:
1594             self.current_test_case_info.logger.debug(
1595                 "--- addSuccess() %s.%s(%s) called"
1596                 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1597             )
1598         unittest.TestResult.addSuccess(self, test)
1599         self.result_string = colorize("OK", GREEN)
1600
1601         self.send_result_through_pipe(test, PASS)
1602
1603     def addSkip(self, test, reason):
1604         """
1605         Record a test skipped.
1606
1607         :param test:
1608         :param reason:
1609
1610         """
1611         if self.current_test_case_info:
1612             self.current_test_case_info.logger.debug(
1613                 "--- addSkip() %s.%s(%s) called, reason is %s"
1614                 % (
1615                     test.__class__.__name__,
1616                     test._testMethodName,
1617                     test._testMethodDoc,
1618                     reason,
1619                 )
1620             )
1621         unittest.TestResult.addSkip(self, test, reason)
1622         self.result_string = colorize("SKIP", YELLOW)
1623
1624         if reason == "not enough cpus":
1625             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1626         else:
1627             self.send_result_through_pipe(test, SKIP)
1628
1629     def symlink_failed(self):
1630         if self.current_test_case_info:
1631             try:
1632                 failed_dir = config.failed_dir
1633                 link_path = os.path.join(
1634                     failed_dir,
1635                     "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1636                 )
1637
1638                 self.current_test_case_info.logger.debug(
1639                     "creating a link to the failed test"
1640                 )
1641                 self.current_test_case_info.logger.debug(
1642                     "os.symlink(%s, %s)"
1643                     % (self.current_test_case_info.tempdir, link_path)
1644                 )
1645                 if os.path.exists(link_path):
1646                     self.current_test_case_info.logger.debug("symlink already exists")
1647                 else:
1648                     os.symlink(self.current_test_case_info.tempdir, link_path)
1649
1650             except Exception as e:
1651                 self.current_test_case_info.logger.error(e)
1652
1653     def send_result_through_pipe(self, test, result):
1654         if hasattr(self, "test_framework_result_pipe"):
1655             pipe = self.test_framework_result_pipe
1656             if pipe:
1657                 pipe.send((test.id(), result))
1658
1659     def log_error(self, test, err, fn_name):
1660         if self.current_test_case_info:
1661             if isinstance(test, unittest.suite._ErrorHolder):
1662                 test_name = test.description
1663             else:
1664                 test_name = "%s.%s(%s)" % (
1665                     test.__class__.__name__,
1666                     test._testMethodName,
1667                     test._testMethodDoc,
1668                 )
1669             self.current_test_case_info.logger.debug(
1670                 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1671             )
1672             self.current_test_case_info.logger.debug(
1673                 "formatted exception is:\n%s" % "".join(format_exception(*err))
1674             )
1675
1676     def add_error(self, test, err, unittest_fn, error_type):
1677         if error_type == FAIL:
1678             self.log_error(test, err, "addFailure")
1679             error_type_str = colorize("FAIL", RED)
1680         elif error_type == ERROR:
1681             self.log_error(test, err, "addError")
1682             error_type_str = colorize("ERROR", RED)
1683         else:
1684             raise Exception(
1685                 "Error type %s cannot be used to record an "
1686                 "error or a failure" % error_type
1687             )
1688
1689         unittest_fn(self, test, err)
1690         if self.current_test_case_info:
1691             self.result_string = "%s [ temp dir used by test case: %s ]" % (
1692                 error_type_str,
1693                 self.current_test_case_info.tempdir,
1694             )
1695             self.symlink_failed()
1696             self.failed_test_cases_info.add(self.current_test_case_info)
1697             if is_core_present(self.current_test_case_info.tempdir):
1698                 if not self.current_test_case_info.core_crash_test:
1699                     if isinstance(test, unittest.suite._ErrorHolder):
1700                         test_name = str(test)
1701                     else:
1702                         test_name = "'{!s}' ({!s})".format(
1703                             get_testcase_doc_name(test), test.id()
1704                         )
1705                     self.current_test_case_info.core_crash_test = test_name
1706                 self.core_crash_test_cases_info.add(self.current_test_case_info)
1707         else:
1708             self.result_string = "%s [no temp dir]" % error_type_str
1709
1710         self.send_result_through_pipe(test, error_type)
1711
1712     def addFailure(self, test, err):
1713         """
1714         Record a test failed result
1715
1716         :param test:
1717         :param err: error message
1718
1719         """
1720         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1721
1722     def addError(self, test, err):
1723         """
1724         Record a test error result
1725
1726         :param test:
1727         :param err: error message
1728
1729         """
1730         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1731
1732     def getDescription(self, test):
1733         """
1734         Get test description
1735
1736         :param test:
1737         :returns: test description
1738
1739         """
1740         return get_test_description(self.descriptions, test)
1741
1742     def startTest(self, test):
1743         """
1744         Start a test
1745
1746         :param test:
1747
1748         """
1749
1750         def print_header(test):
1751             if test.__class__ in self.printed:
1752                 return
1753
1754             test_doc = getdoc(test)
1755             if not test_doc:
1756                 raise Exception("No doc string for test '%s'" % test.id())
1757
1758             test_title = test_doc.splitlines()[0].rstrip()
1759             test_title = colorize(test_title, GREEN)
1760             if test.is_tagged_run_solo():
1761                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1762
1763             # This block may overwrite the colorized title above,
1764             # but we want this to stand out and be fixed
1765             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1766                 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1767
1768             if test.has_tag(TestCaseTag.FIXME_ASAN):
1769                 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1770                 test.skip_fixme_asan()
1771
1772             if is_distro_ubuntu2204 == True and test.has_tag(
1773                 TestCaseTag.FIXME_UBUNTU2204
1774             ):
1775                 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1776                 test.skip_fixme_ubuntu2204()
1777
1778             if hasattr(test, "vpp_worker_count"):
1779                 if test.vpp_worker_count == 0:
1780                     test_title += " [main thread only]"
1781                 elif test.vpp_worker_count == 1:
1782                     test_title += " [1 worker thread]"
1783                 else:
1784                     test_title += f" [{test.vpp_worker_count} worker threads]"
1785
1786             if test.__class__.skipped_due_to_cpu_lack:
1787                 test_title = colorize(
1788                     f"{test_title} [skipped - not enough cpus, "
1789                     f"required={test.__class__.get_cpus_required()}, "
1790                     f"available={max_vpp_cpus}]",
1791                     YELLOW,
1792                 )
1793
1794             print(double_line_delim)
1795             print(test_title)
1796             print(double_line_delim)
1797             self.printed.append(test.__class__)
1798
1799         print_header(test)
1800         self.start_test = time.time()
1801         unittest.TestResult.startTest(self, test)
1802         if self.verbosity > 0:
1803             self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1804             self.stream.writeln(single_line_delim)
1805
1806     def stopTest(self, test):
1807         """
1808         Called when the given test has been run
1809
1810         :param test:
1811
1812         """
1813         unittest.TestResult.stopTest(self, test)
1814
1815         if self.verbosity > 0:
1816             self.stream.writeln(single_line_delim)
1817             self.stream.writeln(
1818                 "%-73s%s" % (self.getDescription(test), self.result_string)
1819             )
1820             self.stream.writeln(single_line_delim)
1821         else:
1822             self.stream.writeln(
1823                 "%-68s %4.2f %s"
1824                 % (
1825                     self.getDescription(test),
1826                     time.time() - self.start_test,
1827                     self.result_string,
1828                 )
1829             )
1830
1831         self.send_result_through_pipe(test, TEST_RUN)
1832
1833     def printErrors(self):
1834         """
1835         Print errors from running the test case
1836         """
1837         if len(self.errors) > 0 or len(self.failures) > 0:
1838             self.stream.writeln()
1839             self.printErrorList("ERROR", self.errors)
1840             self.printErrorList("FAIL", self.failures)
1841
1842         # ^^ that is the last output from unittest before summary
1843         if not self.runner.print_summary:
1844             devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1845             self.stream = devnull
1846             self.runner.stream = devnull
1847
1848     def printErrorList(self, flavour, errors):
1849         """
1850         Print error list to the output stream together with error type
1851         and test case description.
1852
1853         :param flavour: error type
1854         :param errors: iterable errors
1855
1856         """
1857         for test, err in errors:
1858             self.stream.writeln(double_line_delim)
1859             self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1860             self.stream.writeln(single_line_delim)
1861             self.stream.writeln("%s" % err)
1862
1863
1864 class VppTestRunner(unittest.TextTestRunner):
1865     """
1866     A basic test runner implementation which prints results to standard error.
1867     """
1868
1869     @property
1870     def resultclass(self):
1871         """Class maintaining the results of the tests"""
1872         return VppTestResult
1873
1874     def __init__(
1875         self,
1876         keep_alive_pipe=None,
1877         descriptions=True,
1878         verbosity=1,
1879         result_pipe=None,
1880         failfast=False,
1881         buffer=False,
1882         resultclass=None,
1883         print_summary=True,
1884         **kwargs,
1885     ):
1886         # ignore stream setting here, use hard-coded stdout to be in sync
1887         # with prints from VppTestCase methods ...
1888         super(VppTestRunner, self).__init__(
1889             sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1890         )
1891         KeepAliveReporter.pipe = keep_alive_pipe
1892
1893         self.orig_stream = self.stream
1894         self.resultclass.test_framework_result_pipe = result_pipe
1895
1896         self.print_summary = print_summary
1897
1898     def _makeResult(self):
1899         return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1900
1901     def run(self, test):
1902         """
1903         Run the tests
1904
1905         :param test:
1906
1907         """
1908         faulthandler.enable()  # emit stack trace to stderr if killed by signal
1909
1910         result = super(VppTestRunner, self).run(test)
1911         if not self.print_summary:
1912             self.stream = self.orig_stream
1913             result.stream = self.orig_stream
1914         return result
1915
1916
1917 class Worker(Thread):
1918     def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1919         super(Worker, self).__init__(*args, **kwargs)
1920         self.logger = logger
1921         self.args = executable_args
1922         if hasattr(self, "testcase") and self.testcase.debug_all:
1923             if self.testcase.debug_gdbserver:
1924                 self.args = [
1925                     "/usr/bin/gdbserver",
1926                     "localhost:{port}".format(port=self.testcase.gdbserver_port),
1927                 ] + args
1928             elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
1929                 self.args.append(self.wait_for_gdb)
1930         self.app_bin = executable_args[0]
1931         self.app_name = os.path.basename(self.app_bin)
1932         if hasattr(self, "role"):
1933             self.app_name += " {role}".format(role=self.role)
1934         self.process = None
1935         self.result = None
1936         env = {} if env is None else env
1937         self.env = copy.deepcopy(env)
1938
1939     def wait_for_enter(self):
1940         if not hasattr(self, "testcase"):
1941             return
1942         if self.testcase.debug_all and self.testcase.debug_gdbserver:
1943             print()
1944             print(double_line_delim)
1945             print(
1946                 "Spawned GDB Server for '{app}' with PID: {pid}".format(
1947                     app=self.app_name, pid=self.process.pid
1948                 )
1949             )
1950         elif self.testcase.debug_all and self.testcase.debug_gdb:
1951             print()
1952             print(double_line_delim)
1953             print(
1954                 "Spawned '{app}' with PID: {pid}".format(
1955                     app=self.app_name, pid=self.process.pid
1956                 )
1957             )
1958         else:
1959             return
1960         print(single_line_delim)
1961         print("You can debug '{app}' using:".format(app=self.app_name))
1962         if self.testcase.debug_gdbserver:
1963             print(
1964                 "sudo gdb "
1965                 + self.app_bin
1966                 + " -ex 'target remote localhost:{port}'".format(
1967                     port=self.testcase.gdbserver_port
1968                 )
1969             )
1970             print(
1971                 "Now is the time to attach gdb by running the above "
1972                 "command, set up breakpoints etc., then resume from "
1973                 "within gdb by issuing the 'continue' command"
1974             )
1975             self.testcase.gdbserver_port += 1
1976         elif self.testcase.debug_gdb:
1977             print(
1978                 "sudo gdb "
1979                 + self.app_bin
1980                 + " -ex 'attach {pid}'".format(pid=self.process.pid)
1981             )
1982             print(
1983                 "Now is the time to attach gdb by running the above "
1984                 "command and set up breakpoints etc., then resume from"
1985                 " within gdb by issuing the 'continue' command"
1986             )
1987         print(single_line_delim)
1988         input("Press ENTER to continue running the testcase...")
1989
1990     def run(self):
1991         executable = self.args[0]
1992         if not os.path.exists(executable) or not os.access(
1993             executable, os.F_OK | os.X_OK
1994         ):
1995             # Exit code that means some system file did not exist,
1996             # could not be opened, or had some other kind of error.
1997             self.result = os.EX_OSFILE
1998             raise EnvironmentError(
1999                 "executable '%s' is not found or executable." % executable
2000             )
2001         self.logger.debug(
2002             "Running executable '{app}': '{cmd}'".format(
2003                 app=self.app_name, cmd=" ".join(self.args)
2004             )
2005         )
2006         env = os.environ.copy()
2007         env.update(self.env)
2008         env["CK_LOG_FILE_NAME"] = "-"
2009         self.process = subprocess.Popen(
2010             ["stdbuf", "-o0", "-e0"] + self.args,
2011             shell=False,
2012             env=env,
2013             preexec_fn=os.setpgrp,
2014             stdout=subprocess.PIPE,
2015             stderr=subprocess.PIPE,
2016         )
2017         self.wait_for_enter()
2018         out, err = self.process.communicate()
2019         self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2020         self.logger.info("Return code is `%s'" % self.process.returncode)
2021         self.logger.info(single_line_delim)
2022         self.logger.info(
2023             "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2024         )
2025         self.logger.info(single_line_delim)
2026         self.logger.info(out.decode("utf-8"))
2027         self.logger.info(single_line_delim)
2028         self.logger.info(
2029             "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2030         )
2031         self.logger.info(single_line_delim)
2032         self.logger.info(err.decode("utf-8"))
2033         self.logger.info(single_line_delim)
2034         self.result = self.process.returncode
2035
2036
2037 if __name__ == "__main__":
2038     pass