tests: run tests against a running VPP
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55
56
# Logger for messages emitted by the framework module itself.
logger = logging.getLogger(__name__)

# Default logger assigned to VppTestCase.logger; it only carries a
# NullHandler, so it can be replaced by a real logger where needed.
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())

# Consecutive integer codes 0..5.
PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE = range(6)
69
70
# The framework debugging helpers are only imported when requested.
if config.debug_framework:
    import debug_internal

# Test framework module.
#
# The module provides a set of tools for constructing and running tests and
# representing the results.
80
81
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died.

    Attributes:
        rv: return code of the subprocess, or None if not known.
        signal_name: name of the signal whose number equals ``-rv``, or
            None when ``rv`` does not correspond to a known signal.
        testcase: test case that was running when VPP died, if given.
        method_name: test method that was running when VPP died, if given.
    """

    # Reverse map from signal number to signal name, built from the SIG*
    # names of the signal module (the SIG_* handler constants are excluded).
    signals_by_value = {
        v: k
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        self.rv = rv
        self.testcase = testcase
        self.method_name = method_name

        # A return code of -N is looked up as signal number N; a missing
        # (None) or unknown code simply leaves the signal name unset.
        try:
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            self.signal_name = None

        if testcase is None and method_name is None:
            in_msg = ""
        else:
            in_msg = " while running %s.%s" % (testcase, method_name)

        if self.rv:
            # Only mention the signal when one was identified, instead of
            # appending an empty " []" to the message.
            if self.signal_name is not None:
                signal_msg = " [%s]" % self.signal_name
            else:
                signal_msg = ""
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                in_msg,
                self.rv,
                signal_msg,
            )
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
117
118
119 class _PacketInfo(object):
120     """Private class to create packet info object.
121
122     Help process information about the next packet.
123     Set variables to default values.
124     """
125
126     #: Store the index of the packet.
127     index = -1
128     #: Store the index of the source packet generator interface of the packet.
129     src = -1
130     #: Store the index of the destination packet generator interface
131     #: of the packet.
132     dst = -1
133     #: Store expected ip version
134     ip = -1
135     #: Store expected upper protocol
136     proto = -1
137     #: Store the copy of the former packet.
138     data = None
139
140     def __eq__(self, other):
141         index = self.index == other.index
142         src = self.src == other.src
143         dst = self.dst == other.dst
144         data = self.data == other.data
145         return index and src and dst and data
146
147
def _split_complete_lines(fragment, data):
    """Split freshly read output into complete lines and a trailing fragment.

    :param fragment: incomplete line left over from the previous read
        ("" if there is none); it is prepended to the first new line
    :param data: non-empty bytes just read from the stream
    :returns: tuple (complete_lines, new_fragment) where new_fragment is
        "" when the data ended with a newline
    """
    lines = data.decode("ascii", errors="backslashreplace").splitlines(True)
    if fragment:
        lines[0] = "%s%s" % (fragment, lines[0])
    if lines[-1].endswith("\n"):
        # Everything is complete - the old fragment has been consumed and
        # must not be prepended again on the next read.
        return lines, ""
    return lines[:-1], lines[-1]


def pump_output(testclass):
    """pump output from vpp stdout/stderr to proper queues

    Runs until testclass.pump_thread_stop_flag is set. Complete lines are
    appended to testclass.vpp_stdout_deque / testclass.vpp_stderr_deque and,
    unless config.cache_vpp_output is set, also logged line by line.

    :param testclass: test case class providing vpp, the stop flag, the
        wakeup pipe, the output deques and the logger
    """
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        readable = select.select(
            [
                testclass.vpp.stdout.fileno(),
                testclass.vpp.stderr.fileno(),
                testclass.pump_thread_wakeup_pipe[0],
            ],
            [],
            [],
        )[0]
        if testclass.vpp.stdout.fileno() in readable:
            read = os.read(testclass.vpp.stdout.fileno(), 102400)
            if len(read) > 0:
                lines, stdout_fragment = _split_complete_lines(stdout_fragment, read)
                testclass.vpp_stdout_deque.extend(lines)
                if not config.cache_vpp_output:
                    for line in lines:
                        testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
        if testclass.vpp.stderr.fileno() in readable:
            read = os.read(testclass.vpp.stderr.fileno(), 102400)
            if len(read) > 0:
                lines, stderr_fragment = _split_complete_lines(stderr_fragment, read)
                testclass.vpp_stderr_deque.extend(lines)
                if not config.cache_vpp_output:
                    for line in lines:
                        testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
195
196
def _is_platform_aarch64():
    """Return True when platform.machine() reports "aarch64"."""
    machine = platform.machine()
    return machine == "aarch64"


# Evaluated once at import time.
is_platform_aarch64 = _is_platform_aarch64()
202
203
def _is_distro_ubuntu2204():
    """Return True when /etc/os-release contains the string "jammy".

    A missing or unreadable /etc/os-release is treated as "not Ubuntu
    22.04" so that importing this module does not fail on such systems.
    """
    try:
        with open("/etc/os-release") as f:
            return any("jammy" in line for line in f)
    except OSError:
        return False


# Evaluated once at import time.
is_distro_ubuntu2204 = _is_distro_ubuntu2204()
213
214
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    All instances share one attribute dictionary, so a pipe set through any
    instance is visible through every other instance.
    """

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        # Only initialise the pipe the first time; unconditionally resetting
        # it here would wipe a pipe already set through another instance.
        self.__dict__.setdefault("_pipe", None)

    @property
    def pipe(self):
        """The keep-alive pipe, or None if it has not been set."""
        return self._pipe

    @pipe.setter
    def pipe(self, pipe):
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: test case class or test case instance; its tempdir and
            vpp.pid attributes are sent along
        :param desc: description used when test is a class; for an instance
            the test id is sent instead
        """
        if self.pipe is None:
            # if not running forked..
            return

        if isclass(test):
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        else:
            desc = test.id()

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
250
251
class TestCaseTag(Enum):
    """Tags which can be attached to test case classes."""

    # marks the suites that must run at the end
    # using only a single test runner
    RUN_SOLO = 1
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    FIXME_ASAN = 3
    # marks suites broken on Ubuntu-22.04
    FIXME_UBUNTU2204 = 4


def create_tag_decorator(e):
    """Create a class decorator which adds tag ``e`` to ``cls.test_tags``.

    :param e: tag to add
    :returns: decorator taking and returning the decorated class
    """

    def decorator(cls):
        # Build a new list on the decorated class rather than appending in
        # place: an inherited test_tags list belongs to the base class and
        # appending to it would tag the base class and all its other
        # subclasses as well.
        cls.test_tags = [*getattr(cls, "test_tags", ()), e]
        return cls

    return decorator


tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
279
280
class DummyVpp:
    """Placeholder with returncode, pid, poll() and terminate() members.

    Both methods do nothing and the attributes are fixed values.
    """

    pid = 0xCAFEBAFE
    returncode = None

    def poll(self):
        """Do nothing."""
        return None

    def terminate(self):
        """Do nothing."""
        return None
290
291
class CPUInterface(ABC):
    """Abstract interface for classes which get cpus assigned."""

    skipped_due_to_cpu_lack = False
    # replaced (not mutated) by assign_cpus()
    cpus = []

    @classmethod
    def assign_cpus(cls, cpus):
        """Store ``cpus`` on the class as its cpus attribute."""
        cls.cpus = cpus

    @classmethod
    @abstractmethod
    def get_cpus_required(cls):
        """Must be provided by subclasses."""
        return None
304
305
306 @use_running
307 class VppTestCase(CPUInterface, unittest.TestCase):
308     """This subclass is a base class for VPP test cases that are implemented as
309     classes. It provides methods to create and run test case.
310     """
311
312     extra_vpp_statseg_config = ""
313     extra_vpp_punt_config = []
314     extra_vpp_plugin_config = []
315     logger = null_logger
316     vapi_response_timeout = 5
317     remove_configured_vpp_objects_on_tear_down = True
318
319     @property
320     def packet_infos(self):
321         """List of packet infos"""
322         return self._packet_infos
323
324     @classmethod
325     def get_packet_count_for_if_idx(cls, dst_if_index):
326         """Get the number of packet info for specified destination if index"""
327         if dst_if_index in cls._packet_count_for_dst_if_idx:
328             return cls._packet_count_for_dst_if_idx[dst_if_index]
329         else:
330             return 0
331
332     @classmethod
333     def has_tag(cls, tag):
334         """if the test case has a given tag - return true"""
335         try:
336             return tag in cls.test_tags
337         except AttributeError:
338             pass
339         return False
340
341     @classmethod
342     def is_tagged_run_solo(cls):
343         """if the test case class is timing-sensitive - return true"""
344         return cls.has_tag(TestCaseTag.RUN_SOLO)
345
346     @classmethod
347     def skip_fixme_asan(cls):
348         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
349         if cls.has_tag(TestCaseTag.FIXME_ASAN):
350             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
351             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
352                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
353
354     @classmethod
355     def skip_fixme_ubuntu2204(cls):
356         """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
357         if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
358             cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
359
360     @classmethod
361     def instance(cls):
362         """Return the instance of this testcase"""
363         return cls.test_instance
364
365     @classmethod
366     def set_debug_flags(cls, d):
367         cls.gdbserver_port = 7777
368         cls.debug_core = False
369         cls.debug_gdb = False
370         cls.debug_gdbserver = False
371         cls.debug_all = False
372         cls.debug_attach = False
373         if d is None:
374             return
375         dl = d.lower()
376         if dl == "core":
377             cls.debug_core = True
378         elif dl == "gdb" or dl == "gdb-all":
379             cls.debug_gdb = True
380         elif dl == "gdbserver" or dl == "gdbserver-all":
381             cls.debug_gdbserver = True
382         elif dl == "attach":
383             cls.debug_attach = True
384         else:
385             raise Exception("Unrecognized DEBUG option: '%s'" % d)
386         if dl == "gdb-all" or dl == "gdbserver-all":
387             cls.debug_all = True
388
389     @classmethod
390     def get_vpp_worker_count(cls):
391         if not hasattr(cls, "vpp_worker_count"):
392             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
393                 cls.vpp_worker_count = 0
394             else:
395                 cls.vpp_worker_count = config.vpp_worker_count
396         return cls.vpp_worker_count
397
398     @classmethod
399     def get_cpus_required(cls):
400         return 1 + cls.get_vpp_worker_count()
401
402     @classmethod
403     def setUpConstants(cls):
404         """Set-up the test case class based on environment variables"""
405         cls.step = config.step
406         cls.plugin_path = ":".join(config.vpp_plugin_dir)
407         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
408         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
409         debug_cli = ""
410         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
411             debug_cli = "cli-listen localhost:5002"
412         size = re.search(r"\d+[gG]", config.coredump_size)
413         if size:
414             coredump_size = f"coredump-size {config.coredump_size}".lower()
415         else:
416             coredump_size = "coredump-size unlimited"
417         default_variant = config.variant
418         if default_variant is not None:
419             default_variant = "default { variant %s 100 }" % default_variant
420         else:
421             default_variant = ""
422
423         api_fuzzing = config.api_fuzz
424         if api_fuzzing is None:
425             api_fuzzing = "off"
426
427         cls.vpp_cmdline = [
428             config.vpp,
429             "unix",
430             "{",
431             "nodaemon",
432             debug_cli,
433             "full-coredump",
434             coredump_size,
435             "runtime-dir",
436             cls.tempdir,
437             "}",
438             "api-trace",
439             "{",
440             "on",
441             "}",
442             "api-segment",
443             "{",
444             "prefix",
445             cls.get_api_segment_prefix(),
446             "}",
447             "cpu",
448             "{",
449             "main-core",
450             str(cls.cpus[0]),
451         ]
452         if cls.extern_plugin_path not in (None, ""):
453             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
454         if cls.get_vpp_worker_count():
455             cls.vpp_cmdline.extend(
456                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
457             )
458         cls.vpp_cmdline.extend(
459             [
460                 "}",
461                 "physmem",
462                 "{",
463                 "max-size",
464                 "32m",
465                 "}",
466                 "statseg",
467                 "{",
468                 "socket-name",
469                 cls.get_stats_sock_path(),
470                 cls.extra_vpp_statseg_config,
471                 "}",
472                 "socksvr",
473                 "{",
474                 "socket-name",
475                 cls.get_api_sock_path(),
476                 "}",
477                 "node { ",
478                 default_variant,
479                 "}",
480                 "api-fuzz {",
481                 api_fuzzing,
482                 "}",
483                 "plugins",
484                 "{",
485                 "plugin",
486                 "dpdk_plugin.so",
487                 "{",
488                 "disable",
489                 "}",
490                 "plugin",
491                 "rdma_plugin.so",
492                 "{",
493                 "disable",
494                 "}",
495                 "plugin",
496                 "lisp_unittest_plugin.so",
497                 "{",
498                 "enable",
499                 "}",
500                 "plugin",
501                 "unittest_plugin.so",
502                 "{",
503                 "enable",
504                 "}",
505             ]
506             + cls.extra_vpp_plugin_config
507             + [
508                 "}",
509             ]
510         )
511
512         if cls.extra_vpp_punt_config is not None:
513             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
514
515         if not cls.debug_attach:
516             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
517             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
518
519     @classmethod
520     def wait_for_enter(cls):
521         if cls.debug_gdbserver:
522             print(double_line_delim)
523             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
524         elif cls.debug_gdb:
525             print(double_line_delim)
526             print("Spawned VPP with PID: %d" % cls.vpp.pid)
527         else:
528             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
529             return
530         print(single_line_delim)
531         print("You can debug VPP using:")
532         if cls.debug_gdbserver:
533             print(
534                 f"sudo gdb {config.vpp} "
535                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
536             )
537             print(
538                 "Now is the time to attach gdb by running the above "
539                 "command, set up breakpoints etc., then resume VPP from "
540                 "within gdb by issuing the 'continue' command"
541             )
542             cls.gdbserver_port += 1
543         elif cls.debug_gdb:
544             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
545             print(
546                 "Now is the time to attach gdb by running the above "
547                 "command and set up breakpoints etc., then resume VPP from"
548                 " within gdb by issuing the 'continue' command"
549             )
550         print(single_line_delim)
551         input("Press ENTER to continue running the testcase...")
552
553     @classmethod
554     def attach_vpp(cls):
555         cls.vpp = DummyVpp()
556
557     @classmethod
558     def run_vpp(cls):
559         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
560         cmdline = cls.vpp_cmdline
561
562         if cls.debug_gdbserver:
563             gdbserver = "/usr/bin/gdbserver"
564             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
565                 raise Exception(
566                     "gdbserver binary '%s' does not exist or is "
567                     "not executable" % gdbserver
568                 )
569
570             cmdline = [
571                 gdbserver,
572                 "localhost:{port}".format(port=cls.gdbserver_port),
573             ] + cls.vpp_cmdline
574             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
575
576         try:
577             cls.vpp = subprocess.Popen(
578                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
579             )
580         except subprocess.CalledProcessError as e:
581             cls.logger.critical(
582                 "Subprocess returned with non-0 return code: (%s)", e.returncode
583             )
584             raise
585         except OSError as e:
586             cls.logger.critical(
587                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
588             )
589             raise
590         except Exception as e:
591             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
592             raise
593
594         cls.wait_for_enter()
595
596     @classmethod
597     def wait_for_coredump(cls):
598         corefile = cls.tempdir + "/core"
599         if os.path.isfile(corefile):
600             cls.logger.error("Waiting for coredump to complete: %s", corefile)
601             curr_size = os.path.getsize(corefile)
602             deadline = time.time() + 60
603             ok = False
604             while time.time() < deadline:
605                 cls.sleep(1)
606                 size = curr_size
607                 curr_size = os.path.getsize(corefile)
608                 if size == curr_size:
609                     ok = True
610                     break
611             if not ok:
612                 cls.logger.error(
613                     "Timed out waiting for coredump to complete: %s", corefile
614                 )
615             else:
616                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
617
618     @classmethod
619     def get_stats_sock_path(cls):
620         return "%s/stats.sock" % cls.tempdir
621
622     @classmethod
623     def get_api_sock_path(cls):
624         return "%s/api.sock" % cls.tempdir
625
626     @classmethod
627     def get_api_segment_prefix(cls):
628         return os.path.basename(cls.tempdir)  # Only used for VAPI
629
630     @classmethod
631     def get_tempdir(cls):
632         if cls.debug_attach:
633             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
634         else:
635             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
636             if config.wipe_tmp_dir:
637                 shutil.rmtree(tmpdir, ignore_errors=True)
638             os.mkdir(tmpdir)
639         return tmpdir
640
641     @classmethod
642     def create_file_handler(cls):
643         if config.log_dir is None:
644             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
645             return
646
647         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
648         if config.wipe_tmp_dir:
649             shutil.rmtree(logdir, ignore_errors=True)
650         os.mkdir(logdir)
651         cls.file_handler = FileHandler(f"{logdir}/log.txt")
652
    @classmethod
    def setUpClass(cls):
        """
        Perform class setup before running the testcase
        Remove shared memory files, start vpp and connect the vpp-api

        In order: set up the per-class logger and log file, create the
        temporary directory and change into it, build the VPP command line,
        start VPP (or attach to it), start the output pump thread, then
        create the API provider, poll VPP once and connect. On any failure
        after that point quit() is called before the exception is re-raised.
        """
        super(VppTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        )
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        os.chdir(cls.tempdir)
        cls.logger.info(
            "Temporary dir is %s, api socket is %s",
            cls.tempdir,
            cls.get_api_sock_path(),
        )
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.setUpConstants()
        cls.reset_packet_infos()
        cls._pcaps = []
        cls._old_pcaps = []
        cls.verbose = 0
        cls.vpp_dead = False
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        try:
            if cls.debug_attach:
                cls.attach_vpp()
            else:
                cls.run_vpp()
            cls.reporter.send_keep_alive(cls, "setUpClass")
            VppTestResult.current_test_case_info = TestCaseInfo(
                cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
            )
            # the deques must exist before the pump thread starts filling them
            cls.vpp_stdout_deque = deque()
            cls.vpp_stderr_deque = deque()
            # Pump thread in a non-debug-attached & not running-vpp
            if not cls.debug_attach and not hasattr(cls, "running_vpp"):
                cls.pump_thread_stop_flag = Event()
                # quit() writes to this pipe to wake the pump thread up
                cls.pump_thread_wakeup_pipe = os.pipe()
                cls.pump_thread = Thread(target=pump_output, args=(cls,))
                cls.pump_thread.daemon = True
                cls.pump_thread.start()
            if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
                # NOTE(review): presumably 0 disables the response timeout
                # while a debugger may have VPP stopped - confirm against
                # VppPapiProvider
                cls.vapi_response_timeout = 0
            cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            if cls.step:
                hook = hookmodule.StepHook(cls)
            else:
                hook = hookmodule.PollHook(cls)
            cls.vapi.register_hook(hook)
            cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            try:
                hook.poll_vpp()
            except VppDiedError:
                cls.vpp_startup_failed = True
                cls.logger.critical(
                    "VPP died shortly after startup, check the"
                    " output to standard error for possible cause"
                )
                raise
            try:
                cls.vapi.connect()
            except (vpp_papi.VPPIOError, Exception) as e:
                cls.logger.debug("Exception connecting to vapi: %s" % e)
                cls.vapi.disconnect()

                if cls.debug_gdbserver:
                    print(
                        colorize(
                            "You're running VPP inside gdbserver but "
                            "VPP-API connection failed, did you forget "
                            "to 'continue' VPP from within gdb?",
                            RED,
                        )
                    )
                raise e
            if cls.debug_attach:
                # the worker count is taken from the first field of the
                # second to last line of the "show thread" output
                last_line = cls.vapi.cli("show thread").split("\n")[-2]
                cls.vpp_worker_count = int(last_line.split(" ")[0])
                print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
            cls.quit()
            raise e
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
            cls.quit()
            raise e
