8065518ff7a3cfe2bea102f47e5909e2bcc8bdb5
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
41     get_logger, colorize
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
47
48
49 logger = logging.getLogger(__name__)
50
51 # Set up an empty logger for the testcase that can be overridden as necessary
52 null_logger = logging.getLogger('VppTestCase')
53 null_logger.addHandler(logging.NullHandler())
54
55 PASS = 0
56 FAIL = 1
57 ERROR = 2
58 SKIP = 3
59 TEST_RUN = 4
60 SKIP_CPU_SHORTAGE = 5
61
62
63 if config.debug_framework:
64     import debug_internal
65
66 """
67   Test framework module.
68
69   The module provides a set of tools for constructing and running tests and
70   representing the results.
71 """
72
73
74 class VppDiedError(Exception):
75     """ exception for reporting that the subprocess has died."""
76
77     signals_by_value = {v: k for k, v in signal.__dict__.items() if
78                         k.startswith('SIG') and not k.startswith('SIG_')}
79
80     def __init__(self, rv=None, testcase=None, method_name=None):
81         self.rv = rv
82         self.signal_name = None
83         self.testcase = testcase
84         self.method_name = method_name
85
86         try:
87             self.signal_name = VppDiedError.signals_by_value[-rv]
88         except (KeyError, TypeError):
89             pass
90
91         if testcase is None and method_name is None:
92             in_msg = ''
93         else:
94             in_msg = ' while running %s.%s' % (testcase, method_name)
95
96         if self.rv:
97             msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
98                 % (in_msg, self.rv, ' [%s]' %
99                    (self.signal_name if
100                     self.signal_name is not None else ''))
101         else:
102             msg = "VPP subprocess died unexpectedly%s." % in_msg
103
104         super(VppDiedError, self).__init__(msg)
105
106
107 class _PacketInfo(object):
108     """Private class to create packet info object.
109
110     Help process information about the next packet.
111     Set variables to default values.
112     """
113     #: Store the index of the packet.
114     index = -1
115     #: Store the index of the source packet generator interface of the packet.
116     src = -1
117     #: Store the index of the destination packet generator interface
118     #: of the packet.
119     dst = -1
120     #: Store expected ip version
121     ip = -1
122     #: Store expected upper protocol
123     proto = -1
124     #: Store the copy of the former packet.
125     data = None
126
127     def __eq__(self, other):
128         index = self.index == other.index
129         src = self.src == other.src
130         dst = self.dst == other.dst
131         data = self.data == other.data
132         return index and src and dst and data
133
134
135 def pump_output(testclass):
136     """ pump output from vpp stdout/stderr to proper queues """
137     stdout_fragment = ""
138     stderr_fragment = ""
139     while not testclass.pump_thread_stop_flag.is_set():
140         readable = select.select([testclass.vpp.stdout.fileno(),
141                                   testclass.vpp.stderr.fileno(),
142                                   testclass.pump_thread_wakeup_pipe[0]],
143                                  [], [])[0]
144         if testclass.vpp.stdout.fileno() in readable:
145             read = os.read(testclass.vpp.stdout.fileno(), 102400)
146             if len(read) > 0:
147                 split = read.decode('ascii',
148                                     errors='backslashreplace').splitlines(True)
149                 if len(stdout_fragment) > 0:
150                     split[0] = "%s%s" % (stdout_fragment, split[0])
151                 if len(split) > 0 and split[-1].endswith("\n"):
152                     limit = None
153                 else:
154                     limit = -1
155                     stdout_fragment = split[-1]
156                 testclass.vpp_stdout_deque.extend(split[:limit])
157                 if not config.cache_vpp_output:
158                     for line in split[:limit]:
159                         testclass.logger.info(
160                             "VPP STDOUT: %s" % line.rstrip("\n"))
161         if testclass.vpp.stderr.fileno() in readable:
162             read = os.read(testclass.vpp.stderr.fileno(), 102400)
163             if len(read) > 0:
164                 split = read.decode('ascii',
165                                     errors='backslashreplace').splitlines(True)
166                 if len(stderr_fragment) > 0:
167                     split[0] = "%s%s" % (stderr_fragment, split[0])
168                 if len(split) > 0 and split[-1].endswith("\n"):
169                     limit = None
170                 else:
171                     limit = -1
172                     stderr_fragment = split[-1]
173
174                 testclass.vpp_stderr_deque.extend(split[:limit])
175                 if not config.cache_vpp_output:
176                     for line in split[:limit]:
177                         testclass.logger.error(
178                             "VPP STDERR: %s" % line.rstrip("\n"))
179                         # ignoring the dummy pipe here intentionally - the
180                         # flag will take care of properly terminating the loop
181
182
183 def _is_platform_aarch64():
184     return platform.machine() == 'aarch64'
185
186
187 is_platform_aarch64 = _is_platform_aarch64()
188
189
190 class KeepAliveReporter(object):
191     """
192     Singleton object which reports test start to parent process
193     """
194     _shared_state = {}
195
196     def __init__(self):
197         self.__dict__ = self._shared_state
198         self._pipe = None
199
200     @property
201     def pipe(self):
202         return self._pipe
203
204     @pipe.setter
205     def pipe(self, pipe):
206         if self._pipe is not None:
207             raise Exception("Internal error - pipe should only be set once.")
208         self._pipe = pipe
209
210     def send_keep_alive(self, test, desc=None):
211         """
212         Write current test tmpdir & desc to keep-alive pipe to signal liveness
213         """
214         if self.pipe is None:
215             # if not running forked..
216             return
217
218         if isclass(test):
219             desc = '%s (%s)' % (desc, unittest.util.strclass(test))
220         else:
221             desc = test.id()
222
223         self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
224
225
226 class TestCaseTag(Enum):
227     # marks the suites that must run at the end
228     # using only a single test runner
229     RUN_SOLO = 1
230     # marks the suites broken on VPP multi-worker
231     FIXME_VPP_WORKERS = 2
232     # marks the suites broken when ASan is enabled
233     FIXME_ASAN = 3
234
235
236 def create_tag_decorator(e):
237     def decorator(cls):
238         try:
239             cls.test_tags.append(e)
240         except AttributeError:
241             cls.test_tags = [e]
242         return cls
243     return decorator
244
245
246 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
247 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
248 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
249
250
251 class DummyVpp:
252     returncode = None
253     pid = 0xcafebafe
254
255     def poll(self):
256         pass
257
258     def terminate(self):
259         pass
260
261
262 class CPUInterface(ABC):
263     cpus = []
264     skipped_due_to_cpu_lack = False
265
266     @classmethod
267     @abstractmethod
268     def get_cpus_required(cls):
269         pass
270
271     @classmethod
272     def assign_cpus(cls, cpus):
273         cls.cpus = cpus
274
275
276 class VppTestCase(CPUInterface, unittest.TestCase):
277     """This subclass is a base class for VPP test cases that are implemented as
278     classes. It provides methods to create and run test case.
279     """
280
281     extra_vpp_statseg_config = ""
282     extra_vpp_punt_config = []
283     extra_vpp_plugin_config = []
284     logger = null_logger
285     vapi_response_timeout = 5
286     remove_configured_vpp_objects_on_tear_down = True
287
288     @property
289     def packet_infos(self):
290         """List of packet infos"""
291         return self._packet_infos
292
293     @classmethod
294     def get_packet_count_for_if_idx(cls, dst_if_index):
295         """Get the number of packet info for specified destination if index"""
296         if dst_if_index in cls._packet_count_for_dst_if_idx:
297             return cls._packet_count_for_dst_if_idx[dst_if_index]
298         else:
299             return 0
300
301     @classmethod
302     def has_tag(cls, tag):
303         """ if the test case has a given tag - return true """
304         try:
305             return tag in cls.test_tags
306         except AttributeError:
307             pass
308         return False
309
310     @classmethod
311     def is_tagged_run_solo(cls):
312         """ if the test case class is timing-sensitive - return true """
313         return cls.has_tag(TestCaseTag.RUN_SOLO)
314
315     @classmethod
316     def skip_fixme_asan(cls):
317         """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318         if cls.has_tag(TestCaseTag.FIXME_ASAN):
319             vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320             if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
322
323     @classmethod
324     def instance(cls):
325         """Return the instance of this testcase"""
326         return cls.test_instance
327
328     @classmethod
329     def set_debug_flags(cls, d):
330         cls.gdbserver_port = 7777
331         cls.debug_core = False
332         cls.debug_gdb = False
333         cls.debug_gdbserver = False
334         cls.debug_all = False
335         cls.debug_attach = False
336         if d is None:
337             return
338         dl = d.lower()
339         if dl == "core":
340             cls.debug_core = True
341         elif dl == "gdb" or dl == "gdb-all":
342             cls.debug_gdb = True
343         elif dl == "gdbserver" or dl == "gdbserver-all":
344             cls.debug_gdbserver = True
345         elif dl == "attach":
346             cls.debug_attach = True
347         else:
348             raise Exception("Unrecognized DEBUG option: '%s'" % d)
349         if dl == "gdb-all" or dl == "gdbserver-all":
350             cls.debug_all = True
351
352     @classmethod
353     def get_vpp_worker_count(cls):
354         if not hasattr(cls, "vpp_worker_count"):
355             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356                 cls.vpp_worker_count = 0
357             else:
358                 cls.vpp_worker_count = config.vpp_worker_count
359         return cls.vpp_worker_count
360
361     @classmethod
362     def get_cpus_required(cls):
363         return 1 + cls.get_vpp_worker_count()
364
365     @classmethod
366     def setUpConstants(cls):
367         """ Set-up the test case class based on environment variables """
368         cls.step = config.step
369         cls.plugin_path = ":".join(config.vpp_plugin_dir)
370         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
372         debug_cli = ""
373         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374             debug_cli = "cli-listen localhost:5002"
375         size = re.search(r"\d+[gG]", config.coredump_size)
376         if size:
377             coredump_size = f"coredump-size {config.coredump_size}".lower()
378         else:
379             coredump_size = "coredump-size unlimited"
380         default_variant = config.variant
381         if default_variant is not None:
382             default_variant = "defaults { %s 100 }" % default_variant
383         else:
384             default_variant = ""
385
386         api_fuzzing = config.api_fuzz
387         if api_fuzzing is None:
388             api_fuzzing = 'off'
389
390         cls.vpp_cmdline = [
391             config.vpp,
392             "unix", "{", "nodaemon", debug_cli, "full-coredump",
393             coredump_size, "runtime-dir", cls.tempdir, "}",
394             "api-trace", "{", "on", "}",
395             "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396             "cpu", "{", "main-core", str(cls.cpus[0]), ]
397         if cls.extern_plugin_path not in (None, ""):
398             cls.extra_vpp_plugin_config.append(
399                 "add-path %s" % cls.extern_plugin_path)
400         if cls.get_vpp_worker_count():
401             cls.vpp_cmdline.extend([
402                 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403         cls.vpp_cmdline.extend([
404             "}",
405             "physmem", "{", "max-size", "32m", "}",
406             "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407             cls.extra_vpp_statseg_config, "}",
408             "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409             "node { ", default_variant, "}",
410             "api-fuzz {", api_fuzzing, "}",
411             "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412             "plugin", "rdma_plugin.so", "{", "disable", "}",
413             "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414             "plugin", "unittest_plugin.so", "{", "enable", "}"
415         ] + cls.extra_vpp_plugin_config + ["}", ])
416
417         if cls.extra_vpp_punt_config is not None:
418             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
419
420         if not cls.debug_attach:
421             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
423
424     @classmethod
425     def wait_for_enter(cls):
426         if cls.debug_gdbserver:
427             print(double_line_delim)
428             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
429         elif cls.debug_gdb:
430             print(double_line_delim)
431             print("Spawned VPP with PID: %d" % cls.vpp.pid)
432         else:
433             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
434             return
435         print(single_line_delim)
436         print("You can debug VPP using:")
437         if cls.debug_gdbserver:
438             print(f"sudo gdb {config.vpp} "
439                   f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440             print("Now is the time to attach gdb by running the above "
441                   "command, set up breakpoints etc., then resume VPP from "
442                   "within gdb by issuing the 'continue' command")
443             cls.gdbserver_port += 1
444         elif cls.debug_gdb:
445             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446             print("Now is the time to attach gdb by running the above "
447                   "command and set up breakpoints etc., then resume VPP from"
448                   " within gdb by issuing the 'continue' command")
449         print(single_line_delim)
450         input("Press ENTER to continue running the testcase...")
451
452     @classmethod
453     def attach_vpp(cls):
454         cls.vpp = DummyVpp()
455
456     @classmethod
457     def run_vpp(cls):
458         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459         cmdline = cls.vpp_cmdline
460
461         if cls.debug_gdbserver:
462             gdbserver = '/usr/bin/gdbserver'
463             if not os.path.isfile(gdbserver) or\
464                     not os.access(gdbserver, os.X_OK):
465                 raise Exception("gdbserver binary '%s' does not exist or is "
466                                 "not executable" % gdbserver)
467
468             cmdline = [gdbserver, 'localhost:{port}'
469                        .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
471
472         try:
473             cls.vpp = subprocess.Popen(cmdline,
474                                        stdout=subprocess.PIPE,
475                                        stderr=subprocess.PIPE)
476         except subprocess.CalledProcessError as e:
477             cls.logger.critical("Subprocess returned with non-0 return code: ("
478                                 "%s)", e.returncode)
479             raise
480         except OSError as e:
481             cls.logger.critical("Subprocess returned with OS error: "
482                                 "(%s) %s", e.errno, e.strerror)
483             raise
484         except Exception as e:
485             cls.logger.exception("Subprocess returned unexpected from "
486                                  "%s:", cmdline)
487             raise
488
489         cls.wait_for_enter()
490
491     @classmethod
492     def wait_for_coredump(cls):
493         corefile = cls.tempdir + "/core"
494         if os.path.isfile(corefile):
495             cls.logger.error("Waiting for coredump to complete: %s", corefile)
496             curr_size = os.path.getsize(corefile)
497             deadline = time.time() + 60
498             ok = False
499             while time.time() < deadline:
500                 cls.sleep(1)
501                 size = curr_size
502                 curr_size = os.path.getsize(corefile)
503                 if size == curr_size:
504                     ok = True
505                     break
506             if not ok:
507                 cls.logger.error("Timed out waiting for coredump to complete:"
508                                  " %s", corefile)
509             else:
510                 cls.logger.error("Coredump complete: %s, size %d",
511                                  corefile, curr_size)
512
513     @classmethod
514     def get_stats_sock_path(cls):
515         return "%s/stats.sock" % cls.tempdir
516
517     @classmethod
518     def get_api_sock_path(cls):
519         return "%s/api.sock" % cls.tempdir
520
521     @classmethod
522     def get_api_segment_prefix(cls):
523         return os.path.basename(cls.tempdir)  # Only used for VAPI
524
525     @classmethod
526     def get_tempdir(cls):
527         if cls.debug_attach:
528             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
529         else:
530             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
531             if config.wipe_tmp_dir:
532                 shutil.rmtree(tmpdir, ignore_errors=True)
533             os.mkdir(tmpdir)
534         return tmpdir
535
536     @classmethod
537     def create_file_handler(cls):
538         if config.log_dir is None:
539             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
540             return
541
542         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
543         if config.wipe_tmp_dir:
544             shutil.rmtree(logdir, ignore_errors=True)
545         os.mkdir(logdir)
546         cls.file_handler = FileHandler(f"{logdir}/log.txt")
547
548     @classmethod
549     def setUpClass(cls):
550         """
551         Perform class setup before running the testcase
552         Remove shared memory files, start vpp and connect the vpp-api
553         """
554         super(VppTestCase, cls).setUpClass()
555         cls.logger = get_logger(cls.__name__)
556         random.seed(config.rnd_seed)
557         if hasattr(cls, 'parallel_handler'):
558             cls.logger.addHandler(cls.parallel_handler)
559             cls.logger.propagate = False
560         cls.set_debug_flags(config.debug)
561         cls.tempdir = cls.get_tempdir()
562         cls.create_file_handler()
563         cls.file_handler.setFormatter(
564             Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
565                       datefmt="%H:%M:%S"))
566         cls.file_handler.setLevel(DEBUG)
567         cls.logger.addHandler(cls.file_handler)
568         cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
569         os.chdir(cls.tempdir)
570         cls.logger.info("Temporary dir is %s, api socket is %s",
571                         cls.tempdir, cls.get_api_sock_path())
572         cls.logger.debug("Random seed is %s", config.rnd_seed)
573         cls.setUpConstants()
574         cls.reset_packet_infos()
575         cls._pcaps = []
576         cls._old_pcaps = []
577         cls.verbose = 0
578         cls.vpp_dead = False
579         cls.registry = VppObjectRegistry()
580         cls.vpp_startup_failed = False
581         cls.reporter = KeepAliveReporter()
582         # need to catch exceptions here because if we raise, then the cleanup
583         # doesn't get called and we might end with a zombie vpp
584         try:
585             if cls.debug_attach:
586                 cls.attach_vpp()
587             else:
588                 cls.run_vpp()
589             cls.reporter.send_keep_alive(cls, 'setUpClass')
590             VppTestResult.current_test_case_info = TestCaseInfo(
591                 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
592             cls.vpp_stdout_deque = deque()
593             cls.vpp_stderr_deque = deque()
594             if not cls.debug_attach:
595                 cls.pump_thread_stop_flag = Event()
596                 cls.pump_thread_wakeup_pipe = os.pipe()
597                 cls.pump_thread = Thread(target=pump_output, args=(cls,))
598                 cls.pump_thread.daemon = True
599                 cls.pump_thread.start()
600             if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
601                 cls.vapi_response_timeout = 0
602             cls.vapi = VppPapiProvider(cls.__name__, cls,
603                                        cls.vapi_response_timeout)
604             if cls.step:
605                 hook = hookmodule.StepHook(cls)
606             else:
607                 hook = hookmodule.PollHook(cls)
608             cls.vapi.register_hook(hook)
609             cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
610             try:
611                 hook.poll_vpp()
612             except VppDiedError:
613                 cls.vpp_startup_failed = True
614                 cls.logger.critical(
615                     "VPP died shortly after startup, check the"
616                     " output to standard error for possible cause")
617                 raise
618             try:
619                 cls.vapi.connect()
620             except (vpp_papi.VPPIOError, Exception) as e:
621                 cls.logger.debug("Exception connecting to vapi: %s" % e)
622                 cls.vapi.disconnect()
623
624                 if cls.debug_gdbserver:
625                     print(colorize("You're running VPP inside gdbserver but "
626                                    "VPP-API connection failed, did you forget "
627                                    "to 'continue' VPP from within gdb?", RED))
628                 raise e
629             if cls.debug_attach:
630                 last_line = cls.vapi.cli("show thread").split("\n")[-2]
631                 cls.vpp_worker_count = int(last_line.split(" ")[0])
632                 print("Detected VPP with %s workers." % cls.vpp_worker_count)
633         except vpp_papi.VPPRuntimeError as e:
634             cls.logger.debug("%s" % e)
635             cls.quit()
636             raise e
637         except Exception as e:
638             cls.logger.debug("Exception connecting to VPP: %s" % e)
639             cls.quit()
640             raise e
641
642     @classmethod
643     def _debug_quit(cls):
644         if (cls.debug_gdbserver or cls.debug_gdb):
645             try:
646                 cls.vpp.poll()
647
648                 if cls.vpp.returncode is None:
649                     print()
650                     print(double_line_delim)
651                     print("VPP or GDB server is still running")
652                     print(single_line_delim)
653                     input("When done debugging, press ENTER to kill the "
654                           "process and finish running the testcase...")
655             except AttributeError:
656                 pass
657
658     @classmethod
659     def quit(cls):
660         """
661         Disconnect vpp-api, kill vpp and cleanup shared memory files
662         """
663         cls._debug_quit()
664
665         # first signal that we want to stop the pump thread, then wake it up
666         if hasattr(cls, 'pump_thread_stop_flag'):
667             cls.pump_thread_stop_flag.set()
668         if hasattr(cls, 'pump_thread_wakeup_pipe'):
669             os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
670         if hasattr(cls, 'pump_thread'):
671             cls.logger.debug("Waiting for pump thread to stop")
672             cls.pump_thread.join()
673         if hasattr(cls, 'vpp_stderr_reader_thread'):
674             cls.logger.debug("Waiting for stderr pump to stop")
675             cls.vpp_stderr_reader_thread.join()
676
677         if hasattr(cls, 'vpp'):
678             if hasattr(cls, 'vapi'):
679                 cls.logger.debug(cls.vapi.vpp.get_stats())
680                 cls.logger.debug("Disconnecting class vapi client on %s",
681                                  cls.__name__)
682                 cls.vapi.disconnect()
683                 cls.logger.debug("Deleting class vapi attribute on %s",
684                                  cls.__name__)
685                 del cls.vapi
686             cls.vpp.poll()
687             if not cls.debug_attach and cls.vpp.returncode is None:
688                 cls.wait_for_coredump()
689                 cls.logger.debug("Sending TERM to vpp")
690                 cls.vpp.terminate()
691                 cls.logger.debug("Waiting for vpp to die")
692                 try:
693                     outs, errs = cls.vpp.communicate(timeout=5)
694                 except subprocess.TimeoutExpired:
695                     cls.vpp.kill()
696                     outs, errs = cls.vpp.communicate()
697             cls.logger.debug("Deleting class vpp attribute on %s",
698                              cls.__name__)
699             if not cls.debug_attach:
700                 cls.vpp.stdout.close()
701                 cls.vpp.stderr.close()
702             del cls.vpp
703
704         if cls.vpp_startup_failed:
705             stdout_log = cls.logger.info
706             stderr_log = cls.logger.critical
707         else:
708             stdout_log = cls.logger.info
709             stderr_log = cls.logger.info
710
711         if hasattr(cls, 'vpp_stdout_deque'):
712             stdout_log(single_line_delim)
713             stdout_log('VPP output to stdout while running %s:', cls.__name__)
714             stdout_log(single_line_delim)
715             vpp_output = "".join(cls.vpp_stdout_deque)
716             with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
717                 f.write(vpp_output)
718             stdout_log('\n%s', vpp_output)
719             stdout_log(single_line_delim)
720
721         if hasattr(cls, 'vpp_stderr_deque'):
722             stderr_log(single_line_delim)
723             stderr_log('VPP output to stderr while running %s:', cls.__name__)
724             stderr_log(single_line_delim)
725             vpp_output = "".join(cls.vpp_stderr_deque)
726             with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
727                 f.write(vpp_output)
728             stderr_log('\n%s', vpp_output)
729             stderr_log(single_line_delim)
730
731     @classmethod
732     def tearDownClass(cls):
733         """ Perform final cleanup after running all tests in this test-case """
734         cls.logger.debug("--- tearDownClass() for %s called ---" %
735                          cls.__name__)
736         cls.reporter.send_keep_alive(cls, 'tearDownClass')
737         cls.quit()
738         cls.file_handler.close()
739         cls.reset_packet_infos()
740         if config.debug_framework:
741             debug_internal.on_tear_down_class(cls)
742
743     def show_commands_at_teardown(self):
744         """ Allow subclass specific teardown logging additions."""
745         self.logger.info("--- No test specific show commands provided. ---")
746
747     def tearDown(self):
748         """ Show various debug prints after each test """
749         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
750                           (self.__class__.__name__, self._testMethodName,
751                            self._testMethodDoc))
752
753         try:
754             if not self.vpp_dead:
755                 self.logger.debug(self.vapi.cli("show trace max 1000"))
756                 self.logger.info(self.vapi.ppcli("show interface"))
757                 self.logger.info(self.vapi.ppcli("show hardware"))
758                 self.logger.info(self.statistics.set_errors_str())
759                 self.logger.info(self.vapi.ppcli("show run"))
760                 self.logger.info(self.vapi.ppcli("show log"))
761                 self.logger.info(self.vapi.ppcli("show bihash"))
762                 self.logger.info("Logging testcase specific show commands.")
763                 self.show_commands_at_teardown()
764                 if self.remove_configured_vpp_objects_on_tear_down:
765                     self.registry.remove_vpp_config(self.logger)
766             # Save/Dump VPP api trace log
767             m = self._testMethodName
768             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
769             tmp_api_trace = "/tmp/%s" % api_trace
770             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
771             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
772             self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
773                                                     vpp_api_trace_log))
774             os.rename(tmp_api_trace, vpp_api_trace_log)
775         except VppTransportSocketIOError:
776             self.logger.debug("VppTransportSocketIOError: Vpp dead. "
777                               "Cannot log show commands.")
778             self.vpp_dead = True
779         else:
780             self.registry.unregister_all(self.logger)
781
782     def setUp(self):
783         """ Clear trace before running each test"""
784         super(VppTestCase, self).setUp()
785         self.reporter.send_keep_alive(self)
786         if self.vpp_dead:
787             raise VppDiedError(rv=None, testcase=self.__class__.__name__,
788                                method_name=self._testMethodName)
789         self.sleep(.1, "during setUp")
790         self.vpp_stdout_deque.append(
791             "--- test setUp() for %s.%s(%s) starts here ---\n" %
792             (self.__class__.__name__, self._testMethodName,
793              self._testMethodDoc))
794         self.vpp_stderr_deque.append(
795             "--- test setUp() for %s.%s(%s) starts here ---\n" %
796             (self.__class__.__name__, self._testMethodName,
797              self._testMethodDoc))
798         self.vapi.cli("clear trace")
799         # store the test instance inside the test class - so that objects
800         # holding the class can access instance methods (like assertEqual)
801         type(self).test_instance = self
802
803     @classmethod
804     def pg_enable_capture(cls, interfaces=None):
805         """
806         Enable capture on packet-generator interfaces
807
808         :param interfaces: iterable interface indexes (if None,
809                            use self.pg_interfaces)
810
811         """
812         if interfaces is None:
813             interfaces = cls.pg_interfaces
814         for i in interfaces:
815             i.enable_capture()
816
817     @classmethod
818     def register_pcap(cls, intf, worker):
819         """ Register a pcap in the testclass """
820         # add to the list of captures with current timestamp
821         cls._pcaps.append((intf, worker))
822
823     @classmethod
824     def get_vpp_time(cls):
825         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
826         # returns float("2.190522")
827         timestr = cls.vapi.cli('show clock')
828         head, sep, tail = timestr.partition(',')
829         head, sep, tail = head.partition('Time now')
830         return float(tail)
831
832     @classmethod
833     def sleep_on_vpp_time(cls, sec):
834         """ Sleep according to time in VPP world """
835         # On a busy system with many processes
836         # we might end up with VPP time being slower than real world
837         # So take that into account when waiting for VPP to do something
838         start_time = cls.get_vpp_time()
839         while cls.get_vpp_time() - start_time < sec:
840             cls.sleep(0.1)
841
842     @classmethod
843     def pg_start(cls, trace=True):
844         """ Enable the PG, wait till it is done, then clean up """
845         for (intf, worker) in cls._old_pcaps:
846             intf.handle_old_pcap_file(intf.get_in_path(worker),
847                                       intf.in_history_counter)
848         cls._old_pcaps = []
849         if trace:
850             cls.vapi.cli("clear trace")
851             cls.vapi.cli("trace add pg-input 1000")
852         cls.vapi.cli('packet-generator enable')
853         # PG, when starts, runs to completion -
854         # so let's avoid a race condition,
855         # and wait a little till it's done.
856         # Then clean it up  - and then be gone.
857         deadline = time.time() + 300
858         while cls.vapi.cli('show packet-generator').find("Yes") != -1:
859             cls.sleep(0.01)  # yield
860             if time.time() > deadline:
861                 cls.logger.error("Timeout waiting for pg to stop")
862                 break
863         for intf, worker in cls._pcaps:
864             cls.vapi.cli('packet-generator delete %s' %
865                          intf.get_cap_name(worker))
866         cls._old_pcaps = cls._pcaps
867         cls._pcaps = []
868
869     @classmethod
870     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
871                                       mode=None):
872         """
873         Create packet-generator interfaces.
874
875         :param interfaces: iterable indexes of the interfaces.
876         :returns: List of created interfaces.
877
878         """
879         result = []
880         for i in interfaces:
881             intf = VppPGInterface(cls, i, gso, gso_size, mode)
882             setattr(cls, intf.name, intf)
883             result.append(intf)
884         cls.pg_interfaces = result
885         return result
886
887     @classmethod
888     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
889         pgmode = VppEnum.vl_api_pg_interface_mode_t
890         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
891                                                  pgmode.PG_API_MODE_IP4)
892
893     @classmethod
894     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
895         pgmode = VppEnum.vl_api_pg_interface_mode_t
896         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
897                                                  pgmode.PG_API_MODE_IP6)
898
899     @classmethod
900     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
901         pgmode = VppEnum.vl_api_pg_interface_mode_t
902         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
903                                                  pgmode.PG_API_MODE_ETHERNET)
904
905     @classmethod
906     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
907         pgmode = VppEnum.vl_api_pg_interface_mode_t
908         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
909                                                  pgmode.PG_API_MODE_ETHERNET)
910
911     @classmethod
912     def create_loopback_interfaces(cls, count):
913         """
914         Create loopback interfaces.
915
916         :param count: number of interfaces created.
917         :returns: List of created interfaces.
918         """
919         result = [VppLoInterface(cls) for i in range(count)]
920         for intf in result:
921             setattr(cls, intf.name, intf)
922         cls.lo_interfaces = result
923         return result
924
925     @classmethod
926     def create_bvi_interfaces(cls, count):
927         """
928         Create BVI interfaces.
929
930         :param count: number of interfaces created.
931         :returns: List of created interfaces.
932         """
933         result = [VppBviInterface(cls) for i in range(count)]
934         for intf in result:
935             setattr(cls, intf.name, intf)
936         cls.bvi_interfaces = result
937         return result
938
939     @staticmethod
940     def extend_packet(packet, size, padding=' '):
941         """
942         Extend packet to given size by padding with spaces or custom padding
943         NOTE: Currently works only when Raw layer is present.
944
945         :param packet: packet
946         :param size: target size
947         :param padding: padding used to extend the payload
948
949         """
950         packet_len = len(packet) + 4
951         extend = size - packet_len
952         if extend > 0:
953             num = (extend // len(padding)) + 1
954             packet[Raw].load += (padding * num)[:extend].encode("ascii")
955
956     @classmethod
957     def reset_packet_infos(cls):
958         """ Reset the list of packet info objects and packet counts to zero """
959         cls._packet_infos = {}
960         cls._packet_count_for_dst_if_idx = {}
961
962     @classmethod
963     def create_packet_info(cls, src_if, dst_if):
964         """
965         Create packet info object containing the source and destination indexes
966         and add it to the testcase's packet info list
967
968         :param VppInterface src_if: source interface
969         :param VppInterface dst_if: destination interface
970
971         :returns: _PacketInfo object
972
973         """
974         info = _PacketInfo()
975         info.index = len(cls._packet_infos)
976         info.src = src_if.sw_if_index
977         info.dst = dst_if.sw_if_index
978         if isinstance(dst_if, VppSubInterface):
979             dst_idx = dst_if.parent.sw_if_index
980         else:
981             dst_idx = dst_if.sw_if_index
982         if dst_idx in cls._packet_count_for_dst_if_idx:
983             cls._packet_count_for_dst_if_idx[dst_idx] += 1
984         else:
985             cls._packet_count_for_dst_if_idx[dst_idx] = 1
986         cls._packet_infos[info.index] = info
987         return info
988
989     @staticmethod
990     def info_to_payload(info):
991         """
992         Convert _PacketInfo object to packet payload
993
994         :param info: _PacketInfo object
995
996         :returns: string containing serialized data from packet info
997         """
998
999         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1000         return pack('iiiih', info.index, info.src,
1001                     info.dst, info.ip, info.proto)
1002
1003     @staticmethod
1004     def payload_to_info(payload, payload_field='load'):
1005         """
1006         Convert packet payload to _PacketInfo object
1007
1008         :param payload: packet payload
1009         :type payload:  <class 'scapy.packet.Raw'>
1010         :param payload_field: packet fieldname of payload "load" for
1011                 <class 'scapy.packet.Raw'>
1012         :type payload_field: str
1013         :returns: _PacketInfo object containing de-serialized data from payload
1014
1015         """
1016
1017         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1018         payload_b = getattr(payload, payload_field)[:18]
1019
1020         info = _PacketInfo()
1021         info.index, info.src, info.dst, info.ip, info.proto \
1022             = unpack('iiiih', payload_b)
1023
1024         # some SRv6 TCs depend on get an exception if bad values are detected
1025         if info.index > 0x4000:
1026             raise ValueError('Index value is invalid')
1027
1028         return info
1029
1030     def get_next_packet_info(self, info):
1031         """
1032         Iterate over the packet info list stored in the testcase
1033         Start iteration with first element if info is None
1034         Continue based on index in info if info is specified
1035
1036         :param info: info or None
1037         :returns: next info in list or None if no more infos
1038         """
1039         if info is None:
1040             next_index = 0
1041         else:
1042             next_index = info.index + 1
1043         if next_index == len(self._packet_infos):
1044             return None
1045         else:
1046             return self._packet_infos[next_index]
1047
1048     def get_next_packet_info_for_interface(self, src_index, info):
1049         """
1050         Search the packet info list for the next packet info with same source
1051         interface index
1052
1053         :param src_index: source interface index to search for
1054         :param info: packet info - where to start the search
1055         :returns: packet info or None
1056
1057         """
1058         while True:
1059             info = self.get_next_packet_info(info)
1060             if info is None:
1061                 return None
1062             if info.src == src_index:
1063                 return info
1064
1065     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1066         """
1067         Search the packet info list for the next packet info with same source
1068         and destination interface indexes
1069
1070         :param src_index: source interface index to search for
1071         :param dst_index: destination interface index to search for
1072         :param info: packet info - where to start the search
1073         :returns: packet info or None
1074
1075         """
1076         while True:
1077             info = self.get_next_packet_info_for_interface(src_index, info)
1078             if info is None:
1079                 return None
1080             if info.dst == dst_index:
1081                 return info
1082
1083     def assert_equal(self, real_value, expected_value, name_or_class=None):
1084         if name_or_class is None:
1085             self.assertEqual(real_value, expected_value)
1086             return
1087         try:
1088             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1089             msg = msg % (getdoc(name_or_class).strip(),
1090                          real_value, str(name_or_class(real_value)),
1091                          expected_value, str(name_or_class(expected_value)))
1092         except Exception:
1093             msg = "Invalid %s: %s does not match expected value %s" % (
1094                 name_or_class, real_value, expected_value)
1095
1096         self.assertEqual(real_value, expected_value, msg)
1097
1098     def assert_in_range(self,
1099                         real_value,
1100                         expected_min,
1101                         expected_max,
1102                         name=None):
1103         if name is None:
1104             msg = None
1105         else:
1106             msg = "Invalid %s: %s out of range <%s,%s>" % (
1107                 name, real_value, expected_min, expected_max)
1108         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1109
1110     def assert_packet_checksums_valid(self, packet,
1111                                       ignore_zero_udp_checksums=True):
1112         received = packet.__class__(scapy.compat.raw(packet))
1113         udp_layers = ['UDP', 'UDPerror']
1114         checksum_fields = ['cksum', 'chksum']
1115         checksums = []
1116         counter = 0
1117         temp = received.__class__(scapy.compat.raw(received))
1118         while True:
1119             layer = temp.getlayer(counter)
1120             if layer:
1121                 layer = layer.copy()
1122                 layer.remove_payload()
1123                 for cf in checksum_fields:
1124                     if hasattr(layer, cf):
1125                         if ignore_zero_udp_checksums and \
1126                                 0 == getattr(layer, cf) and \
1127                                 layer.name in udp_layers:
1128                             continue
1129                         delattr(temp.getlayer(counter), cf)
1130                         checksums.append((counter, cf))
1131             else:
1132                 break
1133             counter = counter + 1
1134         if 0 == len(checksums):
1135             return
1136         temp = temp.__class__(scapy.compat.raw(temp))
1137         for layer, cf in checksums:
1138             calc_sum = getattr(temp[layer], cf)
1139             self.assert_equal(
1140                 getattr(received[layer], cf), calc_sum,
1141                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1142             self.logger.debug(
1143                 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1144                 (cf, temp[layer].name, calc_sum))
1145
1146     def assert_checksum_valid(self, received_packet, layer,
1147                               field_name='chksum',
1148                               ignore_zero_checksum=False):
1149         """ Check checksum of received packet on given layer """
1150         received_packet_checksum = getattr(received_packet[layer], field_name)
1151         if ignore_zero_checksum and 0 == received_packet_checksum:
1152             return
1153         recalculated = received_packet.__class__(
1154             scapy.compat.raw(received_packet))
1155         delattr(recalculated[layer], field_name)
1156         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1157         self.assert_equal(received_packet_checksum,
1158                           getattr(recalculated[layer], field_name),
1159                           "packet checksum on layer: %s" % layer)
1160
1161     def assert_ip_checksum_valid(self, received_packet,
1162                                  ignore_zero_checksum=False):
1163         self.assert_checksum_valid(received_packet, 'IP',
1164                                    ignore_zero_checksum=ignore_zero_checksum)
1165
1166     def assert_tcp_checksum_valid(self, received_packet,
1167                                   ignore_zero_checksum=False):
1168         self.assert_checksum_valid(received_packet, 'TCP',
1169                                    ignore_zero_checksum=ignore_zero_checksum)
1170
1171     def assert_udp_checksum_valid(self, received_packet,
1172                                   ignore_zero_checksum=True):
1173         self.assert_checksum_valid(received_packet, 'UDP',
1174                                    ignore_zero_checksum=ignore_zero_checksum)
1175
1176     def assert_embedded_icmp_checksum_valid(self, received_packet):
1177         if received_packet.haslayer(IPerror):
1178             self.assert_checksum_valid(received_packet, 'IPerror')
1179         if received_packet.haslayer(TCPerror):
1180             self.assert_checksum_valid(received_packet, 'TCPerror')
1181         if received_packet.haslayer(UDPerror):
1182             self.assert_checksum_valid(received_packet, 'UDPerror',
1183                                        ignore_zero_checksum=True)
1184         if received_packet.haslayer(ICMPerror):
1185             self.assert_checksum_valid(received_packet, 'ICMPerror')
1186
1187     def assert_icmp_checksum_valid(self, received_packet):
1188         self.assert_checksum_valid(received_packet, 'ICMP')
1189         self.assert_embedded_icmp_checksum_valid(received_packet)
1190
1191     def assert_icmpv6_checksum_valid(self, pkt):
1192         if pkt.haslayer(ICMPv6DestUnreach):
1193             self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1194             self.assert_embedded_icmp_checksum_valid(pkt)
1195         if pkt.haslayer(ICMPv6EchoRequest):
1196             self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1197         if pkt.haslayer(ICMPv6EchoReply):
1198             self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1199
1200     def get_counter(self, counter):
1201         if counter.startswith("/"):
1202             counter_value = self.statistics.get_counter(counter)
1203         else:
1204             counters = self.vapi.cli("sh errors").split('\n')
1205             counter_value = 0
1206             for i in range(1, len(counters) - 1):
1207                 results = counters[i].split()
1208                 if results[1] == counter:
1209                     counter_value = int(results[0])
1210                     break
1211         return counter_value
1212
1213     def assert_counter_equal(self, counter, expected_value,
1214                              thread=None, index=0):
1215         c = self.get_counter(counter)
1216         if thread is not None:
1217             c = c[thread][index]
1218         else:
1219             c = sum(x[index] for x in c)
1220         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1221
1222     def assert_packet_counter_equal(self, counter, expected_value):
1223         counter_value = self.get_counter(counter)
1224         self.assert_equal(counter_value, expected_value,
1225                           "packet counter `%s'" % counter)
1226
1227     def assert_error_counter_equal(self, counter, expected_value):
1228         counter_value = self.statistics[counter].sum()
1229         self.assert_equal(counter_value, expected_value,
1230                           "error counter `%s'" % counter)
1231
1232     @classmethod
1233     def sleep(cls, timeout, remark=None):
1234
1235         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1236         #  * by Guido, only the main thread can be interrupted.
1237         # */
1238         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1239         if timeout == 0:
1240             # yield quantum
1241             if hasattr(os, 'sched_yield'):
1242                 os.sched_yield()
1243             else:
1244                 time.sleep(0)
1245             return
1246
1247         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1248         before = time.time()
1249         time.sleep(timeout)
1250         after = time.time()
1251         if after - before > 2 * timeout:
1252             cls.logger.error("unexpected self.sleep() result - "
1253                              "slept for %es instead of ~%es!",
1254                              after - before, timeout)
1255
1256         cls.logger.debug(
1257             "Finished sleep (%s) - slept %es (wanted %es)",
1258             remark, after - before, timeout)
1259
1260     def virtual_sleep(self, timeout, remark=None):
1261         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1262         self.vapi.cli("set clock adjust %s" % timeout)
1263
1264     def pg_send(self, intf, pkts, worker=None, trace=True):
1265         intf.add_stream(pkts, worker=worker)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start(trace=trace)
1268
1269     def snapshot_stats(self, stats_diff):
1270         """Return snapshot of interesting stats based on diff dictionary."""
1271         stats_snapshot = {}
1272         for sw_if_index in stats_diff:
1273             for counter in stats_diff[sw_if_index]:
1274                 stats_snapshot[counter] = self.statistics[counter]
1275         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1276         return stats_snapshot
1277
1278     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1279         """Assert appropriate difference between current stats and snapshot."""
1280         for sw_if_index in stats_diff:
1281             for cntr, diff in stats_diff[sw_if_index].items():
1282                 if sw_if_index == "err":
1283                     self.assert_equal(
1284                         self.statistics[cntr].sum(),
1285                         stats_snapshot[cntr].sum() + diff,
1286                         f"'{cntr}' counter value (previous value: "
1287                         f"{stats_snapshot[cntr].sum()}, "
1288                         f"expected diff: {diff})")
1289                 else:
1290                     try:
1291                         self.assert_equal(
1292                             self.statistics[cntr][:, sw_if_index].sum(),
1293                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1294                             f"'{cntr}' counter value (previous value: "
1295                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1296                             f"expected diff: {diff})")
1297                     except IndexError:
1298                         # if diff is 0, then this most probably a case where
1299                         # test declares multiple interfaces but traffic hasn't
1300                         # passed through this one yet - which means the counter
1301                         # value is 0 and can be ignored
1302                         if 0 != diff:
1303                             raise
1304
1305     def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1306                                    stats_diff=None, trace=True, msg=None):
1307         if stats_diff:
1308             stats_snapshot = self.snapshot_stats(stats_diff)
1309
1310         self.pg_send(intf, pkts)
1311
1312         try:
1313             if not timeout:
1314                 timeout = 1
1315             for i in self.pg_interfaces:
1316                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1317                 timeout = 0.1
1318         finally:
1319             if trace:
1320                 if msg:
1321                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1322                 self.logger.debug(self.vapi.cli("show trace"))
1323
1324         if stats_diff:
1325             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1326
1327     def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1328                         trace=True, msg=None, stats_diff=None):
1329         if stats_diff:
1330             stats_snapshot = self.snapshot_stats(stats_diff)
1331
1332         if not n_rx:
1333             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1334         self.pg_send(intf, pkts, worker=worker, trace=trace)
1335         rx = output.get_capture(n_rx)
1336         if trace:
1337             if msg:
1338                 self.logger.debug(f"send_and_expect: {msg}")
1339             self.logger.debug(self.vapi.cli("show trace"))
1340
1341         if stats_diff:
1342             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1343
1344         return rx
1345
1346     def send_and_expect_load_balancing(self, input, pkts, outputs,
1347                                        worker=None, trace=True):
1348         self.pg_send(input, pkts, worker=worker, trace=trace)
1349         rxs = []
1350         for oo in outputs:
1351             rx = oo._get_capture(1)
1352             self.assertNotEqual(0, len(rx))
1353             rxs.append(rx)
1354         if trace:
1355             self.logger.debug(self.vapi.cli("show trace"))
1356         return rxs
1357
1358     def send_and_expect_some(self, intf, pkts, output,
1359                              worker=None,
1360                              trace=True):
1361         self.pg_send(intf, pkts, worker=worker, trace=trace)
1362         rx = output._get_capture(1)
1363         if trace:
1364             self.logger.debug(self.vapi.cli("show trace"))
1365         self.assertTrue(len(rx) > 0)
1366         self.assertTrue(len(rx) < len(pkts))
1367         return rx
1368
1369     def send_and_expect_only(self, intf, pkts, output, timeout=None,
1370                              stats_diff=None):
1371         if stats_diff:
1372             stats_snapshot = self.snapshot_stats(stats_diff)
1373
1374         self.pg_send(intf, pkts)
1375         rx = output.get_capture(len(pkts))
1376         outputs = [output]
1377         if not timeout:
1378             timeout = 1
1379         for i in self.pg_interfaces:
1380             if i not in outputs:
1381                 i.assert_nothing_captured(timeout=timeout)
1382                 timeout = 0.1
1383
1384         if stats_diff:
1385             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1386
1387         return rx
1388
1389
1390 def get_testcase_doc_name(test):
1391     return getdoc(test.__class__).splitlines()[0]
1392
1393
1394 def get_test_description(descriptions, test):
1395     short_description = test.shortDescription()
1396     if descriptions and short_description:
1397         return short_description
1398     else:
1399         return str(test)
1400
1401
1402 class TestCaseInfo(object):
1403     def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1404         self.logger = logger
1405         self.tempdir = tempdir
1406         self.vpp_pid = vpp_pid
1407         self.vpp_bin_path = vpp_bin_path
1408         self.core_crash_test = None
1409
1410
1411 class VppTestResult(unittest.TestResult):
1412     """
1413     @property result_string
1414      String variable to store the test case result string.
1415     @property errors
1416      List variable containing 2-tuples of TestCase instances and strings
1417      holding formatted tracebacks. Each tuple represents a test which
1418      raised an unexpected exception.
1419     @property failures
1420      List variable containing 2-tuples of TestCase instances and strings
1421      holding formatted tracebacks. Each tuple represents a test where
1422      a failure was explicitly signalled using the TestCase.assert*()
1423      methods.
1424     """
1425
1426     failed_test_cases_info = set()
1427     core_crash_test_cases_info = set()
1428     current_test_case_info = None
1429
1430     def __init__(self, stream=None, descriptions=None, verbosity=None,
1431                  runner=None):
1432         """
1433         :param stream File descriptor to store where to report test results.
1434             Set to the standard error stream by default.
1435         :param descriptions Boolean variable to store information if to use
1436             test case descriptions.
1437         :param verbosity Integer variable to store required verbosity level.
1438         """
1439         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1440         self.stream = stream
1441         self.descriptions = descriptions
1442         self.verbosity = verbosity
1443         self.result_string = None
1444         self.runner = runner
1445         self.printed = []
1446
1447     def addSuccess(self, test):
1448         """
1449         Record a test succeeded result
1450
1451         :param test:
1452
1453         """
1454         if self.current_test_case_info:
1455             self.current_test_case_info.logger.debug(
1456                 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1457                                                        test._testMethodName,
1458                                                        test._testMethodDoc))
1459         unittest.TestResult.addSuccess(self, test)
1460         self.result_string = colorize("OK", GREEN)
1461
1462         self.send_result_through_pipe(test, PASS)
1463
1464     def addSkip(self, test, reason):
1465         """
1466         Record a test skipped.
1467
1468         :param test:
1469         :param reason:
1470
1471         """
1472         if self.current_test_case_info:
1473             self.current_test_case_info.logger.debug(
1474                 "--- addSkip() %s.%s(%s) called, reason is %s" %
1475                 (test.__class__.__name__, test._testMethodName,
1476                  test._testMethodDoc, reason))
1477         unittest.TestResult.addSkip(self, test, reason)
1478         self.result_string = colorize("SKIP", YELLOW)
1479
1480         if reason == "not enough cpus":
1481             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1482         else:
1483             self.send_result_through_pipe(test, SKIP)
1484
1485     def symlink_failed(self):
1486         if self.current_test_case_info:
1487             try:
1488                 failed_dir = config.failed_dir
1489                 link_path = os.path.join(
1490                     failed_dir,
1491                     '%s-FAILED' %
1492                     os.path.basename(self.current_test_case_info.tempdir))
1493
1494                 self.current_test_case_info.logger.debug(
1495                     "creating a link to the failed test")
1496                 self.current_test_case_info.logger.debug(
1497                     "os.symlink(%s, %s)" %
1498                     (self.current_test_case_info.tempdir, link_path))
1499                 if os.path.exists(link_path):
1500                     self.current_test_case_info.logger.debug(
1501                         'symlink already exists')
1502                 else:
1503                     os.symlink(self.current_test_case_info.tempdir, link_path)
1504
1505             except Exception as e:
1506                 self.current_test_case_info.logger.error(e)
1507
1508     def send_result_through_pipe(self, test, result):
1509         if hasattr(self, 'test_framework_result_pipe'):
1510             pipe = self.test_framework_result_pipe
1511             if pipe:
1512                 pipe.send((test.id(), result))
1513
1514     def log_error(self, test, err, fn_name):
1515         if self.current_test_case_info:
1516             if isinstance(test, unittest.suite._ErrorHolder):
1517                 test_name = test.description
1518             else:
1519                 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1520                                            test._testMethodName,
1521                                            test._testMethodDoc)
1522             self.current_test_case_info.logger.debug(
1523                 "--- %s() %s called, err is %s" %
1524                 (fn_name, test_name, err))
1525             self.current_test_case_info.logger.debug(
1526                 "formatted exception is:\n%s" %
1527                 "".join(format_exception(*err)))
1528
1529     def add_error(self, test, err, unittest_fn, error_type):
1530         if error_type == FAIL:
1531             self.log_error(test, err, 'addFailure')
1532             error_type_str = colorize("FAIL", RED)
1533         elif error_type == ERROR:
1534             self.log_error(test, err, 'addError')
1535             error_type_str = colorize("ERROR", RED)
1536         else:
1537             raise Exception('Error type %s cannot be used to record an '
1538                             'error or a failure' % error_type)
1539
1540         unittest_fn(self, test, err)
1541         if self.current_test_case_info:
1542             self.result_string = "%s [ temp dir used by test case: %s ]" % \
1543                                  (error_type_str,
1544                                   self.current_test_case_info.tempdir)
1545             self.symlink_failed()
1546             self.failed_test_cases_info.add(self.current_test_case_info)
1547             if is_core_present(self.current_test_case_info.tempdir):
1548                 if not self.current_test_case_info.core_crash_test:
1549                     if isinstance(test, unittest.suite._ErrorHolder):
1550                         test_name = str(test)
1551                     else:
1552                         test_name = "'{!s}' ({!s})".format(
1553                             get_testcase_doc_name(test), test.id())
1554                     self.current_test_case_info.core_crash_test = test_name
1555                 self.core_crash_test_cases_info.add(
1556                     self.current_test_case_info)
1557         else:
1558             self.result_string = '%s [no temp dir]' % error_type_str
1559
1560         self.send_result_through_pipe(test, error_type)
1561
1562     def addFailure(self, test, err):
1563         """
1564         Record a test failed result
1565
1566         :param test:
1567         :param err: error message
1568
1569         """
1570         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1571
1572     def addError(self, test, err):
1573         """
1574         Record a test error result
1575
1576         :param test:
1577         :param err: error message
1578
1579         """
1580         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1581
1582     def getDescription(self, test):
1583         """
1584         Get test description
1585
1586         :param test:
1587         :returns: test description
1588
1589         """
1590         return get_test_description(self.descriptions, test)
1591
1592     def startTest(self, test):
1593         """
1594         Start a test
1595
1596         :param test:
1597
1598         """
1599
1600         def print_header(test):
1601             if test.__class__ in self.printed:
1602                 return
1603
1604             test_doc = getdoc(test)
1605             if not test_doc:
1606                 raise Exception("No doc string for test '%s'" % test.id())
1607
1608             test_title = test_doc.splitlines()[0].rstrip()
1609             test_title = colorize(test_title, GREEN)
1610             if test.is_tagged_run_solo():
1611                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1612
1613             # This block may overwrite the colorized title above,
1614             # but we want this to stand out and be fixed
1615             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1616                 test_title = colorize(
1617                     f"FIXME with VPP workers: {test_title}", RED)
1618
1619             if test.has_tag(TestCaseTag.FIXME_ASAN):
1620                 test_title = colorize(
1621                     f"FIXME with ASAN: {test_title}", RED)
1622                 test.skip_fixme_asan()
1623
1624             if hasattr(test, 'vpp_worker_count'):
1625                 if test.vpp_worker_count == 0:
1626                     test_title += " [main thread only]"
1627                 elif test.vpp_worker_count == 1:
1628                     test_title += " [1 worker thread]"
1629                 else:
1630                     test_title += f" [{test.vpp_worker_count} worker threads]"
1631
1632             if test.__class__.skipped_due_to_cpu_lack:
1633                 test_title = colorize(
1634                     f"{test_title} [skipped - not enough cpus, "
1635                     f"required={test.__class__.get_cpus_required()}, "
1636                     f"available={max_vpp_cpus}]", YELLOW)
1637
1638             print(double_line_delim)
1639             print(test_title)
1640             print(double_line_delim)
1641             self.printed.append(test.__class__)
1642
1643         print_header(test)
1644         self.start_test = time.time()
1645         unittest.TestResult.startTest(self, test)
1646         if self.verbosity > 0:
1647             self.stream.writeln(
1648                 "Starting " + self.getDescription(test) + " ...")
1649             self.stream.writeln(single_line_delim)
1650
1651     def stopTest(self, test):
1652         """
1653         Called when the given test has been run
1654
1655         :param test:
1656
1657         """
1658         unittest.TestResult.stopTest(self, test)
1659
1660         if self.verbosity > 0:
1661             self.stream.writeln(single_line_delim)
1662             self.stream.writeln("%-73s%s" % (self.getDescription(test),
1663                                              self.result_string))
1664             self.stream.writeln(single_line_delim)
1665         else:
1666             self.stream.writeln("%-68s %4.2f %s" %
1667                                 (self.getDescription(test),
1668                                  time.time() - self.start_test,
1669                                  self.result_string))
1670
1671         self.send_result_through_pipe(test, TEST_RUN)
1672
1673     def printErrors(self):
1674         """
1675         Print errors from running the test case
1676         """
1677         if len(self.errors) > 0 or len(self.failures) > 0:
1678             self.stream.writeln()
1679             self.printErrorList('ERROR', self.errors)
1680             self.printErrorList('FAIL', self.failures)
1681
1682         # ^^ that is the last output from unittest before summary
1683         if not self.runner.print_summary:
1684             devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1685             self.stream = devnull
1686             self.runner.stream = devnull
1687
1688     def printErrorList(self, flavour, errors):
1689         """
1690         Print error list to the output stream together with error type
1691         and test case description.
1692
1693         :param flavour: error type
1694         :param errors: iterable errors
1695
1696         """
1697         for test, err in errors:
1698             self.stream.writeln(double_line_delim)
1699             self.stream.writeln("%s: %s" %
1700                                 (flavour, self.getDescription(test)))
1701             self.stream.writeln(single_line_delim)
1702             self.stream.writeln("%s" % err)
1703
1704
1705 class VppTestRunner(unittest.TextTestRunner):
1706     """
1707     A basic test runner implementation which prints results to standard error.
1708     """
1709
1710     @property
1711     def resultclass(self):
1712         """Class maintaining the results of the tests"""
1713         return VppTestResult
1714
1715     def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1716                  result_pipe=None, failfast=False, buffer=False,
1717                  resultclass=None, print_summary=True, **kwargs):
1718         # ignore stream setting here, use hard-coded stdout to be in sync
1719         # with prints from VppTestCase methods ...
1720         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1721                                             verbosity, failfast, buffer,
1722                                             resultclass, **kwargs)
1723         KeepAliveReporter.pipe = keep_alive_pipe
1724
1725         self.orig_stream = self.stream
1726         self.resultclass.test_framework_result_pipe = result_pipe
1727
1728         self.print_summary = print_summary
1729
1730     def _makeResult(self):
1731         return self.resultclass(self.stream,
1732                                 self.descriptions,
1733                                 self.verbosity,
1734                                 self)
1735
1736     def run(self, test):
1737         """
1738         Run the tests
1739
1740         :param test:
1741
1742         """
1743         faulthandler.enable()  # emit stack trace to stderr if killed by signal
1744
1745         result = super(VppTestRunner, self).run(test)
1746         if not self.print_summary:
1747             self.stream = self.orig_stream
1748             result.stream = self.orig_stream
1749         return result
1750
1751
1752 class Worker(Thread):
1753     def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1754         super(Worker, self).__init__(*args, **kwargs)
1755         self.logger = logger
1756         self.args = executable_args
1757         if hasattr(self, 'testcase') and self.testcase.debug_all:
1758             if self.testcase.debug_gdbserver:
1759                 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1760                              .format(port=self.testcase.gdbserver_port)] + args
1761             elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1762                 self.args.append(self.wait_for_gdb)
1763         self.app_bin = executable_args[0]
1764         self.app_name = os.path.basename(self.app_bin)
1765         if hasattr(self, 'role'):
1766             self.app_name += ' {role}'.format(role=self.role)
1767         self.process = None
1768         self.result = None
1769         env = {} if env is None else env
1770         self.env = copy.deepcopy(env)
1771
1772     def wait_for_enter(self):
1773         if not hasattr(self, 'testcase'):
1774             return
1775         if self.testcase.debug_all and self.testcase.debug_gdbserver:
1776             print()
1777             print(double_line_delim)
1778             print("Spawned GDB Server for '{app}' with PID: {pid}"
1779                   .format(app=self.app_name, pid=self.process.pid))
1780         elif self.testcase.debug_all and self.testcase.debug_gdb:
1781             print()
1782             print(double_line_delim)
1783             print("Spawned '{app}' with PID: {pid}"
1784                   .format(app=self.app_name, pid=self.process.pid))
1785         else:
1786             return
1787         print(single_line_delim)
1788         print("You can debug '{app}' using:".format(app=self.app_name))
1789         if self.testcase.debug_gdbserver:
1790             print("sudo gdb " + self.app_bin +
1791                   " -ex 'target remote localhost:{port}'"
1792                   .format(port=self.testcase.gdbserver_port))
1793             print("Now is the time to attach gdb by running the above "
1794                   "command, set up breakpoints etc., then resume from "
1795                   "within gdb by issuing the 'continue' command")
1796             self.testcase.gdbserver_port += 1
1797         elif self.testcase.debug_gdb:
1798             print("sudo gdb " + self.app_bin +
1799                   " -ex 'attach {pid}'".format(pid=self.process.pid))
1800             print("Now is the time to attach gdb by running the above "
1801                   "command and set up breakpoints etc., then resume from"
1802                   " within gdb by issuing the 'continue' command")
1803         print(single_line_delim)
1804         input("Press ENTER to continue running the testcase...")
1805
1806     def run(self):
1807         executable = self.args[0]
1808         if not os.path.exists(executable) or not os.access(
1809                 executable, os.F_OK | os.X_OK):
1810             # Exit code that means some system file did not exist,
1811             # could not be opened, or had some other kind of error.
1812             self.result = os.EX_OSFILE
1813             raise EnvironmentError(
1814                 "executable '%s' is not found or executable." % executable)
1815         self.logger.debug("Running executable '{app}': '{cmd}'"
1816                           .format(app=self.app_name,
1817                                   cmd=' '.join(self.args)))
1818         env = os.environ.copy()
1819         env.update(self.env)
1820         env["CK_LOG_FILE_NAME"] = "-"
1821         self.process = subprocess.Popen(
1822             ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1823             preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1824             stderr=subprocess.PIPE)
1825         self.wait_for_enter()
1826         out, err = self.process.communicate()
1827         self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1828         self.logger.info("Return code is `%s'" % self.process.returncode)
1829         self.logger.info(single_line_delim)
1830         self.logger.info("Executable `{app}' wrote to stdout:"
1831                          .format(app=self.app_name))
1832         self.logger.info(single_line_delim)
1833         self.logger.info(out.decode('utf-8'))
1834         self.logger.info(single_line_delim)
1835         self.logger.info("Executable `{app}' wrote to stderr:"
1836                          .format(app=self.app_name))
1837         self.logger.info(single_line_delim)
1838         self.logger.info(err.decode('utf-8'))
1839         self.logger.info(single_line_delim)
1840         self.result = self.process.returncode
1841
1842
1843 if __name__ == '__main__':
1844     pass