tests: add a generalised counter assert function
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import gc
5 import logging
6 import sys
7 import os
8 import select
9 import signal
10 import subprocess
11 import unittest
12 import re
13 import time
14 import faulthandler
15 import random
16 import copy
17 import platform
18 import shutil
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
24 from enum import Enum
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
27
28 import scapy.compat
29 from scapy.packet import Raw, Packet
30 from config import config, available_cpus, num_cpus, max_vpp_cpus
31 import hook as hookmodule
32 from vpp_pg_interface import VppPGInterface
33 from vpp_sub_interface import VppSubInterface
34 from vpp_lo_interface import VppLoInterface
35 from vpp_bvi_interface import VppBviInterface
36 from vpp_papi_provider import VppPapiProvider
37 from vpp_papi import VppEnum
38 import vpp_papi
39 from vpp_papi.vpp_stats import VPPStats
40 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
41 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42     get_logger, colorize
43 from vpp_object import VppObjectRegistry
44 from util import ppp, is_core_present
45 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
46 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
47 from scapy.layers.inet6 import ICMPv6EchoReply
48
49
50 logger = logging.getLogger(__name__)
51
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
55
56 PASS = 0
57 FAIL = 1
58 ERROR = 2
59 SKIP = 3
60 TEST_RUN = 4
61 SKIP_CPU_SHORTAGE = 5
62
63
64 if config.debug_framework:
65     import debug_internal
66
67 """
68   Test framework module.
69
70   The module provides a set of tools for constructing and running tests and
71   representing the results.
72 """
73
74
75 class VppDiedError(Exception):
76     """ exception for reporting that the subprocess has died."""
77
78     signals_by_value = {v: k for k, v in signal.__dict__.items() if
79                         k.startswith('SIG') and not k.startswith('SIG_')}
80
81     def __init__(self, rv=None, testcase=None, method_name=None):
82         self.rv = rv
83         self.signal_name = None
84         self.testcase = testcase
85         self.method_name = method_name
86
87         try:
88             self.signal_name = VppDiedError.signals_by_value[-rv]
89         except (KeyError, TypeError):
90             pass
91
92         if testcase is None and method_name is None:
93             in_msg = ''
94         else:
95             in_msg = ' while running %s.%s' % (testcase, method_name)
96
97         if self.rv:
98             msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
99                 % (in_msg, self.rv, ' [%s]' %
100                    (self.signal_name if
101                     self.signal_name is not None else ''))
102         else:
103             msg = "VPP subprocess died unexpectedly%s." % in_msg
104
105         super(VppDiedError, self).__init__(msg)
106
107
108 class _PacketInfo(object):
109     """Private class to create packet info object.
110
111     Help process information about the next packet.
112     Set variables to default values.
113     """
114     #: Store the index of the packet.
115     index = -1
116     #: Store the index of the source packet generator interface of the packet.
117     src = -1
118     #: Store the index of the destination packet generator interface
119     #: of the packet.
120     dst = -1
121     #: Store expected ip version
122     ip = -1
123     #: Store expected upper protocol
124     proto = -1
125     #: Store the copy of the former packet.
126     data = None
127
128     def __eq__(self, other):
129         index = self.index == other.index
130         src = self.src == other.src
131         dst = self.dst == other.dst
132         data = self.data == other.data
133         return index and src and dst and data
134
135
136 def pump_output(testclass):
137     """ pump output from vpp stdout/stderr to proper queues """
138     stdout_fragment = ""
139     stderr_fragment = ""
140     while not testclass.pump_thread_stop_flag.is_set():
141         readable = select.select([testclass.vpp.stdout.fileno(),
142                                   testclass.vpp.stderr.fileno(),
143                                   testclass.pump_thread_wakeup_pipe[0]],
144                                  [], [])[0]
145         if testclass.vpp.stdout.fileno() in readable:
146             read = os.read(testclass.vpp.stdout.fileno(), 102400)
147             if len(read) > 0:
148                 split = read.decode('ascii',
149                                     errors='backslashreplace').splitlines(True)
150                 if len(stdout_fragment) > 0:
151                     split[0] = "%s%s" % (stdout_fragment, split[0])
152                 if len(split) > 0 and split[-1].endswith("\n"):
153                     limit = None
154                 else:
155                     limit = -1
156                     stdout_fragment = split[-1]
157                 testclass.vpp_stdout_deque.extend(split[:limit])
158                 if not config.cache_vpp_output:
159                     for line in split[:limit]:
160                         testclass.logger.info(
161                             "VPP STDOUT: %s" % line.rstrip("\n"))
162         if testclass.vpp.stderr.fileno() in readable:
163             read = os.read(testclass.vpp.stderr.fileno(), 102400)
164             if len(read) > 0:
165                 split = read.decode('ascii',
166                                     errors='backslashreplace').splitlines(True)
167                 if len(stderr_fragment) > 0:
168                     split[0] = "%s%s" % (stderr_fragment, split[0])
169                 if len(split) > 0 and split[-1].endswith("\n"):
170                     limit = None
171                 else:
172                     limit = -1
173                     stderr_fragment = split[-1]
174
175                 testclass.vpp_stderr_deque.extend(split[:limit])
176                 if not config.cache_vpp_output:
177                     for line in split[:limit]:
178                         testclass.logger.error(
179                             "VPP STDERR: %s" % line.rstrip("\n"))
180                         # ignoring the dummy pipe here intentionally - the
181                         # flag will take care of properly terminating the loop
182
183
184 def _is_platform_aarch64():
185     return platform.machine() == 'aarch64'
186
187
188 is_platform_aarch64 = _is_platform_aarch64()
189
190
191 class KeepAliveReporter(object):
192     """
193     Singleton object which reports test start to parent process
194     """
195     _shared_state = {}
196
197     def __init__(self):
198         self.__dict__ = self._shared_state
199         self._pipe = None
200
201     @property
202     def pipe(self):
203         return self._pipe
204
205     @pipe.setter
206     def pipe(self, pipe):
207         if self._pipe is not None:
208             raise Exception("Internal error - pipe should only be set once.")
209         self._pipe = pipe
210
211     def send_keep_alive(self, test, desc=None):
212         """
213         Write current test tmpdir & desc to keep-alive pipe to signal liveness
214         """
215         if self.pipe is None:
216             # if not running forked..
217             return
218
219         if isclass(test):
220             desc = '%s (%s)' % (desc, unittest.util.strclass(test))
221         else:
222             desc = test.id()
223
224         self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
225
226
227 class TestCaseTag(Enum):
228     # marks the suites that must run at the end
229     # using only a single test runner
230     RUN_SOLO = 1
231     # marks the suites broken on VPP multi-worker
232     FIXME_VPP_WORKERS = 2
233     # marks the suites broken when ASan is enabled
234     FIXME_ASAN = 3
235
236
237 def create_tag_decorator(e):
238     def decorator(cls):
239         try:
240             cls.test_tags.append(e)
241         except AttributeError:
242             cls.test_tags = [e]
243         return cls
244     return decorator
245
246
247 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
248 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
249 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
250
251
252 class DummyVpp:
253     returncode = None
254     pid = 0xcafebafe
255
256     def poll(self):
257         pass
258
259     def terminate(self):
260         pass
261
262
263 class CPUInterface(ABC):
264     cpus = []
265     skipped_due_to_cpu_lack = False
266
267     @classmethod
268     @abstractmethod
269     def get_cpus_required(cls):
270         pass
271
272     @classmethod
273     def assign_cpus(cls, cpus):
274         cls.cpus = cpus
275
276
277 class VppTestCase(CPUInterface, unittest.TestCase):
278     """This subclass is a base class for VPP test cases that are implemented as
279     classes. It provides methods to create and run test case.
280     """
281
282     extra_vpp_statseg_config = ""
283     extra_vpp_punt_config = []
284     extra_vpp_plugin_config = []
285     logger = null_logger
286     vapi_response_timeout = 5
287     remove_configured_vpp_objects_on_tear_down = True
288
289     @property
290     def packet_infos(self):
291         """List of packet infos"""
292         return self._packet_infos
293
294     @classmethod
295     def get_packet_count_for_if_idx(cls, dst_if_index):
296         """Get the number of packet info for specified destination if index"""
297         if dst_if_index in cls._packet_count_for_dst_if_idx:
298             return cls._packet_count_for_dst_if_idx[dst_if_index]
299         else:
300             return 0
301
302     @classmethod
303     def has_tag(cls, tag):
304         """ if the test case has a given tag - return true """
305         try:
306             return tag in cls.test_tags
307         except AttributeError:
308             pass
309         return False
310
311     @classmethod
312     def is_tagged_run_solo(cls):
313         """ if the test case class is timing-sensitive - return true """
314         return cls.has_tag(TestCaseTag.RUN_SOLO)
315
316     @classmethod
317     def skip_fixme_asan(cls):
318         """ if @tag_fixme_asan & ASan is enabled - mark for skip """
319         if cls.has_tag(TestCaseTag.FIXME_ASAN):
320             vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
321             if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
322                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
323
324     @classmethod
325     def instance(cls):
326         """Return the instance of this testcase"""
327         return cls.test_instance
328
329     @classmethod
330     def set_debug_flags(cls, d):
331         cls.gdbserver_port = 7777
332         cls.debug_core = False
333         cls.debug_gdb = False
334         cls.debug_gdbserver = False
335         cls.debug_all = False
336         cls.debug_attach = False
337         if d is None:
338             return
339         dl = d.lower()
340         if dl == "core":
341             cls.debug_core = True
342         elif dl == "gdb" or dl == "gdb-all":
343             cls.debug_gdb = True
344         elif dl == "gdbserver" or dl == "gdbserver-all":
345             cls.debug_gdbserver = True
346         elif dl == "attach":
347             cls.debug_attach = True
348         else:
349             raise Exception("Unrecognized DEBUG option: '%s'" % d)
350         if dl == "gdb-all" or dl == "gdbserver-all":
351             cls.debug_all = True
352
353     @classmethod
354     def get_vpp_worker_count(cls):
355         if not hasattr(cls, "vpp_worker_count"):
356             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
357                 cls.vpp_worker_count = 0
358             else:
359                 cls.vpp_worker_count = config.vpp_worker_count
360         return cls.vpp_worker_count
361
362     @classmethod
363     def get_cpus_required(cls):
364         return 1 + cls.get_vpp_worker_count()
365
366     @classmethod
367     def setUpConstants(cls):
368         """ Set-up the test case class based on environment variables """
369         cls.step = config.step
370         cls.plugin_path = ":".join(config.vpp_plugin_dir)
371         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
372         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
373         debug_cli = ""
374         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
375             debug_cli = "cli-listen localhost:5002"
376         size = re.search(r"\d+[gG]", config.coredump_size)
377         if size:
378             coredump_size = f"coredump-size {config.coredump_size}".lower()
379         else:
380             coredump_size = "coredump-size unlimited"
381         default_variant = config.variant
382         if default_variant is not None:
383             default_variant = "defaults { %s 100 }" % default_variant
384         else:
385             default_variant = ""
386
387         api_fuzzing = config.api_fuzz
388         if api_fuzzing is None:
389             api_fuzzing = 'off'
390
391         cls.vpp_cmdline = [
392             config.vpp,
393             "unix", "{", "nodaemon", debug_cli, "full-coredump",
394             coredump_size, "runtime-dir", cls.tempdir, "}",
395             "api-trace", "{", "on", "}",
396             "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
397             "cpu", "{", "main-core", str(cls.cpus[0]), ]
398         if cls.extern_plugin_path not in (None, ""):
399             cls.extra_vpp_plugin_config.append(
400                 "add-path %s" % cls.extern_plugin_path)
401         if cls.get_vpp_worker_count():
402             cls.vpp_cmdline.extend([
403                 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
404         cls.vpp_cmdline.extend([
405             "}",
406             "physmem", "{", "max-size", "32m", "}",
407             "statseg", "{", "socket-name", cls.get_stats_sock_path(),
408             cls.extra_vpp_statseg_config, "}",
409             "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
410             "node { ", default_variant, "}",
411             "api-fuzz {", api_fuzzing, "}",
412             "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
413             "plugin", "rdma_plugin.so", "{", "disable", "}",
414             "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
415             "plugin", "unittest_plugin.so", "{", "enable", "}"
416         ] + cls.extra_vpp_plugin_config + ["}", ])
417
418         if cls.extra_vpp_punt_config is not None:
419             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
420
421         if not cls.debug_attach:
422             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
423             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
424
425     @classmethod
426     def wait_for_enter(cls):
427         if cls.debug_gdbserver:
428             print(double_line_delim)
429             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
430         elif cls.debug_gdb:
431             print(double_line_delim)
432             print("Spawned VPP with PID: %d" % cls.vpp.pid)
433         else:
434             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
435             return
436         print(single_line_delim)
437         print("You can debug VPP using:")
438         if cls.debug_gdbserver:
439             print(f"sudo gdb {config.vpp} "
440                   f"-ex 'target remote localhost:{cls.gdbserver_port}'")
441             print("Now is the time to attach gdb by running the above "
442                   "command, set up breakpoints etc., then resume VPP from "
443                   "within gdb by issuing the 'continue' command")
444             cls.gdbserver_port += 1
445         elif cls.debug_gdb:
446             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
447             print("Now is the time to attach gdb by running the above "
448                   "command and set up breakpoints etc., then resume VPP from"
449                   " within gdb by issuing the 'continue' command")
450         print(single_line_delim)
451         input("Press ENTER to continue running the testcase...")
452
453     @classmethod
454     def attach_vpp(cls):
455         cls.vpp = DummyVpp()
456
457     @classmethod
458     def run_vpp(cls):
459         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
460         cmdline = cls.vpp_cmdline
461
462         if cls.debug_gdbserver:
463             gdbserver = '/usr/bin/gdbserver'
464             if not os.path.isfile(gdbserver) or\
465                     not os.access(gdbserver, os.X_OK):
466                 raise Exception("gdbserver binary '%s' does not exist or is "
467                                 "not executable" % gdbserver)
468
469             cmdline = [gdbserver, 'localhost:{port}'
470                        .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
471             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
472
473         try:
474             cls.vpp = subprocess.Popen(cmdline,
475                                        stdout=subprocess.PIPE,
476                                        stderr=subprocess.PIPE)
477         except subprocess.CalledProcessError as e:
478             cls.logger.critical("Subprocess returned with non-0 return code: ("
479                                 "%s)", e.returncode)
480             raise
481         except OSError as e:
482             cls.logger.critical("Subprocess returned with OS error: "
483                                 "(%s) %s", e.errno, e.strerror)
484             raise
485         except Exception as e:
486             cls.logger.exception("Subprocess returned unexpected from "
487                                  "%s:", cmdline)
488             raise
489
490         cls.wait_for_enter()
491
492     @classmethod
493     def wait_for_coredump(cls):
494         corefile = cls.tempdir + "/core"
495         if os.path.isfile(corefile):
496             cls.logger.error("Waiting for coredump to complete: %s", corefile)
497             curr_size = os.path.getsize(corefile)
498             deadline = time.time() + 60
499             ok = False
500             while time.time() < deadline:
501                 cls.sleep(1)
502                 size = curr_size
503                 curr_size = os.path.getsize(corefile)
504                 if size == curr_size:
505                     ok = True
506                     break
507             if not ok:
508                 cls.logger.error("Timed out waiting for coredump to complete:"
509                                  " %s", corefile)
510             else:
511                 cls.logger.error("Coredump complete: %s, size %d",
512                                  corefile, curr_size)
513
514     @classmethod
515     def get_stats_sock_path(cls):
516         return "%s/stats.sock" % cls.tempdir
517
518     @classmethod
519     def get_api_sock_path(cls):
520         return "%s/api.sock" % cls.tempdir
521
522     @classmethod
523     def get_api_segment_prefix(cls):
524         return os.path.basename(cls.tempdir)  # Only used for VAPI
525
526     @classmethod
527     def get_tempdir(cls):
528         tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
529         if config.wipe_tmp_dir:
530             shutil.rmtree(tmpdir, ignore_errors=True)
531         os.mkdir(tmpdir)
532         return tmpdir
533
534     @classmethod
535     def create_file_handler(cls):
536         if config.log_dir is None:
537             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
538             return
539
540         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
541         if config.wipe_tmp_dir:
542             shutil.rmtree(logdir, ignore_errors=True)
543         os.mkdir(logdir)
544         cls.file_handler = FileHandler(f"{logdir}/log.txt")
545
546     @classmethod
547     def setUpClass(cls):
548         """
549         Perform class setup before running the testcase
550         Remove shared memory files, start vpp and connect the vpp-api
551         """
552         super(VppTestCase, cls).setUpClass()
553         cls.logger = get_logger(cls.__name__)
554         random.seed(config.rnd_seed)
555         if hasattr(cls, 'parallel_handler'):
556             cls.logger.addHandler(cls.parallel_handler)
557             cls.logger.propagate = False
558         cls.set_debug_flags(config.debug)
559         cls.tempdir = cls.get_tempdir()
560         cls.create_file_handler()
561         cls.file_handler.setFormatter(
562             Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
563                       datefmt="%H:%M:%S"))
564         cls.file_handler.setLevel(DEBUG)
565         cls.logger.addHandler(cls.file_handler)
566         cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
567         os.chdir(cls.tempdir)
568         cls.logger.info("Temporary dir is %s, api socket is %s",
569                         cls.tempdir, cls.get_api_sock_path())
570         cls.logger.debug("Random seed is %s", config.rnd_seed)
571         cls.setUpConstants()
572         cls.reset_packet_infos()
573         cls._pcaps = []
574         cls._old_pcaps = []
575         cls.verbose = 0
576         cls.vpp_dead = False
577         cls.registry = VppObjectRegistry()
578         cls.vpp_startup_failed = False
579         cls.reporter = KeepAliveReporter()
580         # need to catch exceptions here because if we raise, then the cleanup
581         # doesn't get called and we might end with a zombie vpp
582         try:
583             if cls.debug_attach:
584                 cls.attach_vpp()
585             else:
586                 cls.run_vpp()
587             cls.reporter.send_keep_alive(cls, 'setUpClass')
588             VppTestResult.current_test_case_info = TestCaseInfo(
589                 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
590             cls.vpp_stdout_deque = deque()
591             cls.vpp_stderr_deque = deque()
592             if not cls.debug_attach:
593                 cls.pump_thread_stop_flag = Event()
594                 cls.pump_thread_wakeup_pipe = os.pipe()
595                 cls.pump_thread = Thread(target=pump_output, args=(cls,))
596                 cls.pump_thread.daemon = True
597                 cls.pump_thread.start()
598             if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
599                 cls.vapi_response_timeout = 0
600             cls.vapi = VppPapiProvider(cls.__name__, cls,
601                                        cls.vapi_response_timeout)
602             if cls.step:
603                 hook = hookmodule.StepHook(cls)
604             else:
605                 hook = hookmodule.PollHook(cls)
606             cls.vapi.register_hook(hook)
607             cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
608             try:
609                 hook.poll_vpp()
610             except VppDiedError:
611                 cls.vpp_startup_failed = True
612                 cls.logger.critical(
613                     "VPP died shortly after startup, check the"
614                     " output to standard error for possible cause")
615                 raise
616             try:
617                 cls.vapi.connect()
618             except (vpp_papi.VPPIOError, Exception) as e:
619                 cls.logger.debug("Exception connecting to vapi: %s" % e)
620                 cls.vapi.disconnect()
621
622                 if cls.debug_gdbserver:
623                     print(colorize("You're running VPP inside gdbserver but "
624                                    "VPP-API connection failed, did you forget "
625                                    "to 'continue' VPP from within gdb?", RED))
626                 raise e
627             if cls.debug_attach:
628                 last_line = cls.vapi.cli("show thread").split("\n")[-2]
629                 cls.vpp_worker_count = int(last_line.split(" ")[0])
630                 print("Detected VPP with %s workers." % cls.vpp_worker_count)
631         except vpp_papi.VPPRuntimeError as e:
632             cls.logger.debug("%s" % e)
633             cls.quit()
634             raise e
635         except Exception as e:
636             cls.logger.debug("Exception connecting to VPP: %s" % e)
637             cls.quit()
638             raise e
639
640     @classmethod
641     def _debug_quit(cls):
642         if (cls.debug_gdbserver or cls.debug_gdb):
643             try:
644                 cls.vpp.poll()
645
646                 if cls.vpp.returncode is None:
647                     print()
648                     print(double_line_delim)
649                     print("VPP or GDB server is still running")
650                     print(single_line_delim)
651                     input("When done debugging, press ENTER to kill the "
652                           "process and finish running the testcase...")
653             except AttributeError:
654                 pass
655
656     @classmethod
657     def quit(cls):
658         """
659         Disconnect vpp-api, kill vpp and cleanup shared memory files
660         """
661         cls._debug_quit()
662
663         # first signal that we want to stop the pump thread, then wake it up
664         if hasattr(cls, 'pump_thread_stop_flag'):
665             cls.pump_thread_stop_flag.set()
666         if hasattr(cls, 'pump_thread_wakeup_pipe'):
667             os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
668         if hasattr(cls, 'pump_thread'):
669             cls.logger.debug("Waiting for pump thread to stop")
670             cls.pump_thread.join()
671         if hasattr(cls, 'vpp_stderr_reader_thread'):
672             cls.logger.debug("Waiting for stderr pump to stop")
673             cls.vpp_stderr_reader_thread.join()
674
675         if hasattr(cls, 'vpp'):
676             if hasattr(cls, 'vapi'):
677                 cls.logger.debug(cls.vapi.vpp.get_stats())
678                 cls.logger.debug("Disconnecting class vapi client on %s",
679                                  cls.__name__)
680                 cls.vapi.disconnect()
681                 cls.logger.debug("Deleting class vapi attribute on %s",
682                                  cls.__name__)
683                 del cls.vapi
684             cls.vpp.poll()
685             if not cls.debug_attach and cls.vpp.returncode is None:
686                 cls.wait_for_coredump()
687                 cls.logger.debug("Sending TERM to vpp")
688                 cls.vpp.terminate()
689                 cls.logger.debug("Waiting for vpp to die")
690                 try:
691                     outs, errs = cls.vpp.communicate(timeout=5)
692                 except subprocess.TimeoutExpired:
693                     cls.vpp.kill()
694                     outs, errs = cls.vpp.communicate()
695             cls.logger.debug("Deleting class vpp attribute on %s",
696                              cls.__name__)
697             if not cls.debug_attach:
698                 cls.vpp.stdout.close()
699                 cls.vpp.stderr.close()
700             del cls.vpp
701
702         if cls.vpp_startup_failed:
703             stdout_log = cls.logger.info
704             stderr_log = cls.logger.critical
705         else:
706             stdout_log = cls.logger.info
707             stderr_log = cls.logger.info
708
709         if hasattr(cls, 'vpp_stdout_deque'):
710             stdout_log(single_line_delim)
711             stdout_log('VPP output to stdout while running %s:', cls.__name__)
712             stdout_log(single_line_delim)
713             vpp_output = "".join(cls.vpp_stdout_deque)
714             with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
715                 f.write(vpp_output)
716             stdout_log('\n%s', vpp_output)
717             stdout_log(single_line_delim)
718
719         if hasattr(cls, 'vpp_stderr_deque'):
720             stderr_log(single_line_delim)
721             stderr_log('VPP output to stderr while running %s:', cls.__name__)
722             stderr_log(single_line_delim)
723             vpp_output = "".join(cls.vpp_stderr_deque)
724             with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
725                 f.write(vpp_output)
726             stderr_log('\n%s', vpp_output)
727             stderr_log(single_line_delim)
728
729     @classmethod
730     def tearDownClass(cls):
731         """ Perform final cleanup after running all tests in this test-case """
732         cls.logger.debug("--- tearDownClass() for %s called ---" %
733                          cls.__name__)
734         cls.reporter.send_keep_alive(cls, 'tearDownClass')
735         cls.quit()
736         cls.file_handler.close()
737         cls.reset_packet_infos()
738         if config.debug_framework:
739             debug_internal.on_tear_down_class(cls)
740
741     def show_commands_at_teardown(self):
742         """ Allow subclass specific teardown logging additions."""
743         self.logger.info("--- No test specific show commands provided. ---")
744
745     def tearDown(self):
746         """ Show various debug prints after each test """
747         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
748                           (self.__class__.__name__, self._testMethodName,
749                            self._testMethodDoc))
750
751         try:
752             if not self.vpp_dead:
753                 self.logger.debug(self.vapi.cli("show trace max 1000"))
754                 self.logger.info(self.vapi.ppcli("show interface"))
755                 self.logger.info(self.vapi.ppcli("show hardware"))
756                 self.logger.info(self.statistics.set_errors_str())
757                 self.logger.info(self.vapi.ppcli("show run"))
758                 self.logger.info(self.vapi.ppcli("show log"))
759                 self.logger.info(self.vapi.ppcli("show bihash"))
760                 self.logger.info("Logging testcase specific show commands.")
761                 self.show_commands_at_teardown()
762                 if self.remove_configured_vpp_objects_on_tear_down:
763                     self.registry.remove_vpp_config(self.logger)
764             # Save/Dump VPP api trace log
765             m = self._testMethodName
766             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
767             tmp_api_trace = "/tmp/%s" % api_trace
768             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
769             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
770             self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
771                                                     vpp_api_trace_log))
772             os.rename(tmp_api_trace, vpp_api_trace_log)
773         except VppTransportSocketIOError:
774             self.logger.debug("VppTransportSocketIOError: Vpp dead. "
775                               "Cannot log show commands.")
776             self.vpp_dead = True
777         else:
778             self.registry.unregister_all(self.logger)
779
780     def setUp(self):
781         """ Clear trace before running each test"""
782         super(VppTestCase, self).setUp()
783         self.reporter.send_keep_alive(self)
784         if self.vpp_dead:
785             raise VppDiedError(rv=None, testcase=self.__class__.__name__,
786                                method_name=self._testMethodName)
787         self.sleep(.1, "during setUp")
788         self.vpp_stdout_deque.append(
789             "--- test setUp() for %s.%s(%s) starts here ---\n" %
790             (self.__class__.__name__, self._testMethodName,
791              self._testMethodDoc))
792         self.vpp_stderr_deque.append(
793             "--- test setUp() for %s.%s(%s) starts here ---\n" %
794             (self.__class__.__name__, self._testMethodName,
795              self._testMethodDoc))
796         self.vapi.cli("clear trace")
797         # store the test instance inside the test class - so that objects
798         # holding the class can access instance methods (like assertEqual)
799         type(self).test_instance = self
800
801     @classmethod
802     def pg_enable_capture(cls, interfaces=None):
803         """
804         Enable capture on packet-generator interfaces
805
806         :param interfaces: iterable interface indexes (if None,
807                            use self.pg_interfaces)
808
809         """
810         if interfaces is None:
811             interfaces = cls.pg_interfaces
812         for i in interfaces:
813             i.enable_capture()
814
815     @classmethod
816     def register_pcap(cls, intf, worker):
817         """ Register a pcap in the testclass """
818         # add to the list of captures with current timestamp
819         cls._pcaps.append((intf, worker))
820
821     @classmethod
822     def get_vpp_time(cls):
823         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
824         # returns float("2.190522")
825         timestr = cls.vapi.cli('show clock')
826         head, sep, tail = timestr.partition(',')
827         head, sep, tail = head.partition('Time now')
828         return float(tail)
829
830     @classmethod
831     def sleep_on_vpp_time(cls, sec):
832         """ Sleep according to time in VPP world """
833         # On a busy system with many processes
834         # we might end up with VPP time being slower than real world
835         # So take that into account when waiting for VPP to do something
836         start_time = cls.get_vpp_time()
837         while cls.get_vpp_time() - start_time < sec:
838             cls.sleep(0.1)
839
840     @classmethod
841     def pg_start(cls, trace=True):
842         """ Enable the PG, wait till it is done, then clean up """
843         for (intf, worker) in cls._old_pcaps:
844             intf.handle_old_pcap_file(intf.get_in_path(worker),
845                                       intf.in_history_counter)
846         cls._old_pcaps = []
847         if trace:
848             cls.vapi.cli("clear trace")
849             cls.vapi.cli("trace add pg-input 1000")
850         cls.vapi.cli('packet-generator enable')
851         # PG, when starts, runs to completion -
852         # so let's avoid a race condition,
853         # and wait a little till it's done.
854         # Then clean it up  - and then be gone.
855         deadline = time.time() + 300
856         while cls.vapi.cli('show packet-generator').find("Yes") != -1:
857             cls.sleep(0.01)  # yield
858             if time.time() > deadline:
859                 cls.logger.error("Timeout waiting for pg to stop")
860                 break
861         for intf, worker in cls._pcaps:
862             cls.vapi.cli('packet-generator delete %s' %
863                          intf.get_cap_name(worker))
864         cls._old_pcaps = cls._pcaps
865         cls._pcaps = []
866
867     @classmethod
868     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
869                                       mode=None):
870         """
871         Create packet-generator interfaces.
872
873         :param interfaces: iterable indexes of the interfaces.
874         :returns: List of created interfaces.
875
876         """
877         result = []
878         for i in interfaces:
879             intf = VppPGInterface(cls, i, gso, gso_size, mode)
880             setattr(cls, intf.name, intf)
881             result.append(intf)
882         cls.pg_interfaces = result
883         return result
884
885     @classmethod
886     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
887         pgmode = VppEnum.vl_api_pg_interface_mode_t
888         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
889                                                  pgmode.PG_API_MODE_IP4)
890
891     @classmethod
892     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
893         pgmode = VppEnum.vl_api_pg_interface_mode_t
894         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
895                                                  pgmode.PG_API_MODE_IP6)
896
897     @classmethod
898     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
899         pgmode = VppEnum.vl_api_pg_interface_mode_t
900         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
901                                                  pgmode.PG_API_MODE_ETHERNET)
902
903     @classmethod
904     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
905         pgmode = VppEnum.vl_api_pg_interface_mode_t
906         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
907                                                  pgmode.PG_API_MODE_ETHERNET)
908
909     @classmethod
910     def create_loopback_interfaces(cls, count):
911         """
912         Create loopback interfaces.
913
914         :param count: number of interfaces created.
915         :returns: List of created interfaces.
916         """
917         result = [VppLoInterface(cls) for i in range(count)]
918         for intf in result:
919             setattr(cls, intf.name, intf)
920         cls.lo_interfaces = result
921         return result
922
923     @classmethod
924     def create_bvi_interfaces(cls, count):
925         """
926         Create BVI interfaces.
927
928         :param count: number of interfaces created.
929         :returns: List of created interfaces.
930         """
931         result = [VppBviInterface(cls) for i in range(count)]
932         for intf in result:
933             setattr(cls, intf.name, intf)
934         cls.bvi_interfaces = result
935         return result
936
937     @staticmethod
938     def extend_packet(packet, size, padding=' '):
939         """
940         Extend packet to given size by padding with spaces or custom padding
941         NOTE: Currently works only when Raw layer is present.
942
943         :param packet: packet
944         :param size: target size
945         :param padding: padding used to extend the payload
946
947         """
948         packet_len = len(packet) + 4
949         extend = size - packet_len
950         if extend > 0:
951             num = (extend // len(padding)) + 1
952             packet[Raw].load += (padding * num)[:extend].encode("ascii")
953
954     @classmethod
955     def reset_packet_infos(cls):
956         """ Reset the list of packet info objects and packet counts to zero """
957         cls._packet_infos = {}
958         cls._packet_count_for_dst_if_idx = {}
959
960     @classmethod
961     def create_packet_info(cls, src_if, dst_if):
962         """
963         Create packet info object containing the source and destination indexes
964         and add it to the testcase's packet info list
965
966         :param VppInterface src_if: source interface
967         :param VppInterface dst_if: destination interface
968
969         :returns: _PacketInfo object
970
971         """
972         info = _PacketInfo()
973         info.index = len(cls._packet_infos)
974         info.src = src_if.sw_if_index
975         info.dst = dst_if.sw_if_index
976         if isinstance(dst_if, VppSubInterface):
977             dst_idx = dst_if.parent.sw_if_index
978         else:
979             dst_idx = dst_if.sw_if_index
980         if dst_idx in cls._packet_count_for_dst_if_idx:
981             cls._packet_count_for_dst_if_idx[dst_idx] += 1
982         else:
983             cls._packet_count_for_dst_if_idx[dst_idx] = 1
984         cls._packet_infos[info.index] = info
985         return info
986
987     @staticmethod
988     def info_to_payload(info):
989         """
990         Convert _PacketInfo object to packet payload
991
992         :param info: _PacketInfo object
993
994         :returns: string containing serialized data from packet info
995         """
996
997         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
998         return pack('iiiih', info.index, info.src,
999                     info.dst, info.ip, info.proto)
1000
1001     @staticmethod
1002     def payload_to_info(payload, payload_field='load'):
1003         """
1004         Convert packet payload to _PacketInfo object
1005
1006         :param payload: packet payload
1007         :type payload:  <class 'scapy.packet.Raw'>
1008         :param payload_field: packet fieldname of payload "load" for
1009                 <class 'scapy.packet.Raw'>
1010         :type payload_field: str
1011         :returns: _PacketInfo object containing de-serialized data from payload
1012
1013         """
1014
1015         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1016         payload_b = getattr(payload, payload_field)[:18]
1017
1018         info = _PacketInfo()
1019         info.index, info.src, info.dst, info.ip, info.proto \
1020             = unpack('iiiih', payload_b)
1021
1022         # some SRv6 TCs depend on get an exception if bad values are detected
1023         if info.index > 0x4000:
1024             raise ValueError('Index value is invalid')
1025
1026         return info
1027
1028     def get_next_packet_info(self, info):
1029         """
1030         Iterate over the packet info list stored in the testcase
1031         Start iteration with first element if info is None
1032         Continue based on index in info if info is specified
1033
1034         :param info: info or None
1035         :returns: next info in list or None if no more infos
1036         """
1037         if info is None:
1038             next_index = 0
1039         else:
1040             next_index = info.index + 1
1041         if next_index == len(self._packet_infos):
1042             return None
1043         else:
1044             return self._packet_infos[next_index]
1045
1046     def get_next_packet_info_for_interface(self, src_index, info):
1047         """
1048         Search the packet info list for the next packet info with same source
1049         interface index
1050
1051         :param src_index: source interface index to search for
1052         :param info: packet info - where to start the search
1053         :returns: packet info or None
1054
1055         """
1056         while True:
1057             info = self.get_next_packet_info(info)
1058             if info is None:
1059                 return None
1060             if info.src == src_index:
1061                 return info
1062
1063     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1064         """
1065         Search the packet info list for the next packet info with same source
1066         and destination interface indexes
1067
1068         :param src_index: source interface index to search for
1069         :param dst_index: destination interface index to search for
1070         :param info: packet info - where to start the search
1071         :returns: packet info or None
1072
1073         """
1074         while True:
1075             info = self.get_next_packet_info_for_interface(src_index, info)
1076             if info is None:
1077                 return None
1078             if info.dst == dst_index:
1079                 return info
1080
1081     def assert_equal(self, real_value, expected_value, name_or_class=None):
1082         if name_or_class is None:
1083             self.assertEqual(real_value, expected_value)
1084             return
1085         try:
1086             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1087             msg = msg % (getdoc(name_or_class).strip(),
1088                          real_value, str(name_or_class(real_value)),
1089                          expected_value, str(name_or_class(expected_value)))
1090         except Exception:
1091             msg = "Invalid %s: %s does not match expected value %s" % (
1092                 name_or_class, real_value, expected_value)
1093
1094         self.assertEqual(real_value, expected_value, msg)
1095
1096     def assert_in_range(self,
1097                         real_value,
1098                         expected_min,
1099                         expected_max,
1100                         name=None):
1101         if name is None:
1102             msg = None
1103         else:
1104             msg = "Invalid %s: %s out of range <%s,%s>" % (
1105                 name, real_value, expected_min, expected_max)
1106         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1107
1108     def assert_packet_checksums_valid(self, packet,
1109                                       ignore_zero_udp_checksums=True):
1110         received = packet.__class__(scapy.compat.raw(packet))
1111         udp_layers = ['UDP', 'UDPerror']
1112         checksum_fields = ['cksum', 'chksum']
1113         checksums = []
1114         counter = 0
1115         temp = received.__class__(scapy.compat.raw(received))
1116         while True:
1117             layer = temp.getlayer(counter)
1118             if layer:
1119                 layer = layer.copy()
1120                 layer.remove_payload()
1121                 for cf in checksum_fields:
1122                     if hasattr(layer, cf):
1123                         if ignore_zero_udp_checksums and \
1124                                 0 == getattr(layer, cf) and \
1125                                 layer.name in udp_layers:
1126                             continue
1127                         delattr(temp.getlayer(counter), cf)
1128                         checksums.append((counter, cf))
1129             else:
1130                 break
1131             counter = counter + 1
1132         if 0 == len(checksums):
1133             return
1134         temp = temp.__class__(scapy.compat.raw(temp))
1135         for layer, cf in checksums:
1136             calc_sum = getattr(temp[layer], cf)
1137             self.assert_equal(
1138                 getattr(received[layer], cf), calc_sum,
1139                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1140             self.logger.debug(
1141                 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1142                 (cf, temp[layer].name, calc_sum))
1143
1144     def assert_checksum_valid(self, received_packet, layer,
1145                               field_name='chksum',
1146                               ignore_zero_checksum=False):
1147         """ Check checksum of received packet on given layer """
1148         received_packet_checksum = getattr(received_packet[layer], field_name)
1149         if ignore_zero_checksum and 0 == received_packet_checksum:
1150             return
1151         recalculated = received_packet.__class__(
1152             scapy.compat.raw(received_packet))
1153         delattr(recalculated[layer], field_name)
1154         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1155         self.assert_equal(received_packet_checksum,
1156                           getattr(recalculated[layer], field_name),
1157                           "packet checksum on layer: %s" % layer)
1158
1159     def assert_ip_checksum_valid(self, received_packet,
1160                                  ignore_zero_checksum=False):
1161         self.assert_checksum_valid(received_packet, 'IP',
1162                                    ignore_zero_checksum=ignore_zero_checksum)
1163
1164     def assert_tcp_checksum_valid(self, received_packet,
1165                                   ignore_zero_checksum=False):
1166         self.assert_checksum_valid(received_packet, 'TCP',
1167                                    ignore_zero_checksum=ignore_zero_checksum)
1168
1169     def assert_udp_checksum_valid(self, received_packet,
1170                                   ignore_zero_checksum=True):
1171         self.assert_checksum_valid(received_packet, 'UDP',
1172                                    ignore_zero_checksum=ignore_zero_checksum)
1173
1174     def assert_embedded_icmp_checksum_valid(self, received_packet):
1175         if received_packet.haslayer(IPerror):
1176             self.assert_checksum_valid(received_packet, 'IPerror')
1177         if received_packet.haslayer(TCPerror):
1178             self.assert_checksum_valid(received_packet, 'TCPerror')
1179         if received_packet.haslayer(UDPerror):
1180             self.assert_checksum_valid(received_packet, 'UDPerror',
1181                                        ignore_zero_checksum=True)
1182         if received_packet.haslayer(ICMPerror):
1183             self.assert_checksum_valid(received_packet, 'ICMPerror')
1184
1185     def assert_icmp_checksum_valid(self, received_packet):
1186         self.assert_checksum_valid(received_packet, 'ICMP')
1187         self.assert_embedded_icmp_checksum_valid(received_packet)
1188
1189     def assert_icmpv6_checksum_valid(self, pkt):
1190         if pkt.haslayer(ICMPv6DestUnreach):
1191             self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1192             self.assert_embedded_icmp_checksum_valid(pkt)
1193         if pkt.haslayer(ICMPv6EchoRequest):
1194             self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1195         if pkt.haslayer(ICMPv6EchoReply):
1196             self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1197
1198     def get_counter(self, counter):
1199         if counter.startswith("/"):
1200             counter_value = self.statistics.get_counter(counter)
1201         else:
1202             counters = self.vapi.cli("sh errors").split('\n')
1203             counter_value = 0
1204             for i in range(1, len(counters) - 1):
1205                 results = counters[i].split()
1206                 if results[1] == counter:
1207                     counter_value = int(results[0])
1208                     break
1209         return counter_value
1210
1211     def assert_counter_equal(self, counter, expected_value,
1212                              thread=None, index=0):
1213         c = self.get_counter(counter)
1214         if thread is not None:
1215             c = c[thread][index]
1216         else:
1217             c = sum(x[index] for x in c)
1218         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1219
1220     def assert_packet_counter_equal(self, counter, expected_value):
1221         counter_value = self.get_counter(counter)
1222         self.assert_equal(counter_value, expected_value,
1223                           "packet counter `%s'" % counter)
1224
1225     def assert_error_counter_equal(self, counter, expected_value):
1226         counter_value = self.statistics[counter].sum()
1227         self.assert_equal(counter_value, expected_value,
1228                           "error counter `%s'" % counter)
1229
1230     @classmethod
1231     def sleep(cls, timeout, remark=None):
1232
1233         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1234         #  * by Guido, only the main thread can be interrupted.
1235         # */
1236         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1237         if timeout == 0:
1238             # yield quantum
1239             if hasattr(os, 'sched_yield'):
1240                 os.sched_yield()
1241             else:
1242                 time.sleep(0)
1243             return
1244
1245         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1246         before = time.time()
1247         time.sleep(timeout)
1248         after = time.time()
1249         if after - before > 2 * timeout:
1250             cls.logger.error("unexpected self.sleep() result - "
1251                              "slept for %es instead of ~%es!",
1252                              after - before, timeout)
1253
1254         cls.logger.debug(
1255             "Finished sleep (%s) - slept %es (wanted %es)",
1256             remark, after - before, timeout)
1257
1258     def virtual_sleep(self, timeout, remark=None):
1259         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1260         self.vapi.cli("set clock adjust %s" % timeout)
1261
1262     def pg_send(self, intf, pkts, worker=None, trace=True):
1263         intf.add_stream(pkts, worker=worker)
1264         self.pg_enable_capture(self.pg_interfaces)
1265         self.pg_start(trace=trace)
1266
1267     def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1268                                    trace=True):
1269         self.pg_send(intf, pkts)
1270         if not timeout:
1271             timeout = 1
1272         for i in self.pg_interfaces:
1273             i.get_capture(0, timeout=timeout)
1274             i.assert_nothing_captured(remark=remark)
1275             timeout = 0.1
1276         if trace:
1277             self.logger.debug(self.vapi.cli("show trace"))
1278
1279     def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1280                         trace=True):
1281         if not n_rx:
1282             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1283         self.pg_send(intf, pkts, worker=worker, trace=trace)
1284         rx = output.get_capture(n_rx)
1285         if trace:
1286             self.logger.debug(self.vapi.cli("show trace"))
1287         return rx
1288
1289     def send_and_expect_load_balancing(self, input, pkts, outputs,
1290                                        worker=None, trace=True):
1291         self.pg_send(input, pkts, worker=worker, trace=trace)
1292         rxs = []
1293         for oo in outputs:
1294             rx = oo._get_capture(1)
1295             self.assertNotEqual(0, len(rx))
1296             rxs.append(rx)
1297         if trace:
1298             self.logger.debug(self.vapi.cli("show trace"))
1299         return rxs
1300
1301     def send_and_expect_only(self, intf, pkts, output, timeout=None):
1302         self.pg_send(intf, pkts)
1303         rx = output.get_capture(len(pkts))
1304         outputs = [output]
1305         if not timeout:
1306             timeout = 1
1307         for i in self.pg_interfaces:
1308             if i not in outputs:
1309                 i.get_capture(0, timeout=timeout)
1310                 i.assert_nothing_captured()
1311                 timeout = 0.1
1312
1313         return rx
1314
1315
1316 def get_testcase_doc_name(test):
1317     return getdoc(test.__class__).splitlines()[0]
1318
1319
1320 def get_test_description(descriptions, test):
1321     short_description = test.shortDescription()
1322     if descriptions and short_description:
1323         return short_description
1324     else:
1325         return str(test)
1326
1327
1328 class TestCaseInfo(object):
1329     def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1330         self.logger = logger
1331         self.tempdir = tempdir
1332         self.vpp_pid = vpp_pid
1333         self.vpp_bin_path = vpp_bin_path
1334         self.core_crash_test = None
1335
1336
1337 class VppTestResult(unittest.TestResult):
1338     """
1339     @property result_string
1340      String variable to store the test case result string.
1341     @property errors
1342      List variable containing 2-tuples of TestCase instances and strings
1343      holding formatted tracebacks. Each tuple represents a test which
1344      raised an unexpected exception.
1345     @property failures
1346      List variable containing 2-tuples of TestCase instances and strings
1347      holding formatted tracebacks. Each tuple represents a test where
1348      a failure was explicitly signalled using the TestCase.assert*()
1349      methods.
1350     """
1351
1352     failed_test_cases_info = set()
1353     core_crash_test_cases_info = set()
1354     current_test_case_info = None
1355
1356     def __init__(self, stream=None, descriptions=None, verbosity=None,
1357                  runner=None):
1358         """
1359         :param stream File descriptor to store where to report test results.
1360             Set to the standard error stream by default.
1361         :param descriptions Boolean variable to store information if to use
1362             test case descriptions.
1363         :param verbosity Integer variable to store required verbosity level.
1364         """
1365         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1366         self.stream = stream
1367         self.descriptions = descriptions
1368         self.verbosity = verbosity
1369         self.result_string = None
1370         self.runner = runner
1371         self.printed = []
1372
1373     def addSuccess(self, test):
1374         """
1375         Record a test succeeded result
1376
1377         :param test:
1378
1379         """
1380         if self.current_test_case_info:
1381             self.current_test_case_info.logger.debug(
1382                 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1383                                                        test._testMethodName,
1384                                                        test._testMethodDoc))
1385         unittest.TestResult.addSuccess(self, test)
1386         self.result_string = colorize("OK", GREEN)
1387
1388         self.send_result_through_pipe(test, PASS)
1389
1390     def addSkip(self, test, reason):
1391         """
1392         Record a test skipped.
1393
1394         :param test:
1395         :param reason:
1396
1397         """
1398         if self.current_test_case_info:
1399             self.current_test_case_info.logger.debug(
1400                 "--- addSkip() %s.%s(%s) called, reason is %s" %
1401                 (test.__class__.__name__, test._testMethodName,
1402                  test._testMethodDoc, reason))
1403         unittest.TestResult.addSkip(self, test, reason)
1404         self.result_string = colorize("SKIP", YELLOW)
1405
1406         if reason == "not enough cpus":
1407             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1408         else:
1409             self.send_result_through_pipe(test, SKIP)
1410
1411     def symlink_failed(self):
1412         if self.current_test_case_info:
1413             try:
1414                 failed_dir = config.failed_dir
1415                 link_path = os.path.join(
1416                     failed_dir,
1417                     '%s-FAILED' %
1418                     os.path.basename(self.current_test_case_info.tempdir))
1419
1420                 self.current_test_case_info.logger.debug(
1421                     "creating a link to the failed test")
1422                 self.current_test_case_info.logger.debug(
1423                     "os.symlink(%s, %s)" %
1424                     (self.current_test_case_info.tempdir, link_path))
1425                 if os.path.exists(link_path):
1426                     self.current_test_case_info.logger.debug(
1427                         'symlink already exists')
1428                 else:
1429                     os.symlink(self.current_test_case_info.tempdir, link_path)
1430
1431             except Exception as e:
1432                 self.current_test_case_info.logger.error(e)
1433
1434     def send_result_through_pipe(self, test, result):
1435         if hasattr(self, 'test_framework_result_pipe'):
1436             pipe = self.test_framework_result_pipe
1437             if pipe:
1438                 pipe.send((test.id(), result))
1439
1440     def log_error(self, test, err, fn_name):
1441         if self.current_test_case_info:
1442             if isinstance(test, unittest.suite._ErrorHolder):
1443                 test_name = test.description
1444             else:
1445                 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1446                                            test._testMethodName,
1447                                            test._testMethodDoc)
1448             self.current_test_case_info.logger.debug(
1449                 "--- %s() %s called, err is %s" %
1450                 (fn_name, test_name, err))
1451             self.current_test_case_info.logger.debug(
1452                 "formatted exception is:\n%s" %
1453                 "".join(format_exception(*err)))
1454
1455     def add_error(self, test, err, unittest_fn, error_type):
1456         if error_type == FAIL:
1457             self.log_error(test, err, 'addFailure')
1458             error_type_str = colorize("FAIL", RED)
1459         elif error_type == ERROR:
1460             self.log_error(test, err, 'addError')
1461             error_type_str = colorize("ERROR", RED)
1462         else:
1463             raise Exception('Error type %s cannot be used to record an '
1464                             'error or a failure' % error_type)
1465
1466         unittest_fn(self, test, err)
1467         if self.current_test_case_info:
1468             self.result_string = "%s [ temp dir used by test case: %s ]" % \
1469                                  (error_type_str,
1470                                   self.current_test_case_info.tempdir)
1471             self.symlink_failed()
1472             self.failed_test_cases_info.add(self.current_test_case_info)
1473             if is_core_present(self.current_test_case_info.tempdir):
1474                 if not self.current_test_case_info.core_crash_test:
1475                     if isinstance(test, unittest.suite._ErrorHolder):
1476                         test_name = str(test)
1477                     else:
1478                         test_name = "'{!s}' ({!s})".format(
1479                             get_testcase_doc_name(test), test.id())
1480                     self.current_test_case_info.core_crash_test = test_name
1481                 self.core_crash_test_cases_info.add(
1482                     self.current_test_case_info)
1483         else:
1484             self.result_string = '%s [no temp dir]' % error_type_str
1485
1486         self.send_result_through_pipe(test, error_type)
1487
1488     def addFailure(self, test, err):
1489         """
1490         Record a test failed result
1491
1492         :param test:
1493         :param err: error message
1494
1495         """
1496         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1497
1498     def addError(self, test, err):
1499         """
1500         Record a test error result
1501
1502         :param test:
1503         :param err: error message
1504
1505         """
1506         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1507
1508     def getDescription(self, test):
1509         """
1510         Get test description
1511
1512         :param test:
1513         :returns: test description
1514
1515         """
1516         return get_test_description(self.descriptions, test)
1517
1518     def startTest(self, test):
1519         """
1520         Start a test
1521
1522         :param test:
1523
1524         """
1525
1526         def print_header(test):
1527             if test.__class__ in self.printed:
1528                 return
1529
1530             test_doc = getdoc(test)
1531             if not test_doc:
1532                 raise Exception("No doc string for test '%s'" % test.id())
1533
1534             test_title = test_doc.splitlines()[0].rstrip()
1535             test_title = colorize(test_title, GREEN)
1536             if test.is_tagged_run_solo():
1537                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1538
1539             # This block may overwrite the colorized title above,
1540             # but we want this to stand out and be fixed
1541             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1542                 test_title = colorize(
1543                     f"FIXME with VPP workers: {test_title}", RED)
1544
1545             if test.has_tag(TestCaseTag.FIXME_ASAN):
1546                 test_title = colorize(
1547                     f"FIXME with ASAN: {test_title}", RED)
1548                 test.skip_fixme_asan()
1549
1550             if hasattr(test, 'vpp_worker_count'):
1551                 if test.vpp_worker_count == 0:
1552                     test_title += " [main thread only]"
1553                 elif test.vpp_worker_count == 1:
1554                     test_title += " [1 worker thread]"
1555                 else:
1556                     test_title += f" [{test.vpp_worker_count} worker threads]"
1557
1558             if test.__class__.skipped_due_to_cpu_lack:
1559                 test_title = colorize(
1560                     f"{test_title} [skipped - not enough cpus, "
1561                     f"required={test.__class__.get_cpus_required()}, "
1562                     f"available={max_vpp_cpus}]", YELLOW)
1563
1564             print(double_line_delim)
1565             print(test_title)
1566             print(double_line_delim)
1567             self.printed.append(test.__class__)
1568
1569         print_header(test)
1570         self.start_test = time.time()
1571         unittest.TestResult.startTest(self, test)
1572         if self.verbosity > 0:
1573             self.stream.writeln(
1574                 "Starting " + self.getDescription(test) + " ...")
1575             self.stream.writeln(single_line_delim)
1576
1577     def stopTest(self, test):
1578         """
1579         Called when the given test has been run
1580
1581         :param test:
1582
1583         """
1584         unittest.TestResult.stopTest(self, test)
1585
1586         if self.verbosity > 0:
1587             self.stream.writeln(single_line_delim)
1588             self.stream.writeln("%-73s%s" % (self.getDescription(test),
1589                                              self.result_string))
1590             self.stream.writeln(single_line_delim)
1591         else:
1592             self.stream.writeln("%-68s %4.2f %s" %
1593                                 (self.getDescription(test),
1594                                  time.time() - self.start_test,
1595                                  self.result_string))
1596
1597         self.send_result_through_pipe(test, TEST_RUN)
1598
1599     def printErrors(self):
1600         """
1601         Print errors from running the test case
1602         """
1603         if len(self.errors) > 0 or len(self.failures) > 0:
1604             self.stream.writeln()
1605             self.printErrorList('ERROR', self.errors)
1606             self.printErrorList('FAIL', self.failures)
1607
1608         # ^^ that is the last output from unittest before summary
1609         if not self.runner.print_summary:
1610             devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1611             self.stream = devnull
1612             self.runner.stream = devnull
1613
1614     def printErrorList(self, flavour, errors):
1615         """
1616         Print error list to the output stream together with error type
1617         and test case description.
1618
1619         :param flavour: error type
1620         :param errors: iterable errors
1621
1622         """
1623         for test, err in errors:
1624             self.stream.writeln(double_line_delim)
1625             self.stream.writeln("%s: %s" %
1626                                 (flavour, self.getDescription(test)))
1627             self.stream.writeln(single_line_delim)
1628             self.stream.writeln("%s" % err)
1629
1630
1631 class VppTestRunner(unittest.TextTestRunner):
1632     """
1633     A basic test runner implementation which prints results to standard error.
1634     """
1635
1636     @property
1637     def resultclass(self):
1638         """Class maintaining the results of the tests"""
1639         return VppTestResult
1640
1641     def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1642                  result_pipe=None, failfast=False, buffer=False,
1643                  resultclass=None, print_summary=True, **kwargs):
1644         # ignore stream setting here, use hard-coded stdout to be in sync
1645         # with prints from VppTestCase methods ...
1646         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1647                                             verbosity, failfast, buffer,
1648                                             resultclass, **kwargs)
1649         KeepAliveReporter.pipe = keep_alive_pipe
1650
1651         self.orig_stream = self.stream
1652         self.resultclass.test_framework_result_pipe = result_pipe
1653
1654         self.print_summary = print_summary
1655
1656     def _makeResult(self):
1657         return self.resultclass(self.stream,
1658                                 self.descriptions,
1659                                 self.verbosity,
1660                                 self)
1661
1662     def run(self, test):
1663         """
1664         Run the tests
1665
1666         :param test:
1667
1668         """
1669         faulthandler.enable()  # emit stack trace to stderr if killed by signal
1670
1671         result = super(VppTestRunner, self).run(test)
1672         if not self.print_summary:
1673             self.stream = self.orig_stream
1674             result.stream = self.orig_stream
1675         return result
1676
1677
1678 class Worker(Thread):
1679     def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1680         super(Worker, self).__init__(*args, **kwargs)
1681         self.logger = logger
1682         self.args = executable_args
1683         if hasattr(self, 'testcase') and self.testcase.debug_all:
1684             if self.testcase.debug_gdbserver:
1685                 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1686                              .format(port=self.testcase.gdbserver_port)] + args
1687             elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1688                 self.args.append(self.wait_for_gdb)
1689         self.app_bin = executable_args[0]
1690         self.app_name = os.path.basename(self.app_bin)
1691         if hasattr(self, 'role'):
1692             self.app_name += ' {role}'.format(role=self.role)
1693         self.process = None
1694         self.result = None
1695         env = {} if env is None else env
1696         self.env = copy.deepcopy(env)
1697
1698     def wait_for_enter(self):
1699         if not hasattr(self, 'testcase'):
1700             return
1701         if self.testcase.debug_all and self.testcase.debug_gdbserver:
1702             print()
1703             print(double_line_delim)
1704             print("Spawned GDB Server for '{app}' with PID: {pid}"
1705                   .format(app=self.app_name, pid=self.process.pid))
1706         elif self.testcase.debug_all and self.testcase.debug_gdb:
1707             print()
1708             print(double_line_delim)
1709             print("Spawned '{app}' with PID: {pid}"
1710                   .format(app=self.app_name, pid=self.process.pid))
1711         else:
1712             return
1713         print(single_line_delim)
1714         print("You can debug '{app}' using:".format(app=self.app_name))
1715         if self.testcase.debug_gdbserver:
1716             print("sudo gdb " + self.app_bin +
1717                   " -ex 'target remote localhost:{port}'"
1718                   .format(port=self.testcase.gdbserver_port))
1719             print("Now is the time to attach gdb by running the above "
1720                   "command, set up breakpoints etc., then resume from "
1721                   "within gdb by issuing the 'continue' command")
1722             self.testcase.gdbserver_port += 1
1723         elif self.testcase.debug_gdb:
1724             print("sudo gdb " + self.app_bin +
1725                   " -ex 'attach {pid}'".format(pid=self.process.pid))
1726             print("Now is the time to attach gdb by running the above "
1727                   "command and set up breakpoints etc., then resume from"
1728                   " within gdb by issuing the 'continue' command")
1729         print(single_line_delim)
1730         input("Press ENTER to continue running the testcase...")
1731
1732     def run(self):
1733         executable = self.args[0]
1734         if not os.path.exists(executable) or not os.access(
1735                 executable, os.F_OK | os.X_OK):
1736             # Exit code that means some system file did not exist,
1737             # could not be opened, or had some other kind of error.
1738             self.result = os.EX_OSFILE
1739             raise EnvironmentError(
1740                 "executable '%s' is not found or executable." % executable)
1741         self.logger.debug("Running executable '{app}': '{cmd}'"
1742                           .format(app=self.app_name,
1743                                   cmd=' '.join(self.args)))
1744         env = os.environ.copy()
1745         env.update(self.env)
1746         env["CK_LOG_FILE_NAME"] = "-"
1747         self.process = subprocess.Popen(
1748             ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1749             preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1750             stderr=subprocess.PIPE)
1751         self.wait_for_enter()
1752         out, err = self.process.communicate()
1753         self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1754         self.logger.info("Return code is `%s'" % self.process.returncode)
1755         self.logger.info(single_line_delim)
1756         self.logger.info("Executable `{app}' wrote to stdout:"
1757                          .format(app=self.app_name))
1758         self.logger.info(single_line_delim)
1759         self.logger.info(out.decode('utf-8'))
1760         self.logger.info(single_line_delim)
1761         self.logger.info("Executable `{app}' wrote to stderr:"
1762                          .format(app=self.app_name))
1763         self.logger.info(single_line_delim)
1764         self.logger.info(err.decode('utf-8'))
1765         self.logger.info(single_line_delim)
1766         self.result = self.process.returncode
1767
1768
1769 if __name__ == '__main__':
1770     pass