756
757     @classmethod
758     def _debug_quit(cls):
759         if cls.debug_gdbserver or cls.debug_gdb:
760             try:
761                 cls.vpp.poll()
762
763                 if cls.vpp.returncode is None:
764                     print()
765                     print(double_line_delim)
766                     print("VPP or GDB server is still running")
767                     print(single_line_delim)
768                     input(
769                         "When done debugging, press ENTER to kill the "
770                         "process and finish running the testcase..."
771                     )
772             except AttributeError:
773                 pass
774
775     @classmethod
776     def quit(cls):
777         """
778         Disconnect vpp-api, kill vpp and cleanup shared memory files
779         """
780         cls._debug_quit()
781         if hasattr(cls, "running_vpp"):
782             cls.vpp.quit_vpp()
783
784         # first signal that we want to stop the pump thread, then wake it up
785         if hasattr(cls, "pump_thread_stop_flag"):
786             cls.pump_thread_stop_flag.set()
787         if hasattr(cls, "pump_thread_wakeup_pipe"):
788             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
789         if hasattr(cls, "pump_thread"):
790             cls.logger.debug("Waiting for pump thread to stop")
791             cls.pump_thread.join()
792         if hasattr(cls, "vpp_stderr_reader_thread"):
793             cls.logger.debug("Waiting for stderr pump to stop")
794             cls.vpp_stderr_reader_thread.join()
795
796         if hasattr(cls, "vpp"):
797             if hasattr(cls, "vapi"):
798                 cls.logger.debug(cls.vapi.vpp.get_stats())
799                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
800                 cls.vapi.disconnect()
801                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
802                 del cls.vapi
803             cls.vpp.poll()
804             if not cls.debug_attach and cls.vpp.returncode is None:
805                 cls.wait_for_coredump()
806                 cls.logger.debug("Sending TERM to vpp")
807                 cls.vpp.terminate()
808                 cls.logger.debug("Waiting for vpp to die")
809                 try:
810                     outs, errs = cls.vpp.communicate(timeout=5)
811                 except subprocess.TimeoutExpired:
812                     cls.vpp.kill()
813                     outs, errs = cls.vpp.communicate()
814             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
815             if not cls.debug_attach and not hasattr(cls, "running_vpp"):
816                 cls.vpp.stdout.close()
817                 cls.vpp.stderr.close()
818             # If vpp is a dynamic attribute set by the func use_running,
819             # deletion will result in an AttributeError that we can
820             # safetly pass.
821             try:
822                 del cls.vpp
823             except AttributeError:
824                 pass
825
826         if cls.vpp_startup_failed:
827             stdout_log = cls.logger.info
828             stderr_log = cls.logger.critical
829         else:
830             stdout_log = cls.logger.info
831             stderr_log = cls.logger.info
832
833         if hasattr(cls, "vpp_stdout_deque"):
834             stdout_log(single_line_delim)
835             stdout_log("VPP output to stdout while running %s:", cls.__name__)
836             stdout_log(single_line_delim)
837             vpp_output = "".join(cls.vpp_stdout_deque)
838             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
839                 f.write(vpp_output)
840             stdout_log("\n%s", vpp_output)
841             stdout_log(single_line_delim)
842
843         if hasattr(cls, "vpp_stderr_deque"):
844             stderr_log(single_line_delim)
845             stderr_log("VPP output to stderr while running %s:", cls.__name__)
846             stderr_log(single_line_delim)
847             vpp_output = "".join(cls.vpp_stderr_deque)
848             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
849                 f.write(vpp_output)
850             stderr_log("\n%s", vpp_output)
851             stderr_log(single_line_delim)
852
853     @classmethod
854     def tearDownClass(cls):
855         """Perform final cleanup after running all tests in this test-case"""
856         cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
857         cls.reporter.send_keep_alive(cls, "tearDownClass")
858         cls.quit()
859         cls.file_handler.close()
860         cls.reset_packet_infos()
861         if config.debug_framework:
862             debug_internal.on_tear_down_class(cls)
863
864     def show_commands_at_teardown(self):
865         """Allow subclass specific teardown logging additions."""
866         self.logger.info("--- No test specific show commands provided. ---")
867
868     def tearDown(self):
869         """Show various debug prints after each test"""
870         self.logger.debug(
871             "--- tearDown() for %s.%s(%s) called ---"
872             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
873         )
874
875         try:
876             if not self.vpp_dead:
877                 self.logger.debug(self.vapi.cli("show trace max 1000"))
878                 self.logger.info(self.vapi.ppcli("show interface"))
879                 self.logger.info(self.vapi.ppcli("show hardware"))
880                 self.logger.info(self.statistics.set_errors_str())
881                 self.logger.info(self.vapi.ppcli("show run"))
882                 self.logger.info(self.vapi.ppcli("show log"))
883                 self.logger.info(self.vapi.ppcli("show bihash"))
884                 self.logger.info("Logging testcase specific show commands.")
885                 self.show_commands_at_teardown()
886                 if self.remove_configured_vpp_objects_on_tear_down:
887                     self.registry.remove_vpp_config(self.logger)
888             # Save/Dump VPP api trace log
889             m = self._testMethodName
890             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
891             tmp_api_trace = "/tmp/%s" % api_trace
892             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
893             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
894             self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
895             os.rename(tmp_api_trace, vpp_api_trace_log)
896         except VppTransportSocketIOError:
897             self.logger.debug(
898                 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
899             )
900             self.vpp_dead = True
901         else:
902             self.registry.unregister_all(self.logger)
903
904     def setUp(self):
905         """Clear trace before running each test"""
906         super(VppTestCase, self).setUp()
907         self.reporter.send_keep_alive(self)
908         if self.vpp_dead:
909             raise VppDiedError(
910                 rv=None,
911                 testcase=self.__class__.__name__,
912                 method_name=self._testMethodName,
913             )
914         self.sleep(0.1, "during setUp")
915         self.vpp_stdout_deque.append(
916             "--- test setUp() for %s.%s(%s) starts here ---\n"
917             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
918         )
919         self.vpp_stderr_deque.append(
920             "--- test setUp() for %s.%s(%s) starts here ---\n"
921             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
922         )
923         self.vapi.cli("clear trace")
924         # store the test instance inside the test class - so that objects
925         # holding the class can access instance methods (like assertEqual)
926         type(self).test_instance = self
927
928     @classmethod
929     def pg_enable_capture(cls, interfaces=None):
930         """
931         Enable capture on packet-generator interfaces
932
933         :param interfaces: iterable interface indexes (if None,
934                            use self.pg_interfaces)
935
936         """
937         if interfaces is None:
938             interfaces = cls.pg_interfaces
939         for i in interfaces:
940             i.enable_capture()
941
942     @classmethod
943     def register_pcap(cls, intf, worker):
944         """Register a pcap in the testclass"""
945         # add to the list of captures with current timestamp
946         cls._pcaps.append((intf, worker))
947
948     @classmethod
949     def get_vpp_time(cls):
950         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
951         # returns float("2.190522")
952         timestr = cls.vapi.cli("show clock")
953         head, sep, tail = timestr.partition(",")
954         head, sep, tail = head.partition("Time now")
955         return float(tail)
956
957     @classmethod
958     def sleep_on_vpp_time(cls, sec):
959         """Sleep according to time in VPP world"""
960         # On a busy system with many processes
961         # we might end up with VPP time being slower than real world
962         # So take that into account when waiting for VPP to do something
963         start_time = cls.get_vpp_time()
964         while cls.get_vpp_time() - start_time < sec:
965             cls.sleep(0.1)
966
967     @classmethod
968     def pg_start(cls, trace=True):
969         """Enable the PG, wait till it is done, then clean up"""
970         for (intf, worker) in cls._old_pcaps:
971             intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
972         cls._old_pcaps = []
973         if trace:
974             cls.vapi.cli("clear trace")
975             cls.vapi.cli("trace add pg-input 1000")
976         cls.vapi.cli("packet-generator enable")
977         # PG, when starts, runs to completion -
978         # so let's avoid a race condition,
979         # and wait a little till it's done.
980         # Then clean it up  - and then be gone.
981         deadline = time.time() + 300
982         while cls.vapi.cli("show packet-generator").find("Yes") != -1:
983             cls.sleep(0.01)  # yield
984             if time.time() > deadline:
985                 cls.logger.error("Timeout waiting for pg to stop")
986                 break
987         for intf, worker in cls._pcaps:
988             cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
989         cls._old_pcaps = cls._pcaps
990         cls._pcaps = []
991
992     @classmethod
993     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
994         """
995         Create packet-generator interfaces.
996
997         :param interfaces: iterable indexes of the interfaces.
998         :returns: List of created interfaces.
999
1000         """
1001         result = []
1002         for i in interfaces:
1003             intf = VppPGInterface(cls, i, gso, gso_size, mode)
1004             setattr(cls, intf.name, intf)
1005             result.append(intf)
1006         cls.pg_interfaces = result
1007         return result
1008
1009     @classmethod
1010     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1011         pgmode = VppEnum.vl_api_pg_interface_mode_t
1012         return cls.create_pg_interfaces_internal(
1013             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1014         )
1015
1016     @classmethod
1017     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1018         pgmode = VppEnum.vl_api_pg_interface_mode_t
1019         return cls.create_pg_interfaces_internal(
1020             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1021         )
1022
1023     @classmethod
1024     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1025         pgmode = VppEnum.vl_api_pg_interface_mode_t
1026         return cls.create_pg_interfaces_internal(
1027             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1028         )
1029
1030     @classmethod
1031     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1032         pgmode = VppEnum.vl_api_pg_interface_mode_t
1033         return cls.create_pg_interfaces_internal(
1034             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1035         )
1036
1037     @classmethod
1038     def create_loopback_interfaces(cls, count):
1039         """
1040         Create loopback interfaces.
1041
1042         :param count: number of interfaces created.
1043         :returns: List of created interfaces.
1044         """
1045         result = [VppLoInterface(cls) for i in range(count)]
1046         for intf in result:
1047             setattr(cls, intf.name, intf)
1048         cls.lo_interfaces = result
1049         return result
1050
1051     @classmethod
1052     def create_bvi_interfaces(cls, count):
1053         """
1054         Create BVI interfaces.
1055
1056         :param count: number of interfaces created.
1057         :returns: List of created interfaces.
1058         """
1059         result = [VppBviInterface(cls) for i in range(count)]
1060         for intf in result:
1061             setattr(cls, intf.name, intf)
1062         cls.bvi_interfaces = result
1063         return result
1064
1065     @staticmethod
1066     def extend_packet(packet, size, padding=" "):
1067         """
1068         Extend packet to given size by padding with spaces or custom padding
1069         NOTE: Currently works only when Raw layer is present.
1070
1071         :param packet: packet
1072         :param size: target size
1073         :param padding: padding used to extend the payload
1074
1075         """
1076         packet_len = len(packet) + 4
1077         extend = size - packet_len
1078         if extend > 0:
1079             num = (extend // len(padding)) + 1
1080             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1081
1082     @classmethod
1083     def reset_packet_infos(cls):
1084         """Reset the list of packet info objects and packet counts to zero"""
1085         cls._packet_infos = {}
1086         cls._packet_count_for_dst_if_idx = {}
1087
1088     @classmethod
1089     def create_packet_info(cls, src_if, dst_if):
1090         """
1091         Create packet info object containing the source and destination indexes
1092         and add it to the testcase's packet info list
1093
1094         :param VppInterface src_if: source interface
1095         :param VppInterface dst_if: destination interface
1096
1097         :returns: _PacketInfo object
1098
1099         """
1100         info = _PacketInfo()
1101         info.index = len(cls._packet_infos)
1102         info.src = src_if.sw_if_index
1103         info.dst = dst_if.sw_if_index
1104         if isinstance(dst_if, VppSubInterface):
1105             dst_idx = dst_if.parent.sw_if_index
1106         else:
1107             dst_idx = dst_if.sw_if_index
1108         if dst_idx in cls._packet_count_for_dst_if_idx:
1109             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1110         else:
1111             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1112         cls._packet_infos[info.index] = info
1113         return info
1114
1115     @staticmethod
1116     def info_to_payload(info):
1117         """
1118         Convert _PacketInfo object to packet payload
1119
1120         :param info: _PacketInfo object
1121
1122         :returns: string containing serialized data from packet info
1123         """
1124
1125         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1126         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1127
1128     @staticmethod
1129     def payload_to_info(payload, payload_field="load"):
1130         """
1131         Convert packet payload to _PacketInfo object
1132
1133         :param payload: packet payload
1134         :type payload:  <class 'scapy.packet.Raw'>
1135         :param payload_field: packet fieldname of payload "load" for
1136                 <class 'scapy.packet.Raw'>
1137         :type payload_field: str
1138         :returns: _PacketInfo object containing de-serialized data from payload
1139
1140         """
1141
1142         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1143         payload_b = getattr(payload, payload_field)[:18]
1144
1145         info = _PacketInfo()
1146         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1147
1148         # some SRv6 TCs depend on get an exception if bad values are detected
1149         if info.index > 0x4000:
1150             raise ValueError("Index value is invalid")
1151
1152         return info
1153
1154     def get_next_packet_info(self, info):
1155         """
1156         Iterate over the packet info list stored in the testcase
1157         Start iteration with first element if info is None
1158         Continue based on index in info if info is specified
1159
1160         :param info: info or None
1161         :returns: next info in list or None if no more infos
1162         """
1163         if info is None:
1164             next_index = 0
1165         else:
1166             next_index = info.index + 1
1167         if next_index == len(self._packet_infos):
1168             return None
1169         else:
1170             return self._packet_infos[next_index]
1171
1172     def get_next_packet_info_for_interface(self, src_index, info):
1173         """
1174         Search the packet info list for the next packet info with same source
1175         interface index
1176
1177         :param src_index: source interface index to search for
1178         :param info: packet info - where to start the search
1179         :returns: packet info or None
1180
1181         """
1182         while True:
1183             info = self.get_next_packet_info(info)
1184             if info is None:
1185                 return None
1186             if info.src == src_index:
1187                 return info
1188
1189     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1190         """
1191         Search the packet info list for the next packet info with same source
1192         and destination interface indexes
1193
1194         :param src_index: source interface index to search for
1195         :param dst_index: destination interface index to search for
1196         :param info: packet info - where to start the search
1197         :returns: packet info or None
1198
1199         """
1200         while True:
1201             info = self.get_next_packet_info_for_interface(src_index, info)
1202             if info is None:
1203                 return None
1204             if info.dst == dst_index:
1205                 return info
1206
1207     def assert_equal(self, real_value, expected_value, name_or_class=None):
1208         if name_or_class is None:
1209             self.assertEqual(real_value, expected_value)
1210             return
1211         try:
1212             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1213             msg = msg % (
1214                 getdoc(name_or_class).strip(),
1215                 real_value,
1216                 str(name_or_class(real_value)),
1217                 expected_value,
1218                 str(name_or_class(expected_value)),
1219             )
1220         except Exception:
1221             msg = "Invalid %s: %s does not match expected value %s" % (
1222                 name_or_class,
1223                 real_value,
1224                 expected_value,
1225             )
1226
1227         self.assertEqual(real_value, expected_value, msg)
1228
1229     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1230         if name is None:
1231             msg = None
1232         else:
1233             msg = "Invalid %s: %s out of range <%s,%s>" % (
1234                 name,
1235                 real_value,
1236                 expected_min,
1237                 expected_max,
1238             )
1239         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1240
    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
        """Check every checksum field found in any layer of the packet.

        The packet is re-parsed from its raw bytes, every `cksum`/`chksum`
        field found on a layer is deleted from a scratch copy, the scratch
        copy is rebuilt from raw bytes and the rebuilt field values are
        compared against the received ones.

        :param packet: packet to verify
        :param ignore_zero_udp_checksums: when true, a checksum equal to 0
            on a layer named "UDP" or "UDPerror" is not verified
        """
        received = packet.__class__(scapy.compat.raw(packet))
        udp_layers = ["UDP", "UDPerror"]
        checksum_fields = ["cksum", "chksum"]
        # (layer number, field name) pairs that need to be verified
        checksums = []
        counter = 0
        # scratch copy whose checksum fields get deleted below
        temp = received.__class__(scapy.compat.raw(received))
        while True:
            layer = temp.getlayer(counter)
            if layer:
                # inspect the layer on its own, detached from its payload
                layer = layer.copy()
                layer.remove_payload()
                for cf in checksum_fields:
                    if hasattr(layer, cf):
                        if (
                            ignore_zero_udp_checksums
                            and 0 == getattr(layer, cf)
                            and layer.name in udp_layers
                        ):
                            continue
                        # delete the field on the scratch packet itself (not
                        # on the detached copy) so that the rebuild below
                        # yields a freshly computed value for it
                        delattr(temp.getlayer(counter), cf)
                        checksums.append((counter, cf))
            else:
                # no layer at this position - all layers were visited
                break
            counter = counter + 1
        if 0 == len(checksums):
            return
        # rebuild the scratch packet from its raw bytes
        temp = temp.__class__(scapy.compat.raw(temp))
        for layer, cf in checksums:
            calc_sum = getattr(temp[layer], cf)
            self.assert_equal(
                getattr(received[layer], cf),
                calc_sum,
                "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
            )
            self.logger.debug(
                "Checksum field `%s` on `%s` layer has correct value `%s`"
                % (cf, temp[layer].name, calc_sum)
            )
1280
1281     def assert_checksum_valid(
1282         self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1283     ):
1284         """Check checksum of received packet on given layer"""
1285         received_packet_checksum = getattr(received_packet[layer], field_name)
1286         if ignore_zero_checksum and 0 == received_packet_checksum:
1287             return
1288         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1289         delattr(recalculated[layer], field_name)
1290         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1291         self.assert_equal(
1292             received_packet_checksum,
1293             getattr(recalculated[layer], field_name),
1294             "packet checksum on layer: %s" % layer,
1295         )
1296
1297     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1298         self.assert_checksum_valid(
1299             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1300         )
1301
1302     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1303         self.assert_checksum_valid(
1304             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1305         )
1306
1307     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1308         self.assert_checksum_valid(
1309             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1310         )
1311
1312     def assert_embedded_icmp_checksum_valid(self, received_packet):
1313         if received_packet.haslayer(IPerror):
1314             self.assert_checksum_valid(received_packet, "IPerror")
1315         if received_packet.haslayer(TCPerror):
1316             self.assert_checksum_valid(received_packet, "TCPerror")
1317         if received_packet.haslayer(UDPerror):
1318             self.assert_checksum_valid(
1319                 received_packet, "UDPerror", ignore_zero_checksum=True
1320             )
1321         if received_packet.haslayer(ICMPerror):
1322             self.assert_checksum_valid(received_packet, "ICMPerror")
1323
1324     def assert_icmp_checksum_valid(self, received_packet):
1325         self.assert_checksum_valid(received_packet, "ICMP")
1326         self.assert_embedded_icmp_checksum_valid(received_packet)
1327
1328     def assert_icmpv6_checksum_valid(self, pkt):
1329         if pkt.haslayer(ICMPv6DestUnreach):
1330             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1331             self.assert_embedded_icmp_checksum_valid(pkt)
1332         if pkt.haslayer(ICMPv6EchoRequest):
1333             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1334         if pkt.haslayer(ICMPv6EchoReply):
1335             self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1336
1337     def get_counter(self, counter):
1338         if counter.startswith("/"):
1339             counter_value = self.statistics.get_counter(counter)
1340         else:
1341             counters = self.vapi.cli("sh errors").split("\n")
1342             counter_value = 0
1343             for i in range(1, len(counters) - 1):
1344                 results = counters[i].split()
1345                 if results[1] == counter:
1346                     counter_value = int(results[0])
1347                     break
1348         return counter_value
1349
1350     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1351         c = self.get_counter(counter)
1352         if thread is not None:
1353             c = c[thread][index]
1354         else:
1355             c = sum(x[index] for x in c)
1356         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1357
1358     def assert_packet_counter_equal(self, counter, expected_value):
1359         counter_value = self.get_counter(counter)
1360         self.assert_equal(
1361             counter_value, expected_value, "packet counter `%s'" % counter
1362         )
1363
1364     def assert_error_counter_equal(self, counter, expected_value):
1365         counter_value = self.statistics[counter].sum()
1366         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1367
1368     @classmethod
1369     def sleep(cls, timeout, remark=None):
1370
1371         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1372         #  * by Guido, only the main thread can be interrupted.
1373         # */
1374         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1375         if timeout == 0:
1376             # yield quantum
1377             if hasattr(os, "sched_yield"):
1378                 os.sched_yield()
1379             else:
1380                 time.sleep(0)
1381             return
1382
1383         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1384         before = time.time()
1385         time.sleep(timeout)
1386         after = time.time()
1387         if after - before > 2 * timeout:
1388             cls.logger.error(
1389                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1390                 after - before,
1391                 timeout,
1392             )
1393
1394         cls.logger.debug(
1395             "Finished sleep (%s) - slept %es (wanted %es)",
1396             remark,
1397             after - before,
1398             timeout,
1399         )
1400
1401     def virtual_sleep(self, timeout, remark=None):
1402         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1403         self.vapi.cli("set clock adjust %s" % timeout)
1404
1405     def pg_send(self, intf, pkts, worker=None, trace=True):
1406         intf.add_stream(pkts, worker=worker)
1407         self.pg_enable_capture(self.pg_interfaces)
1408         self.pg_start(trace=trace)
1409
1410     def snapshot_stats(self, stats_diff):
1411         """Return snapshot of interesting stats based on diff dictionary."""
1412         stats_snapshot = {}
1413         for sw_if_index in stats_diff:
1414             for counter in stats_diff[sw_if_index]:
1415                 stats_snapshot[counter] = self.statistics[counter]
1416         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1417         return stats_snapshot
1418
1419     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1420         """Assert appropriate difference between current stats and snapshot."""
1421         for sw_if_index in stats_diff:
1422             for cntr, diff in stats_diff[sw_if_index].items():
1423                 if sw_if_index == "err":
1424                     self.assert_equal(
1425                         self.statistics[cntr].sum(),
1426                         stats_snapshot[cntr].sum() + diff,
1427                         f"'{cntr}' counter value (previous value: "
1428                         f"{stats_snapshot[cntr].sum()}, "
1429                         f"expected diff: {diff})",
1430                     )
1431                 else:
1432                     try:
1433                         self.assert_equal(
1434                             self.statistics[cntr][:, sw_if_index].sum(),
1435                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1436                             f"'{cntr}' counter value (previous value: "
1437                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1438                             f"expected diff: {diff})",
1439                         )
1440                     except IndexError:
1441                         # if diff is 0, then this most probably a case where
1442                         # test declares multiple interfaces but traffic hasn't
1443                         # passed through this one yet - which means the counter
1444                         # value is 0 and can be ignored
1445                         if 0 != diff:
1446                             raise
1447
1448     def send_and_assert_no_replies(
1449         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1450     ):
1451         if stats_diff:
1452             stats_snapshot = self.snapshot_stats(stats_diff)
1453
1454         self.pg_send(intf, pkts)
1455
1456         try:
1457             if not timeout:
1458                 timeout = 1
1459             for i in self.pg_interfaces:
1460                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1461                 timeout = 0.1
1462         finally:
1463             if trace:
1464                 if msg:
1465                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1466                 self.logger.debug(self.vapi.cli("show trace"))
1467
1468         if stats_diff:
1469             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1470
1471     def send_and_expect(
1472         self,
1473         intf,
1474         pkts,
1475         output,
1476         n_rx=None,
1477         worker=None,
1478         trace=True,
1479         msg=None,
1480         stats_diff=None,
1481     ):
1482         if stats_diff:
1483             stats_snapshot = self.snapshot_stats(stats_diff)
1484
1485         if not n_rx:
1486             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1487         self.pg_send(intf, pkts, worker=worker, trace=trace)
1488         rx = output.get_capture(n_rx)
1489         if trace:
1490             if msg:
1491                 self.logger.debug(f"send_and_expect: {msg}")
1492             self.logger.debug(self.vapi.cli("show trace"))
1493
1494         if stats_diff:
1495             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1496
1497         return rx
1498
1499     def send_and_expect_load_balancing(
1500         self, input, pkts, outputs, worker=None, trace=True
1501     ):
1502         self.pg_send(input, pkts, worker=worker, trace=trace)
1503         rxs = []
1504         for oo in outputs:
1505             rx = oo._get_capture(1)
1506             self.assertNotEqual(0, len(rx))
1507             rxs.append(rx)
1508         if trace:
1509             self.logger.debug(self.vapi.cli("show trace"))
1510         return rxs
1511
1512     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1513         self.pg_send(intf, pkts, worker=worker, trace=trace)
1514         rx = output._get_capture(1)
1515         if trace:
1516             self.logger.debug(self.vapi.cli("show trace"))
1517         self.assertTrue(len(rx) > 0)
1518         self.assertTrue(len(rx) < len(pkts))
1519         return rx
1520
1521     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1522         if stats_diff:
1523             stats_snapshot = self.snapshot_stats(stats_diff)
1524
1525         self.pg_send(intf, pkts)
1526         rx = output.get_capture(len(pkts))
1527         outputs = [output]
1528         if not timeout:
1529             timeout = 1
1530         for i in self.pg_interfaces:
1531             if i not in outputs:
1532                 i.assert_nothing_captured(timeout=timeout)
1533                 timeout = 0.1
1534
1535         if stats_diff:
1536             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1537
1538         return rx
1539
1540
def get_testcase_doc_name(test):
    """Return the first docstring line of the test's class.

    Falls back to the class name when the class has no docstring or an
    empty one, instead of failing with AttributeError / IndexError.

    :param test: test case instance
    :returns: first docstring line, or the class name as a fallback
    """
    test_class = test.__class__
    doc_lines = (getdoc(test_class) or "").splitlines()
    if not doc_lines:
        return test_class.__name__
    return doc_lines[0]
1543
1544
def get_test_description(descriptions, test):
    """Return the text used to describe a test.

    :param descriptions: whether short descriptions should be used
    :param test: test case instance
    :returns: test.shortDescription() when descriptions are enabled and
        one is available, otherwise str(test)
    """
    summary = test.shortDescription()
    return summary if descriptions and summary else str(test)
1551
1552
class TestCaseInfo(object):
    """Plain record of per-test-case data: logger, temporary directory,
    VPP pid and VPP binary path, plus a `core_crash_test` slot."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        # not known at construction time; starts out unset
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1559         self.core_crash_test = None
1560
1561
1562 class VppTestResult(unittest.TestResult):
1563     """
1564     @property result_string
1565      String variable to store the test case result string.
1566     @property errors
1567      List variable containing 2-tuples of TestCase instances and strings
1568      holding formatted tracebacks. Each tuple represents a test which
1569      raised an unexpected exception.
1570     @property failures
1571      List variable containing 2-tuples of TestCase instances and strings
1572      holding formatted tracebacks. Each tuple represents a test where
1573      a failure was explicitly signalled using the TestCase.assert*()
1574      methods.
1575     """
1576
1577     failed_test_cases_info = set()
1578     core_crash_test_cases_info = set()
1579     current_test_case_info = None
1580
1581     def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1582         """
1583         :param stream File descriptor to store where to report test results.
1584             Set to the standard error stream by default.
1585         :param descriptions Boolean variable to store information if to use
1586             test case descriptions.
1587         :param verbosity Integer variable to store required verbosity level.
1588         """
1589         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1590         self.stream = stream
1591         self.descriptions = descriptions
1592         self.verbosity = verbosity
1593         self.result_string = None
1594         self.runner = runner
1595         self.printed = []
1596
1597     def addSuccess(self, test):
1598         """
1599         Record a test succeeded result
1600
1601         :param test:
1602
1603         """
1604         if self.current_test_case_info:
1605             self.current_test_case_info.logger.debug(
1606                 "--- addSuccess() %s.%s(%s) called"
1607                 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1608             )
1609         unittest.TestResult.addSuccess(self, test)
1610         self.result_string = colorize("OK", GREEN)
1611
1612         self.send_result_through_pipe(test, PASS)
1613
1614     def addSkip(self, test, reason):
1615         """
1616         Record a test skipped.
1617
1618         :param test:
1619         :param reason:
1620
1621         """
1622         if self.current_test_case_info:
1623             self.current_test_case_info.logger.debug(
1624                 "--- addSkip() %s.%s(%s) called, reason is %s"
1625                 % (
1626                     test.__class__.__name__,
1627                     test._testMethodName,
1628                     test._testMethodDoc,
1629                     reason,
1630                 )
1631             )
1632         unittest.TestResult.addSkip(self, test, reason)
1633         self.result_string = colorize("SKIP", YELLOW)
1634
1635         if reason == "not enough cpus":
1636             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1637         else:
1638             self.send_result_through_pipe(test, SKIP)
1639
1640     def symlink_failed(self):
1641         if self.current_test_case_info:
1642             try:
1643                 failed_dir = config.failed_dir
1644                 link_path = os.path.join(
1645                     failed_dir,
1646                     "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1647                 )
1648
1649                 self.current_test_case_info.logger.debug(
1650                     "creating a link to the failed test"
1651                 )
1652                 self.current_test_case_info.logger.debug(
1653                     "os.symlink(%s, %s)"
1654                     % (self.current_test_case_info.tempdir, link_path)
1655                 )
1656                 if os.path.exists(link_path):
1657                     self.current_test_case_info.logger.debug("symlink already exists")
1658                 else:
1659                     os.symlink(self.current_test_case_info.tempdir, link_path)
1660
1661             except Exception as e:
1662                 self.current_test_case_info.logger.error(e)
1663
1664     def send_result_through_pipe(self, test, result):
1665         if hasattr(self, "test_framework_result_pipe"):
1666             pipe = self.test_framework_result_pipe
1667             if pipe:
1668                 pipe.send((test.id(), result))
1669
1670     def log_error(self, test, err, fn_name):
1671         if self.current_test_case_info:
1672             if isinstance(test, unittest.suite._ErrorHolder):
1673                 test_name = test.description
1674             else:
1675                 test_name = "%s.%s(%s)" % (
1676                     test.__class__.__name__,
1677                     test._testMethodName,
1678                     test._testMethodDoc,
1679                 )
1680             self.current_test_case_info.logger.debug(
1681                 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1682             )
1683             self.current_test_case_info.logger.debug(
1684                 "formatted exception is:\n%s" % "".join(format_exception(*err))
1685             )
1686
1687     def add_error(self, test, err, unittest_fn, error_type):
1688         if error_type == FAIL:
1689             self.log_error(test, err, "addFailure")
1690             error_type_str = colorize("FAIL", RED)
1691         elif error_type == ERROR:
1692             self.log_error(test, err, "addError")
1693             error_type_str = colorize("ERROR", RED)
1694         else:
1695             raise Exception(
1696                 "Error type %s cannot be used to record an "
1697                 "error or a failure" % error_type
1698             )
1699
1700         unittest_fn(self, test, err)
1701         if self.current_test_case_info:
1702             self.result_string = "%s [ temp dir used by test case: %s ]" % (
1703                 error_type_str,
1704                 self.current_test_case_info.tempdir,
1705             )
1706             self.symlink_failed()
1707             self.failed_test_cases_info.add(self.current_test_case_info)
1708             if is_core_present(self.current_test_case_info.tempdir):
1709                 if not self.current_test_case_info.core_crash_test:
1710                     if isinstance(test, unittest.suite._ErrorHolder):
1711                         test_name = str(test)
1712                     else:
1713                         test_name = "'{!s}' ({!s})".format(
1714                             get_testcase_doc_name(test), test.id()
1715                         )
1716                     self.current_test_case_info.core_crash_test = test_name
1717                 self.core_crash_test_cases_info.add(self.current_test_case_info)
1718         else:
1719             self.result_string = "%s [no temp dir]" % error_type_str
1720
1721         self.send_result_through_pipe(test, error_type)
1722
1723     def addFailure(self, test, err):
1724         """
1725         Record a test failed result
1726
1727         :param test:
1728         :param err: error message
1729
1730         """
1731         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1732
1733     def addError(self, test, err):
1734         """
1735         Record a test error result
1736
1737         :param test:
1738         :param err: error message
1739
1740         """
1741         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1742
1743     def getDescription(self, test):
1744         """
1745         Get test description
1746
1747         :param test:
1748         :returns: test description
1749
1750         """
1751         return get_test_description(self.descriptions, test)
1752
1753     def startTest(self, test):
1754         """
1755         Start a test
1756
1757         :param test:
1758
1759         """
1760
1761         def print_header(test):
1762             if test.__class__ in self.printed:
1763                 return
1764
1765             test_doc = getdoc(test)
1766             if not test_doc:
1767                 raise Exception("No doc string for test '%s'" % test.id())
1768
1769             test_title = test_doc.splitlines()[0].rstrip()
1770             test_title = colorize(test_title, GREEN)
1771             if test.is_tagged_run_solo():
1772                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1773
1774             # This block may overwrite the colorized title above,
1775             # but we want this to stand out and be fixed
1776             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1777                 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1778
1779             if test.has_tag(TestCaseTag.FIXME_ASAN):
1780                 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1781                 test.skip_fixme_asan()
1782
1783             if is_distro_ubuntu2204 == True and test.has_tag(
1784                 TestCaseTag.FIXME_UBUNTU2204
1785             ):
1786                 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1787                 test.skip_fixme_ubuntu2204()
1788
1789             if hasattr(test, "vpp_worker_count"):
1790                 if test.vpp_worker_count == 0:
1791                     test_title += " [main thread only]"
1792                 elif test.vpp_worker_count == 1:
1793                     test_title += " [1 worker thread]"
1794                 else:
1795                     test_title += f" [{test.vpp_worker_count} worker threads]"
1796
1797             if test.__class__.skipped_due_to_cpu_lack:
1798                 test_title = colorize(
1799                     f"{test_title} [skipped - not enough cpus, "
1800                     f"required={test.__class__.get_cpus_required()}, "
1801                     f"available={max_vpp_cpus}]",
1802                     YELLOW,
1803                 )
1804
1805             print(double_line_delim)
1806             print(test_title)
1807             print(double_line_delim)
1808             self.printed.append(test.__class__)
1809
1810         print_header(test)
1811         self.start_test = time.time()
1812         unittest.TestResult.startTest(self, test)
1813         if self.verbosity > 0:
1814             self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1815             self.stream.writeln(single_line_delim)
1816
1817     def stopTest(self, test):
1818         """
1819         Called when the given test has been run
1820
1821         :param test:
1822
1823         """
1824         unittest.TestResult.stopTest(self, test)
1825
1826         if self.verbosity > 0:
1827             self.stream.writeln(single_line_delim)
1828             self.stream.writeln(
1829                 "%-73s%s" % (self.getDescription(test), self.result_string)
1830             )
1831             self.stream.writeln(single_line_delim)
1832         else:
1833             self.stream.writeln(
1834                 "%-68s %4.2f %s"
1835                 % (
1836                     self.getDescription(test),
1837                     time.time() - self.start_test,
1838                     self.result_string,
1839                 )
1840             )
1841
1842         self.send_result_through_pipe(test, TEST_RUN)
1843
1844     def printErrors(self):
1845         """
1846         Print errors from running the test case
1847         """
1848         if len(self.errors) > 0 or len(self.failures) > 0:
1849             self.stream.writeln()
1850             self.printErrorList("ERROR", self.errors)
1851             self.printErrorList("FAIL", self.failures)
1852
1853         # ^^ that is the last output from unittest before summary
1854         if not self.runner.print_summary:
1855             devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1856             self.stream = devnull
1857             self.runner.stream = devnull
1858
1859     def printErrorList(self, flavour, errors):
1860         """
1861         Print error list to the output stream together with error type
1862         and test case description.
1863
1864         :param flavour: error type
1865         :param errors: iterable errors
1866
1867         """
1868         for test, err in errors:
1869             self.stream.writeln(double_line_delim)
1870             self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1871             self.stream.writeln(single_line_delim)
1872             self.stream.writeln("%s" % err)
1873
1874
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout in __init__).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            ``test_framework_result_pipe``; results are sent through it
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when false, output following the error lists
            is discarded for the duration of run()
        :param kwargs: forwarded to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
        )
        KeepAliveReporter.pipe = keep_alive_pipe

        # Kept so run() can restore the stream after it has been silenced.
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # The result's printErrors() may have swapped in a devnull
            # writer; restore the original stream and close the replacement
            # so its file handle is not leaked.
            silenced = self.stream
            self.stream = self.orig_stream
            result.stream = self.orig_stream
            if silenced is not self.orig_stream:
                silenced.close()
        return result
1926
1927
class Worker(Thread):
    """
    Thread that runs an external executable, waits for it to finish, logs
    its stdout/stderr and stores its return code in ``self.result``.

    Subclasses may set ``testcase``, ``role`` and ``wait_for_gdb`` attributes
    before calling ``__init__``; they are looked up with hasattr() and
    enable the gdb/gdbserver support and the role suffix of the app name.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: list with the executable path followed by
            its arguments
        :param logger: logger used to report the run
        :param env: optional dict of extra environment variables; it is
            deep-copied
        :param args: positional arguments forwarded to Thread
        :param kwargs: keyword arguments forwarded to Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Prefix the command line with gdbserver.  The executable's
                # own argument list is appended here; `args` (the Thread
                # positional arguments tuple) cannot be concatenated to a
                # list and is not the command line.
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + list(executable_args)
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                # Appends in place, i.e. to the list passed by the caller.
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When the test case runs in gdb or gdbserver debug mode, print how to
        attach a debugger to the spawned process and block until ENTER is
        pressed.  Returns immediately when there is no ``testcase`` attribute
        or no debug mode is active.
        """
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            # Bump the port stored on the test case after announcing it.
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable and wait for it to finish.

        The command is started through ``stdbuf -o0 -e0`` in its own process
        group, with os.environ updated by ``self.env`` and
        ``CK_LOG_FILE_NAME`` set to "-".  Its stdout and stderr are captured
        and logged, and ``self.result`` is set to the return code.

        :raises EnvironmentError: if the executable does not exist or is not
            executable; ``self.result`` is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(out.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2046
2047
# Nothing is done when this module is executed directly.
if __name__ == "__main__":
    pass