tests: better reporting for unexpected packets
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
41     get_logger, colorize
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
47
48
49 logger = logging.getLogger(__name__)
50
51 # Set up an empty logger for the testcase that can be overridden as necessary
52 null_logger = logging.getLogger('VppTestCase')
53 null_logger.addHandler(logging.NullHandler())
54
55 PASS = 0
56 FAIL = 1
57 ERROR = 2
58 SKIP = 3
59 TEST_RUN = 4
60 SKIP_CPU_SHORTAGE = 5
61
62
63 if config.debug_framework:
64     import debug_internal
65
66 """
67   Test framework module.
68
69   The module provides a set of tools for constructing and running tests and
70   representing the results.
71 """
72
73
74 class VppDiedError(Exception):
75     """ exception for reporting that the subprocess has died."""
76
77     signals_by_value = {v: k for k, v in signal.__dict__.items() if
78                         k.startswith('SIG') and not k.startswith('SIG_')}
79
80     def __init__(self, rv=None, testcase=None, method_name=None):
81         self.rv = rv
82         self.signal_name = None
83         self.testcase = testcase
84         self.method_name = method_name
85
86         try:
87             self.signal_name = VppDiedError.signals_by_value[-rv]
88         except (KeyError, TypeError):
89             pass
90
91         if testcase is None and method_name is None:
92             in_msg = ''
93         else:
94             in_msg = ' while running %s.%s' % (testcase, method_name)
95
96         if self.rv:
97             msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
98                 % (in_msg, self.rv, ' [%s]' %
99                    (self.signal_name if
100                     self.signal_name is not None else ''))
101         else:
102             msg = "VPP subprocess died unexpectedly%s." % in_msg
103
104         super(VppDiedError, self).__init__(msg)
105
106
107 class _PacketInfo(object):
108     """Private class to create packet info object.
109
110     Help process information about the next packet.
111     Set variables to default values.
112     """
113     #: Store the index of the packet.
114     index = -1
115     #: Store the index of the source packet generator interface of the packet.
116     src = -1
117     #: Store the index of the destination packet generator interface
118     #: of the packet.
119     dst = -1
120     #: Store expected ip version
121     ip = -1
122     #: Store expected upper protocol
123     proto = -1
124     #: Store the copy of the former packet.
125     data = None
126
127     def __eq__(self, other):
128         index = self.index == other.index
129         src = self.src == other.src
130         dst = self.dst == other.dst
131         data = self.data == other.data
132         return index and src and dst and data
133
134
135 def pump_output(testclass):
136     """ pump output from vpp stdout/stderr to proper queues """
137     stdout_fragment = ""
138     stderr_fragment = ""
139     while not testclass.pump_thread_stop_flag.is_set():
140         readable = select.select([testclass.vpp.stdout.fileno(),
141                                   testclass.vpp.stderr.fileno(),
142                                   testclass.pump_thread_wakeup_pipe[0]],
143                                  [], [])[0]
144         if testclass.vpp.stdout.fileno() in readable:
145             read = os.read(testclass.vpp.stdout.fileno(), 102400)
146             if len(read) > 0:
147                 split = read.decode('ascii',
148                                     errors='backslashreplace').splitlines(True)
149                 if len(stdout_fragment) > 0:
150                     split[0] = "%s%s" % (stdout_fragment, split[0])
151                 if len(split) > 0 and split[-1].endswith("\n"):
152                     limit = None
153                 else:
154                     limit = -1
155                     stdout_fragment = split[-1]
156                 testclass.vpp_stdout_deque.extend(split[:limit])
157                 if not config.cache_vpp_output:
158                     for line in split[:limit]:
159                         testclass.logger.info(
160                             "VPP STDOUT: %s" % line.rstrip("\n"))
161         if testclass.vpp.stderr.fileno() in readable:
162             read = os.read(testclass.vpp.stderr.fileno(), 102400)
163             if len(read) > 0:
164                 split = read.decode('ascii',
165                                     errors='backslashreplace').splitlines(True)
166                 if len(stderr_fragment) > 0:
167                     split[0] = "%s%s" % (stderr_fragment, split[0])
168                 if len(split) > 0 and split[-1].endswith("\n"):
169                     limit = None
170                 else:
171                     limit = -1
172                     stderr_fragment = split[-1]
173
174                 testclass.vpp_stderr_deque.extend(split[:limit])
175                 if not config.cache_vpp_output:
176                     for line in split[:limit]:
177                         testclass.logger.error(
178                             "VPP STDERR: %s" % line.rstrip("\n"))
179                         # ignoring the dummy pipe here intentionally - the
180                         # flag will take care of properly terminating the loop
181
182
183 def _is_platform_aarch64():
184     return platform.machine() == 'aarch64'
185
186
187 is_platform_aarch64 = _is_platform_aarch64()
188
189
190 class KeepAliveReporter(object):
191     """
192     Singleton object which reports test start to parent process
193     """
194     _shared_state = {}
195
196     def __init__(self):
197         self.__dict__ = self._shared_state
198         self._pipe = None
199
200     @property
201     def pipe(self):
202         return self._pipe
203
204     @pipe.setter
205     def pipe(self, pipe):
206         if self._pipe is not None:
207             raise Exception("Internal error - pipe should only be set once.")
208         self._pipe = pipe
209
210     def send_keep_alive(self, test, desc=None):
211         """
212         Write current test tmpdir & desc to keep-alive pipe to signal liveness
213         """
214         if self.pipe is None:
215             # if not running forked..
216             return
217
218         if isclass(test):
219             desc = '%s (%s)' % (desc, unittest.util.strclass(test))
220         else:
221             desc = test.id()
222
223         self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
224
225
226 class TestCaseTag(Enum):
227     # marks the suites that must run at the end
228     # using only a single test runner
229     RUN_SOLO = 1
230     # marks the suites broken on VPP multi-worker
231     FIXME_VPP_WORKERS = 2
232     # marks the suites broken when ASan is enabled
233     FIXME_ASAN = 3
234
235
236 def create_tag_decorator(e):
237     def decorator(cls):
238         try:
239             cls.test_tags.append(e)
240         except AttributeError:
241             cls.test_tags = [e]
242         return cls
243     return decorator
244
245
246 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
247 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
248 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
249
250
251 class DummyVpp:
252     returncode = None
253     pid = 0xcafebafe
254
255     def poll(self):
256         pass
257
258     def terminate(self):
259         pass
260
261
262 class CPUInterface(ABC):
263     cpus = []
264     skipped_due_to_cpu_lack = False
265
266     @classmethod
267     @abstractmethod
268     def get_cpus_required(cls):
269         pass
270
271     @classmethod
272     def assign_cpus(cls, cpus):
273         cls.cpus = cpus
274
275
276 class VppTestCase(CPUInterface, unittest.TestCase):
277     """This subclass is a base class for VPP test cases that are implemented as
278     classes. It provides methods to create and run test case.
279     """
280
281     extra_vpp_statseg_config = ""
282     extra_vpp_punt_config = []
283     extra_vpp_plugin_config = []
284     logger = null_logger
285     vapi_response_timeout = 5
286     remove_configured_vpp_objects_on_tear_down = True
287
288     @property
289     def packet_infos(self):
290         """List of packet infos"""
291         return self._packet_infos
292
293     @classmethod
294     def get_packet_count_for_if_idx(cls, dst_if_index):
295         """Get the number of packet info for specified destination if index"""
296         if dst_if_index in cls._packet_count_for_dst_if_idx:
297             return cls._packet_count_for_dst_if_idx[dst_if_index]
298         else:
299             return 0
300
301     @classmethod
302     def has_tag(cls, tag):
303         """ if the test case has a given tag - return true """
304         try:
305             return tag in cls.test_tags
306         except AttributeError:
307             pass
308         return False
309
310     @classmethod
311     def is_tagged_run_solo(cls):
312         """ if the test case class is timing-sensitive - return true """
313         return cls.has_tag(TestCaseTag.RUN_SOLO)
314
315     @classmethod
316     def skip_fixme_asan(cls):
317         """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318         if cls.has_tag(TestCaseTag.FIXME_ASAN):
319             vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320             if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
322
323     @classmethod
324     def instance(cls):
325         """Return the instance of this testcase"""
326         return cls.test_instance
327
328     @classmethod
329     def set_debug_flags(cls, d):
330         cls.gdbserver_port = 7777
331         cls.debug_core = False
332         cls.debug_gdb = False
333         cls.debug_gdbserver = False
334         cls.debug_all = False
335         cls.debug_attach = False
336         if d is None:
337             return
338         dl = d.lower()
339         if dl == "core":
340             cls.debug_core = True
341         elif dl == "gdb" or dl == "gdb-all":
342             cls.debug_gdb = True
343         elif dl == "gdbserver" or dl == "gdbserver-all":
344             cls.debug_gdbserver = True
345         elif dl == "attach":
346             cls.debug_attach = True
347         else:
348             raise Exception("Unrecognized DEBUG option: '%s'" % d)
349         if dl == "gdb-all" or dl == "gdbserver-all":
350             cls.debug_all = True
351
352     @classmethod
353     def get_vpp_worker_count(cls):
354         if not hasattr(cls, "vpp_worker_count"):
355             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356                 cls.vpp_worker_count = 0
357             else:
358                 cls.vpp_worker_count = config.vpp_worker_count
359         return cls.vpp_worker_count
360
361     @classmethod
362     def get_cpus_required(cls):
363         return 1 + cls.get_vpp_worker_count()
364
365     @classmethod
366     def setUpConstants(cls):
367         """ Set-up the test case class based on environment variables """
368         cls.step = config.step
369         cls.plugin_path = ":".join(config.vpp_plugin_dir)
370         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
372         debug_cli = ""
373         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374             debug_cli = "cli-listen localhost:5002"
375         size = re.search(r"\d+[gG]", config.coredump_size)
376         if size:
377             coredump_size = f"coredump-size {config.coredump_size}".lower()
378         else:
379             coredump_size = "coredump-size unlimited"
380         default_variant = config.variant
381         if default_variant is not None:
382             default_variant = "defaults { %s 100 }" % default_variant
383         else:
384             default_variant = ""
385
386         api_fuzzing = config.api_fuzz
387         if api_fuzzing is None:
388             api_fuzzing = 'off'
389
390         cls.vpp_cmdline = [
391             config.vpp,
392             "unix", "{", "nodaemon", debug_cli, "full-coredump",
393             coredump_size, "runtime-dir", cls.tempdir, "}",
394             "api-trace", "{", "on", "}",
395             "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396             "cpu", "{", "main-core", str(cls.cpus[0]), ]
397         if cls.extern_plugin_path not in (None, ""):
398             cls.extra_vpp_plugin_config.append(
399                 "add-path %s" % cls.extern_plugin_path)
400         if cls.get_vpp_worker_count():
401             cls.vpp_cmdline.extend([
402                 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403         cls.vpp_cmdline.extend([
404             "}",
405             "physmem", "{", "max-size", "32m", "}",
406             "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407             cls.extra_vpp_statseg_config, "}",
408             "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409             "node { ", default_variant, "}",
410             "api-fuzz {", api_fuzzing, "}",
411             "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412             "plugin", "rdma_plugin.so", "{", "disable", "}",
413             "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414             "plugin", "unittest_plugin.so", "{", "enable", "}"
415         ] + cls.extra_vpp_plugin_config + ["}", ])
416
417         if cls.extra_vpp_punt_config is not None:
418             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
419
420         if not cls.debug_attach:
421             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
423
424     @classmethod
425     def wait_for_enter(cls):
426         if cls.debug_gdbserver:
427             print(double_line_delim)
428             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
429         elif cls.debug_gdb:
430             print(double_line_delim)
431             print("Spawned VPP with PID: %d" % cls.vpp.pid)
432         else:
433             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
434             return
435         print(single_line_delim)
436         print("You can debug VPP using:")
437         if cls.debug_gdbserver:
438             print(f"sudo gdb {config.vpp} "
439                   f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440             print("Now is the time to attach gdb by running the above "
441                   "command, set up breakpoints etc., then resume VPP from "
442                   "within gdb by issuing the 'continue' command")
443             cls.gdbserver_port += 1
444         elif cls.debug_gdb:
445             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446             print("Now is the time to attach gdb by running the above "
447                   "command and set up breakpoints etc., then resume VPP from"
448                   " within gdb by issuing the 'continue' command")
449         print(single_line_delim)
450         input("Press ENTER to continue running the testcase...")
451
452     @classmethod
453     def attach_vpp(cls):
454         cls.vpp = DummyVpp()
455
456     @classmethod
457     def run_vpp(cls):
458         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459         cmdline = cls.vpp_cmdline
460
461         if cls.debug_gdbserver:
462             gdbserver = '/usr/bin/gdbserver'
463             if not os.path.isfile(gdbserver) or\
464                     not os.access(gdbserver, os.X_OK):
465                 raise Exception("gdbserver binary '%s' does not exist or is "
466                                 "not executable" % gdbserver)
467
468             cmdline = [gdbserver, 'localhost:{port}'
469                        .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
471
472         try:
473             cls.vpp = subprocess.Popen(cmdline,
474                                        stdout=subprocess.PIPE,
475                                        stderr=subprocess.PIPE)
476         except subprocess.CalledProcessError as e:
477             cls.logger.critical("Subprocess returned with non-0 return code: ("
478                                 "%s)", e.returncode)
479             raise
480         except OSError as e:
481             cls.logger.critical("Subprocess returned with OS error: "
482                                 "(%s) %s", e.errno, e.strerror)
483             raise
484         except Exception as e:
485             cls.logger.exception("Subprocess returned unexpected from "
486                                  "%s:", cmdline)
487             raise
488
489         cls.wait_for_enter()
490
491     @classmethod
492     def wait_for_coredump(cls):
493         corefile = cls.tempdir + "/core"
494         if os.path.isfile(corefile):
495             cls.logger.error("Waiting for coredump to complete: %s", corefile)
496             curr_size = os.path.getsize(corefile)
497             deadline = time.time() + 60
498             ok = False
499             while time.time() < deadline:
500                 cls.sleep(1)
501                 size = curr_size
502                 curr_size = os.path.getsize(corefile)
503                 if size == curr_size:
504                     ok = True
505                     break
506             if not ok:
507                 cls.logger.error("Timed out waiting for coredump to complete:"
508                                  " %s", corefile)
509             else:
510                 cls.logger.error("Coredump complete: %s, size %d",
511                                  corefile, curr_size)
512
513     @classmethod
514     def get_stats_sock_path(cls):
515         return "%s/stats.sock" % cls.tempdir
516
517     @classmethod
518     def get_api_sock_path(cls):
519         return "%s/api.sock" % cls.tempdir
520
521     @classmethod
522     def get_api_segment_prefix(cls):
523         return os.path.basename(cls.tempdir)  # Only used for VAPI
524
525     @classmethod
526     def get_tempdir(cls):
527         tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
528         if config.wipe_tmp_dir:
529             shutil.rmtree(tmpdir, ignore_errors=True)
530         os.mkdir(tmpdir)
531         return tmpdir
532
533     @classmethod
534     def create_file_handler(cls):
535         if config.log_dir is None:
536             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
537             return
538
539         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
540         if config.wipe_tmp_dir:
541             shutil.rmtree(logdir, ignore_errors=True)
542         os.mkdir(logdir)
543         cls.file_handler = FileHandler(f"{logdir}/log.txt")
544
545     @classmethod
546     def setUpClass(cls):
547         """
548         Perform class setup before running the testcase
549         Remove shared memory files, start vpp and connect the vpp-api
550         """
551         super(VppTestCase, cls).setUpClass()
552         cls.logger = get_logger(cls.__name__)
553         random.seed(config.rnd_seed)
554         if hasattr(cls, 'parallel_handler'):
555             cls.logger.addHandler(cls.parallel_handler)
556             cls.logger.propagate = False
557         cls.set_debug_flags(config.debug)
558         cls.tempdir = cls.get_tempdir()
559         cls.create_file_handler()
560         cls.file_handler.setFormatter(
561             Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
562                       datefmt="%H:%M:%S"))
563         cls.file_handler.setLevel(DEBUG)
564         cls.logger.addHandler(cls.file_handler)
565         cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
566         os.chdir(cls.tempdir)
567         cls.logger.info("Temporary dir is %s, api socket is %s",
568                         cls.tempdir, cls.get_api_sock_path())
569         cls.logger.debug("Random seed is %s", config.rnd_seed)
570         cls.setUpConstants()
571         cls.reset_packet_infos()
572         cls._pcaps = []
573         cls._old_pcaps = []
574         cls.verbose = 0
575         cls.vpp_dead = False
576         cls.registry = VppObjectRegistry()
577         cls.vpp_startup_failed = False
578         cls.reporter = KeepAliveReporter()
579         # need to catch exceptions here because if we raise, then the cleanup
580         # doesn't get called and we might end with a zombie vpp
581         try:
582             if cls.debug_attach:
583                 cls.attach_vpp()
584             else:
585                 cls.run_vpp()
586             cls.reporter.send_keep_alive(cls, 'setUpClass')
587             VppTestResult.current_test_case_info = TestCaseInfo(
588                 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
589             cls.vpp_stdout_deque = deque()
590             cls.vpp_stderr_deque = deque()
591             if not cls.debug_attach:
592                 cls.pump_thread_stop_flag = Event()
593                 cls.pump_thread_wakeup_pipe = os.pipe()
594                 cls.pump_thread = Thread(target=pump_output, args=(cls,))
595                 cls.pump_thread.daemon = True
596                 cls.pump_thread.start()
597             if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
598                 cls.vapi_response_timeout = 0
599             cls.vapi = VppPapiProvider(cls.__name__, cls,
600                                        cls.vapi_response_timeout)
601             if cls.step:
602                 hook = hookmodule.StepHook(cls)
603             else:
604                 hook = hookmodule.PollHook(cls)
605             cls.vapi.register_hook(hook)
606             cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
607             try:
608                 hook.poll_vpp()
609             except VppDiedError:
610                 cls.vpp_startup_failed = True
611                 cls.logger.critical(
612                     "VPP died shortly after startup, check the"
613                     " output to standard error for possible cause")
614                 raise
615             try:
616                 cls.vapi.connect()
617             except (vpp_papi.VPPIOError, Exception) as e:
618                 cls.logger.debug("Exception connecting to vapi: %s" % e)
619                 cls.vapi.disconnect()
620
621                 if cls.debug_gdbserver:
622                     print(colorize("You're running VPP inside gdbserver but "
623                                    "VPP-API connection failed, did you forget "
624                                    "to 'continue' VPP from within gdb?", RED))
625                 raise e
626             if cls.debug_attach:
627                 last_line = cls.vapi.cli("show thread").split("\n")[-2]
628                 cls.vpp_worker_count = int(last_line.split(" ")[0])
629                 print("Detected VPP with %s workers." % cls.vpp_worker_count)
630         except vpp_papi.VPPRuntimeError as e:
631             cls.logger.debug("%s" % e)
632             cls.quit()
633             raise e
634         except Exception as e:
635             cls.logger.debug("Exception connecting to VPP: %s" % e)
636             cls.quit()
637             raise e
638
639     @classmethod
640     def _debug_quit(cls):
641         if (cls.debug_gdbserver or cls.debug_gdb):
642             try:
643                 cls.vpp.poll()
644
645                 if cls.vpp.returncode is None:
646                     print()
647                     print(double_line_delim)
648                     print("VPP or GDB server is still running")
649                     print(single_line_delim)
650                     input("When done debugging, press ENTER to kill the "
651                           "process and finish running the testcase...")
652             except AttributeError:
653                 pass
654
655     @classmethod
656     def quit(cls):
657         """
658         Disconnect vpp-api, kill vpp and cleanup shared memory files
659         """
660         cls._debug_quit()
661
662         # first signal that we want to stop the pump thread, then wake it up
663         if hasattr(cls, 'pump_thread_stop_flag'):
664             cls.pump_thread_stop_flag.set()
665         if hasattr(cls, 'pump_thread_wakeup_pipe'):
666             os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
667         if hasattr(cls, 'pump_thread'):
668             cls.logger.debug("Waiting for pump thread to stop")
669             cls.pump_thread.join()
670         if hasattr(cls, 'vpp_stderr_reader_thread'):
671             cls.logger.debug("Waiting for stderr pump to stop")
672             cls.vpp_stderr_reader_thread.join()
673
674         if hasattr(cls, 'vpp'):
675             if hasattr(cls, 'vapi'):
676                 cls.logger.debug(cls.vapi.vpp.get_stats())
677                 cls.logger.debug("Disconnecting class vapi client on %s",
678                                  cls.__name__)
679                 cls.vapi.disconnect()
680                 cls.logger.debug("Deleting class vapi attribute on %s",
681                                  cls.__name__)
682                 del cls.vapi
683             cls.vpp.poll()
684             if not cls.debug_attach and cls.vpp.returncode is None:
685                 cls.wait_for_coredump()
686                 cls.logger.debug("Sending TERM to vpp")
687                 cls.vpp.terminate()
688                 cls.logger.debug("Waiting for vpp to die")
689                 try:
690                     outs, errs = cls.vpp.communicate(timeout=5)
691                 except subprocess.TimeoutExpired:
692                     cls.vpp.kill()
693                     outs, errs = cls.vpp.communicate()
694             cls.logger.debug("Deleting class vpp attribute on %s",
695                              cls.__name__)
696             if not cls.debug_attach:
697                 cls.vpp.stdout.close()
698                 cls.vpp.stderr.close()
699             del cls.vpp
700
701         if cls.vpp_startup_failed:
702             stdout_log = cls.logger.info
703             stderr_log = cls.logger.critical
704         else:
705             stdout_log = cls.logger.info
706             stderr_log = cls.logger.info
707
708         if hasattr(cls, 'vpp_stdout_deque'):
709             stdout_log(single_line_delim)
710             stdout_log('VPP output to stdout while running %s:', cls.__name__)
711             stdout_log(single_line_delim)
712             vpp_output = "".join(cls.vpp_stdout_deque)
713             with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
714                 f.write(vpp_output)
715             stdout_log('\n%s', vpp_output)
716             stdout_log(single_line_delim)
717
718         if hasattr(cls, 'vpp_stderr_deque'):
719             stderr_log(single_line_delim)
720             stderr_log('VPP output to stderr while running %s:', cls.__name__)
721             stderr_log(single_line_delim)
722             vpp_output = "".join(cls.vpp_stderr_deque)
723             with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
724                 f.write(vpp_output)
725             stderr_log('\n%s', vpp_output)
726             stderr_log(single_line_delim)
727
728     @classmethod
729     def tearDownClass(cls):
730         """ Perform final cleanup after running all tests in this test-case """
731         cls.logger.debug("--- tearDownClass() for %s called ---" %
732                          cls.__name__)
733         cls.reporter.send_keep_alive(cls, 'tearDownClass')
734         cls.quit()
735         cls.file_handler.close()
736         cls.reset_packet_infos()
737         if config.debug_framework:
738             debug_internal.on_tear_down_class(cls)
739
740     def show_commands_at_teardown(self):
741         """ Allow subclass specific teardown logging additions."""
742         self.logger.info("--- No test specific show commands provided. ---")
743
744     def tearDown(self):
745         """ Show various debug prints after each test """
746         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
747                           (self.__class__.__name__, self._testMethodName,
748                            self._testMethodDoc))
749
750         try:
751             if not self.vpp_dead:
752                 self.logger.debug(self.vapi.cli("show trace max 1000"))
753                 self.logger.info(self.vapi.ppcli("show interface"))
754                 self.logger.info(self.vapi.ppcli("show hardware"))
755                 self.logger.info(self.statistics.set_errors_str())
756                 self.logger.info(self.vapi.ppcli("show run"))
757                 self.logger.info(self.vapi.ppcli("show log"))
758                 self.logger.info(self.vapi.ppcli("show bihash"))
759                 self.logger.info("Logging testcase specific show commands.")
760                 self.show_commands_at_teardown()
761                 if self.remove_configured_vpp_objects_on_tear_down:
762                     self.registry.remove_vpp_config(self.logger)
763             # Save/Dump VPP api trace log
764             m = self._testMethodName
765             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
766             tmp_api_trace = "/tmp/%s" % api_trace
767             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
768             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
769             self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
770                                                     vpp_api_trace_log))
771             os.rename(tmp_api_trace, vpp_api_trace_log)
772         except VppTransportSocketIOError:
773             self.logger.debug("VppTransportSocketIOError: Vpp dead. "
774                               "Cannot log show commands.")
775             self.vpp_dead = True
776         else:
777             self.registry.unregister_all(self.logger)
778
779     def setUp(self):
780         """ Clear trace before running each test"""
781         super(VppTestCase, self).setUp()
782         self.reporter.send_keep_alive(self)
783         if self.vpp_dead:
784             raise VppDiedError(rv=None, testcase=self.__class__.__name__,
785                                method_name=self._testMethodName)
786         self.sleep(.1, "during setUp")
787         self.vpp_stdout_deque.append(
788             "--- test setUp() for %s.%s(%s) starts here ---\n" %
789             (self.__class__.__name__, self._testMethodName,
790              self._testMethodDoc))
791         self.vpp_stderr_deque.append(
792             "--- test setUp() for %s.%s(%s) starts here ---\n" %
793             (self.__class__.__name__, self._testMethodName,
794              self._testMethodDoc))
795         self.vapi.cli("clear trace")
796         # store the test instance inside the test class - so that objects
797         # holding the class can access instance methods (like assertEqual)
798         type(self).test_instance = self
799
800     @classmethod
801     def pg_enable_capture(cls, interfaces=None):
802         """
803         Enable capture on packet-generator interfaces
804
805         :param interfaces: iterable interface indexes (if None,
806                            use self.pg_interfaces)
807
808         """
809         if interfaces is None:
810             interfaces = cls.pg_interfaces
811         for i in interfaces:
812             i.enable_capture()
813
814     @classmethod
815     def register_pcap(cls, intf, worker):
816         """ Register a pcap in the testclass """
817         # add to the list of captures with current timestamp
818         cls._pcaps.append((intf, worker))
819
820     @classmethod
821     def get_vpp_time(cls):
822         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
823         # returns float("2.190522")
824         timestr = cls.vapi.cli('show clock')
825         head, sep, tail = timestr.partition(',')
826         head, sep, tail = head.partition('Time now')
827         return float(tail)
828
829     @classmethod
830     def sleep_on_vpp_time(cls, sec):
831         """ Sleep according to time in VPP world """
832         # On a busy system with many processes
833         # we might end up with VPP time being slower than real world
834         # So take that into account when waiting for VPP to do something
835         start_time = cls.get_vpp_time()
836         while cls.get_vpp_time() - start_time < sec:
837             cls.sleep(0.1)
838
839     @classmethod
840     def pg_start(cls, trace=True):
841         """ Enable the PG, wait till it is done, then clean up """
842         for (intf, worker) in cls._old_pcaps:
843             intf.handle_old_pcap_file(intf.get_in_path(worker),
844                                       intf.in_history_counter)
845         cls._old_pcaps = []
846         if trace:
847             cls.vapi.cli("clear trace")
848             cls.vapi.cli("trace add pg-input 1000")
849         cls.vapi.cli('packet-generator enable')
850         # PG, when starts, runs to completion -
851         # so let's avoid a race condition,
852         # and wait a little till it's done.
853         # Then clean it up  - and then be gone.
854         deadline = time.time() + 300
855         while cls.vapi.cli('show packet-generator').find("Yes") != -1:
856             cls.sleep(0.01)  # yield
857             if time.time() > deadline:
858                 cls.logger.error("Timeout waiting for pg to stop")
859                 break
860         for intf, worker in cls._pcaps:
861             cls.vapi.cli('packet-generator delete %s' %
862                          intf.get_cap_name(worker))
863         cls._old_pcaps = cls._pcaps
864         cls._pcaps = []
865
866     @classmethod
867     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
868                                       mode=None):
869         """
870         Create packet-generator interfaces.
871
872         :param interfaces: iterable indexes of the interfaces.
873         :returns: List of created interfaces.
874
875         """
876         result = []
877         for i in interfaces:
878             intf = VppPGInterface(cls, i, gso, gso_size, mode)
879             setattr(cls, intf.name, intf)
880             result.append(intf)
881         cls.pg_interfaces = result
882         return result
883
884     @classmethod
885     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
886         pgmode = VppEnum.vl_api_pg_interface_mode_t
887         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
888                                                  pgmode.PG_API_MODE_IP4)
889
890     @classmethod
891     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
892         pgmode = VppEnum.vl_api_pg_interface_mode_t
893         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
894                                                  pgmode.PG_API_MODE_IP6)
895
896     @classmethod
897     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
898         pgmode = VppEnum.vl_api_pg_interface_mode_t
899         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
900                                                  pgmode.PG_API_MODE_ETHERNET)
901
902     @classmethod
903     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
904         pgmode = VppEnum.vl_api_pg_interface_mode_t
905         return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
906                                                  pgmode.PG_API_MODE_ETHERNET)
907
908     @classmethod
909     def create_loopback_interfaces(cls, count):
910         """
911         Create loopback interfaces.
912
913         :param count: number of interfaces created.
914         :returns: List of created interfaces.
915         """
916         result = [VppLoInterface(cls) for i in range(count)]
917         for intf in result:
918             setattr(cls, intf.name, intf)
919         cls.lo_interfaces = result
920         return result
921
922     @classmethod
923     def create_bvi_interfaces(cls, count):
924         """
925         Create BVI interfaces.
926
927         :param count: number of interfaces created.
928         :returns: List of created interfaces.
929         """
930         result = [VppBviInterface(cls) for i in range(count)]
931         for intf in result:
932             setattr(cls, intf.name, intf)
933         cls.bvi_interfaces = result
934         return result
935
936     @staticmethod
937     def extend_packet(packet, size, padding=' '):
938         """
939         Extend packet to given size by padding with spaces or custom padding
940         NOTE: Currently works only when Raw layer is present.
941
942         :param packet: packet
943         :param size: target size
944         :param padding: padding used to extend the payload
945
946         """
947         packet_len = len(packet) + 4
948         extend = size - packet_len
949         if extend > 0:
950             num = (extend // len(padding)) + 1
951             packet[Raw].load += (padding * num)[:extend].encode("ascii")
952
953     @classmethod
954     def reset_packet_infos(cls):
955         """ Reset the list of packet info objects and packet counts to zero """
956         cls._packet_infos = {}
957         cls._packet_count_for_dst_if_idx = {}
958
959     @classmethod
960     def create_packet_info(cls, src_if, dst_if):
961         """
962         Create packet info object containing the source and destination indexes
963         and add it to the testcase's packet info list
964
965         :param VppInterface src_if: source interface
966         :param VppInterface dst_if: destination interface
967
968         :returns: _PacketInfo object
969
970         """
971         info = _PacketInfo()
972         info.index = len(cls._packet_infos)
973         info.src = src_if.sw_if_index
974         info.dst = dst_if.sw_if_index
975         if isinstance(dst_if, VppSubInterface):
976             dst_idx = dst_if.parent.sw_if_index
977         else:
978             dst_idx = dst_if.sw_if_index
979         if dst_idx in cls._packet_count_for_dst_if_idx:
980             cls._packet_count_for_dst_if_idx[dst_idx] += 1
981         else:
982             cls._packet_count_for_dst_if_idx[dst_idx] = 1
983         cls._packet_infos[info.index] = info
984         return info
985
986     @staticmethod
987     def info_to_payload(info):
988         """
989         Convert _PacketInfo object to packet payload
990
991         :param info: _PacketInfo object
992
993         :returns: string containing serialized data from packet info
994         """
995
996         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
997         return pack('iiiih', info.index, info.src,
998                     info.dst, info.ip, info.proto)
999
1000     @staticmethod
1001     def payload_to_info(payload, payload_field='load'):
1002         """
1003         Convert packet payload to _PacketInfo object
1004
1005         :param payload: packet payload
1006         :type payload:  <class 'scapy.packet.Raw'>
1007         :param payload_field: packet fieldname of payload "load" for
1008                 <class 'scapy.packet.Raw'>
1009         :type payload_field: str
1010         :returns: _PacketInfo object containing de-serialized data from payload
1011
1012         """
1013
1014         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1015         payload_b = getattr(payload, payload_field)[:18]
1016
1017         info = _PacketInfo()
1018         info.index, info.src, info.dst, info.ip, info.proto \
1019             = unpack('iiiih', payload_b)
1020
1021         # some SRv6 TCs depend on get an exception if bad values are detected
1022         if info.index > 0x4000:
1023             raise ValueError('Index value is invalid')
1024
1025         return info
1026
1027     def get_next_packet_info(self, info):
1028         """
1029         Iterate over the packet info list stored in the testcase
1030         Start iteration with first element if info is None
1031         Continue based on index in info if info is specified
1032
1033         :param info: info or None
1034         :returns: next info in list or None if no more infos
1035         """
1036         if info is None:
1037             next_index = 0
1038         else:
1039             next_index = info.index + 1
1040         if next_index == len(self._packet_infos):
1041             return None
1042         else:
1043             return self._packet_infos[next_index]
1044
1045     def get_next_packet_info_for_interface(self, src_index, info):
1046         """
1047         Search the packet info list for the next packet info with same source
1048         interface index
1049
1050         :param src_index: source interface index to search for
1051         :param info: packet info - where to start the search
1052         :returns: packet info or None
1053
1054         """
1055         while True:
1056             info = self.get_next_packet_info(info)
1057             if info is None:
1058                 return None
1059             if info.src == src_index:
1060                 return info
1061
1062     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1063         """
1064         Search the packet info list for the next packet info with same source
1065         and destination interface indexes
1066
1067         :param src_index: source interface index to search for
1068         :param dst_index: destination interface index to search for
1069         :param info: packet info - where to start the search
1070         :returns: packet info or None
1071
1072         """
1073         while True:
1074             info = self.get_next_packet_info_for_interface(src_index, info)
1075             if info is None:
1076                 return None
1077             if info.dst == dst_index:
1078                 return info
1079
1080     def assert_equal(self, real_value, expected_value, name_or_class=None):
1081         if name_or_class is None:
1082             self.assertEqual(real_value, expected_value)
1083             return
1084         try:
1085             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1086             msg = msg % (getdoc(name_or_class).strip(),
1087                          real_value, str(name_or_class(real_value)),
1088                          expected_value, str(name_or_class(expected_value)))
1089         except Exception:
1090             msg = "Invalid %s: %s does not match expected value %s" % (
1091                 name_or_class, real_value, expected_value)
1092
1093         self.assertEqual(real_value, expected_value, msg)
1094
1095     def assert_in_range(self,
1096                         real_value,
1097                         expected_min,
1098                         expected_max,
1099                         name=None):
1100         if name is None:
1101             msg = None
1102         else:
1103             msg = "Invalid %s: %s out of range <%s,%s>" % (
1104                 name, real_value, expected_min, expected_max)
1105         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1106
1107     def assert_packet_checksums_valid(self, packet,
1108                                       ignore_zero_udp_checksums=True):
1109         received = packet.__class__(scapy.compat.raw(packet))
1110         udp_layers = ['UDP', 'UDPerror']
1111         checksum_fields = ['cksum', 'chksum']
1112         checksums = []
1113         counter = 0
1114         temp = received.__class__(scapy.compat.raw(received))
1115         while True:
1116             layer = temp.getlayer(counter)
1117             if layer:
1118                 layer = layer.copy()
1119                 layer.remove_payload()
1120                 for cf in checksum_fields:
1121                     if hasattr(layer, cf):
1122                         if ignore_zero_udp_checksums and \
1123                                 0 == getattr(layer, cf) and \
1124                                 layer.name in udp_layers:
1125                             continue
1126                         delattr(temp.getlayer(counter), cf)
1127                         checksums.append((counter, cf))
1128             else:
1129                 break
1130             counter = counter + 1
1131         if 0 == len(checksums):
1132             return
1133         temp = temp.__class__(scapy.compat.raw(temp))
1134         for layer, cf in checksums:
1135             calc_sum = getattr(temp[layer], cf)
1136             self.assert_equal(
1137                 getattr(received[layer], cf), calc_sum,
1138                 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1139             self.logger.debug(
1140                 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1141                 (cf, temp[layer].name, calc_sum))
1142
1143     def assert_checksum_valid(self, received_packet, layer,
1144                               field_name='chksum',
1145                               ignore_zero_checksum=False):
1146         """ Check checksum of received packet on given layer """
1147         received_packet_checksum = getattr(received_packet[layer], field_name)
1148         if ignore_zero_checksum and 0 == received_packet_checksum:
1149             return
1150         recalculated = received_packet.__class__(
1151             scapy.compat.raw(received_packet))
1152         delattr(recalculated[layer], field_name)
1153         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1154         self.assert_equal(received_packet_checksum,
1155                           getattr(recalculated[layer], field_name),
1156                           "packet checksum on layer: %s" % layer)
1157
1158     def assert_ip_checksum_valid(self, received_packet,
1159                                  ignore_zero_checksum=False):
1160         self.assert_checksum_valid(received_packet, 'IP',
1161                                    ignore_zero_checksum=ignore_zero_checksum)
1162
1163     def assert_tcp_checksum_valid(self, received_packet,
1164                                   ignore_zero_checksum=False):
1165         self.assert_checksum_valid(received_packet, 'TCP',
1166                                    ignore_zero_checksum=ignore_zero_checksum)
1167
1168     def assert_udp_checksum_valid(self, received_packet,
1169                                   ignore_zero_checksum=True):
1170         self.assert_checksum_valid(received_packet, 'UDP',
1171                                    ignore_zero_checksum=ignore_zero_checksum)
1172
1173     def assert_embedded_icmp_checksum_valid(self, received_packet):
1174         if received_packet.haslayer(IPerror):
1175             self.assert_checksum_valid(received_packet, 'IPerror')
1176         if received_packet.haslayer(TCPerror):
1177             self.assert_checksum_valid(received_packet, 'TCPerror')
1178         if received_packet.haslayer(UDPerror):
1179             self.assert_checksum_valid(received_packet, 'UDPerror',
1180                                        ignore_zero_checksum=True)
1181         if received_packet.haslayer(ICMPerror):
1182             self.assert_checksum_valid(received_packet, 'ICMPerror')
1183
1184     def assert_icmp_checksum_valid(self, received_packet):
1185         self.assert_checksum_valid(received_packet, 'ICMP')
1186         self.assert_embedded_icmp_checksum_valid(received_packet)
1187
1188     def assert_icmpv6_checksum_valid(self, pkt):
1189         if pkt.haslayer(ICMPv6DestUnreach):
1190             self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1191             self.assert_embedded_icmp_checksum_valid(pkt)
1192         if pkt.haslayer(ICMPv6EchoRequest):
1193             self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1194         if pkt.haslayer(ICMPv6EchoReply):
1195             self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1196
1197     def get_counter(self, counter):
1198         if counter.startswith("/"):
1199             counter_value = self.statistics.get_counter(counter)
1200         else:
1201             counters = self.vapi.cli("sh errors").split('\n')
1202             counter_value = 0
1203             for i in range(1, len(counters) - 1):
1204                 results = counters[i].split()
1205                 if results[1] == counter:
1206                     counter_value = int(results[0])
1207                     break
1208         return counter_value
1209
1210     def assert_counter_equal(self, counter, expected_value,
1211                              thread=None, index=0):
1212         c = self.get_counter(counter)
1213         if thread is not None:
1214             c = c[thread][index]
1215         else:
1216             c = sum(x[index] for x in c)
1217         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1218
1219     def assert_packet_counter_equal(self, counter, expected_value):
1220         counter_value = self.get_counter(counter)
1221         self.assert_equal(counter_value, expected_value,
1222                           "packet counter `%s'" % counter)
1223
1224     def assert_error_counter_equal(self, counter, expected_value):
1225         counter_value = self.statistics[counter].sum()
1226         self.assert_equal(counter_value, expected_value,
1227                           "error counter `%s'" % counter)
1228
1229     @classmethod
1230     def sleep(cls, timeout, remark=None):
1231
1232         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1233         #  * by Guido, only the main thread can be interrupted.
1234         # */
1235         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1236         if timeout == 0:
1237             # yield quantum
1238             if hasattr(os, 'sched_yield'):
1239                 os.sched_yield()
1240             else:
1241                 time.sleep(0)
1242             return
1243
1244         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1245         before = time.time()
1246         time.sleep(timeout)
1247         after = time.time()
1248         if after - before > 2 * timeout:
1249             cls.logger.error("unexpected self.sleep() result - "
1250                              "slept for %es instead of ~%es!",
1251                              after - before, timeout)
1252
1253         cls.logger.debug(
1254             "Finished sleep (%s) - slept %es (wanted %es)",
1255             remark, after - before, timeout)
1256
1257     def virtual_sleep(self, timeout, remark=None):
1258         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1259         self.vapi.cli("set clock adjust %s" % timeout)
1260
1261     def pg_send(self, intf, pkts, worker=None, trace=True):
1262         intf.add_stream(pkts, worker=worker)
1263         self.pg_enable_capture(self.pg_interfaces)
1264         self.pg_start(trace=trace)
1265
1266     def snapshot_stats(self, stats_diff):
1267         """Return snapshot of interesting stats based on diff dictionary."""
1268         stats_snapshot = {}
1269         for sw_if_index in stats_diff:
1270             for counter in stats_diff[sw_if_index]:
1271                 stats_snapshot[counter] = self.statistics[counter]
1272         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1273         return stats_snapshot
1274
1275     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1276         """Assert appropriate difference between current stats and snapshot."""
1277         for sw_if_index in stats_diff:
1278             for cntr, diff in stats_diff[sw_if_index].items():
1279                 if sw_if_index == "err":
1280                     self.assert_equal(
1281                         self.statistics[cntr].sum(),
1282                         stats_snapshot[cntr].sum() + diff,
1283                         f"'{cntr}' counter value (previous value: "
1284                         f"{stats_snapshot[cntr].sum()}, "
1285                         f"expected diff: {diff})")
1286                 else:
1287                     try:
1288                         self.assert_equal(
1289                             self.statistics[cntr][:, sw_if_index].sum(),
1290                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1291                             f"'{cntr}' counter value (previous value: "
1292                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1293                             f"expected diff: {diff})")
1294                     except IndexError:
1295                         # if diff is 0, then this most probably a case where
1296                         # test declares multiple interfaces but traffic hasn't
1297                         # passed through this one yet - which means the counter
1298                         # value is 0 and can be ignored
1299                         if 0 != diff:
1300                             raise
1301
1302     def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1303                                    stats_diff=None, trace=True, msg=None):
1304         if stats_diff:
1305             stats_snapshot = self.snapshot_stats(stats_diff)
1306
1307         self.pg_send(intf, pkts)
1308
1309         try:
1310             if not timeout:
1311                 timeout = 1
1312             for i in self.pg_interfaces:
1313                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1314                 timeout = 0.1
1315         finally:
1316             if trace:
1317                 if msg:
1318                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1319                 self.logger.debug(self.vapi.cli("show trace"))
1320
1321         if stats_diff:
1322             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1323
1324     def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1325                         trace=True, msg=None, stats_diff=None):
1326         if stats_diff:
1327             stats_snapshot = self.snapshot_stats(stats_diff)
1328
1329         if not n_rx:
1330             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1331         self.pg_send(intf, pkts, worker=worker, trace=trace)
1332         rx = output.get_capture(n_rx)
1333         if trace:
1334             if msg:
1335                 self.logger.debug(f"send_and_expect: {msg}")
1336             self.logger.debug(self.vapi.cli("show trace"))
1337
1338         if stats_diff:
1339             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1340
1341         return rx
1342
1343     def send_and_expect_load_balancing(self, input, pkts, outputs,
1344                                        worker=None, trace=True):
1345         self.pg_send(input, pkts, worker=worker, trace=trace)
1346         rxs = []
1347         for oo in outputs:
1348             rx = oo._get_capture(1)
1349             self.assertNotEqual(0, len(rx))
1350             rxs.append(rx)
1351         if trace:
1352             self.logger.debug(self.vapi.cli("show trace"))
1353         return rxs
1354
1355     def send_and_expect_only(self, intf, pkts, output, timeout=None,
1356                              stats_diff=None):
1357         if stats_diff:
1358             stats_snapshot = self.snapshot_stats(stats_diff)
1359
1360         self.pg_send(intf, pkts)
1361         rx = output.get_capture(len(pkts))
1362         outputs = [output]
1363         if not timeout:
1364             timeout = 1
1365         for i in self.pg_interfaces:
1366             if i not in outputs:
1367                 i.assert_nothing_captured(timeout=timeout)
1368                 timeout = 0.1
1369
1370         if stats_diff:
1371             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1372
1373         return rx
1374
1375
1376 def get_testcase_doc_name(test):
1377     return getdoc(test.__class__).splitlines()[0]
1378
1379
1380 def get_test_description(descriptions, test):
1381     short_description = test.shortDescription()
1382     if descriptions and short_description:
1383         return short_description
1384     else:
1385         return str(test)
1386
1387
1388 class TestCaseInfo(object):
1389     def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1390         self.logger = logger
1391         self.tempdir = tempdir
1392         self.vpp_pid = vpp_pid
1393         self.vpp_bin_path = vpp_bin_path
1394         self.core_crash_test = None
1395
1396
1397 class VppTestResult(unittest.TestResult):
1398     """
1399     @property result_string
1400      String variable to store the test case result string.
1401     @property errors
1402      List variable containing 2-tuples of TestCase instances and strings
1403      holding formatted tracebacks. Each tuple represents a test which
1404      raised an unexpected exception.
1405     @property failures
1406      List variable containing 2-tuples of TestCase instances and strings
1407      holding formatted tracebacks. Each tuple represents a test where
1408      a failure was explicitly signalled using the TestCase.assert*()
1409      methods.
1410     """
1411
1412     failed_test_cases_info = set()
1413     core_crash_test_cases_info = set()
1414     current_test_case_info = None
1415
1416     def __init__(self, stream=None, descriptions=None, verbosity=None,
1417                  runner=None):
1418         """
1419         :param stream File descriptor to store where to report test results.
1420             Set to the standard error stream by default.
1421         :param descriptions Boolean variable to store information if to use
1422             test case descriptions.
1423         :param verbosity Integer variable to store required verbosity level.
1424         """
1425         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1426         self.stream = stream
1427         self.descriptions = descriptions
1428         self.verbosity = verbosity
1429         self.result_string = None
1430         self.runner = runner
1431         self.printed = []
1432
1433     def addSuccess(self, test):
1434         """
1435         Record a test succeeded result
1436
1437         :param test:
1438
1439         """
1440         if self.current_test_case_info:
1441             self.current_test_case_info.logger.debug(
1442                 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1443                                                        test._testMethodName,
1444                                                        test._testMethodDoc))
1445         unittest.TestResult.addSuccess(self, test)
1446         self.result_string = colorize("OK", GREEN)
1447
1448         self.send_result_through_pipe(test, PASS)
1449
1450     def addSkip(self, test, reason):
1451         """
1452         Record a test skipped.
1453
1454         :param test:
1455         :param reason:
1456
1457         """
1458         if self.current_test_case_info:
1459             self.current_test_case_info.logger.debug(
1460                 "--- addSkip() %s.%s(%s) called, reason is %s" %
1461                 (test.__class__.__name__, test._testMethodName,
1462                  test._testMethodDoc, reason))
1463         unittest.TestResult.addSkip(self, test, reason)
1464         self.result_string = colorize("SKIP", YELLOW)
1465
1466         if reason == "not enough cpus":
1467             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1468         else:
1469             self.send_result_through_pipe(test, SKIP)
1470
1471     def symlink_failed(self):
1472         if self.current_test_case_info:
1473             try:
1474                 failed_dir = config.failed_dir
1475                 link_path = os.path.join(
1476                     failed_dir,
1477                     '%s-FAILED' %
1478                     os.path.basename(self.current_test_case_info.tempdir))
1479
1480                 self.current_test_case_info.logger.debug(
1481                     "creating a link to the failed test")
1482                 self.current_test_case_info.logger.debug(
1483                     "os.symlink(%s, %s)" %
1484                     (self.current_test_case_info.tempdir, link_path))
1485                 if os.path.exists(link_path):
1486                     self.current_test_case_info.logger.debug(
1487                         'symlink already exists')
1488                 else:
1489                     os.symlink(self.current_test_case_info.tempdir, link_path)
1490
1491             except Exception as e:
1492                 self.current_test_case_info.logger.error(e)
1493
1494     def send_result_through_pipe(self, test, result):
1495         if hasattr(self, 'test_framework_result_pipe'):
1496             pipe = self.test_framework_result_pipe
1497             if pipe:
1498                 pipe.send((test.id(), result))
1499
1500     def log_error(self, test, err, fn_name):
1501         if self.current_test_case_info:
1502             if isinstance(test, unittest.suite._ErrorHolder):
1503                 test_name = test.description
1504             else:
1505                 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1506                                            test._testMethodName,
1507                                            test._testMethodDoc)
1508             self.current_test_case_info.logger.debug(
1509                 "--- %s() %s called, err is %s" %
1510                 (fn_name, test_name, err))
1511             self.current_test_case_info.logger.debug(
1512                 "formatted exception is:\n%s" %
1513                 "".join(format_exception(*err)))
1514
1515     def add_error(self, test, err, unittest_fn, error_type):
1516         if error_type == FAIL:
1517             self.log_error(test, err, 'addFailure')
1518             error_type_str = colorize("FAIL", RED)
1519         elif error_type == ERROR:
1520             self.log_error(test, err, 'addError')
1521             error_type_str = colorize("ERROR", RED)
1522         else:
1523             raise Exception('Error type %s cannot be used to record an '
1524                             'error or a failure' % error_type)
1525
1526         unittest_fn(self, test, err)
1527         if self.current_test_case_info:
1528             self.result_string = "%s [ temp dir used by test case: %s ]" % \
1529                                  (error_type_str,
1530                                   self.current_test_case_info.tempdir)
1531             self.symlink_failed()
1532             self.failed_test_cases_info.add(self.current_test_case_info)
1533             if is_core_present(self.current_test_case_info.tempdir):
1534                 if not self.current_test_case_info.core_crash_test:
1535                     if isinstance(test, unittest.suite._ErrorHolder):
1536                         test_name = str(test)
1537                     else:
1538                         test_name = "'{!s}' ({!s})".format(
1539                             get_testcase_doc_name(test), test.id())
1540                     self.current_test_case_info.core_crash_test = test_name
1541                 self.core_crash_test_cases_info.add(
1542                     self.current_test_case_info)
1543         else:
1544             self.result_string = '%s [no temp dir]' % error_type_str
1545
1546         self.send_result_through_pipe(test, error_type)
1547
1548     def addFailure(self, test, err):
1549         """
1550         Record a test failed result
1551
1552         :param test:
1553         :param err: error message
1554
1555         """
1556         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1557
1558     def addError(self, test, err):
1559         """
1560         Record a test error result
1561
1562         :param test:
1563         :param err: error message
1564
1565         """
1566         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1567
1568     def getDescription(self, test):
1569         """
1570         Get test description
1571
1572         :param test:
1573         :returns: test description
1574
1575         """
1576         return get_test_description(self.descriptions, test)
1577
1578     def startTest(self, test):
1579         """
1580         Start a test
1581
1582         :param test:
1583
1584         """
1585
1586         def print_header(test):
1587             if test.__class__ in self.printed:
1588                 return
1589
1590             test_doc = getdoc(test)
1591             if not test_doc:
1592                 raise Exception("No doc string for test '%s'" % test.id())
1593
1594             test_title = test_doc.splitlines()[0].rstrip()
1595             test_title = colorize(test_title, GREEN)
1596             if test.is_tagged_run_solo():
1597                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1598
1599             # This block may overwrite the colorized title above,
1600             # but we want this to stand out and be fixed
1601             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1602                 test_title = colorize(
1603                     f"FIXME with VPP workers: {test_title}", RED)
1604
1605             if test.has_tag(TestCaseTag.FIXME_ASAN):
1606                 test_title = colorize(
1607                     f"FIXME with ASAN: {test_title}", RED)
1608                 test.skip_fixme_asan()
1609
1610             if hasattr(test, 'vpp_worker_count'):
1611                 if test.vpp_worker_count == 0:
1612                     test_title += " [main thread only]"
1613                 elif test.vpp_worker_count == 1:
1614                     test_title += " [1 worker thread]"
1615                 else:
1616                     test_title += f" [{test.vpp_worker_count} worker threads]"
1617
1618             if test.__class__.skipped_due_to_cpu_lack:
1619                 test_title = colorize(
1620                     f"{test_title} [skipped - not enough cpus, "
1621                     f"required={test.__class__.get_cpus_required()}, "
1622                     f"available={max_vpp_cpus}]", YELLOW)
1623
1624             print(double_line_delim)
1625             print(test_title)
1626             print(double_line_delim)
1627             self.printed.append(test.__class__)
1628
1629         print_header(test)
1630         self.start_test = time.time()
1631         unittest.TestResult.startTest(self, test)
1632         if self.verbosity > 0:
1633             self.stream.writeln(
1634                 "Starting " + self.getDescription(test) + " ...")
1635             self.stream.writeln(single_line_delim)
1636
1637     def stopTest(self, test):
1638         """
1639         Called when the given test has been run
1640
1641         :param test:
1642
1643         """
1644         unittest.TestResult.stopTest(self, test)
1645
1646         if self.verbosity > 0:
1647             self.stream.writeln(single_line_delim)
1648             self.stream.writeln("%-73s%s" % (self.getDescription(test),
1649                                              self.result_string))
1650             self.stream.writeln(single_line_delim)
1651         else:
1652             self.stream.writeln("%-68s %4.2f %s" %
1653                                 (self.getDescription(test),
1654                                  time.time() - self.start_test,
1655                                  self.result_string))
1656
1657         self.send_result_through_pipe(test, TEST_RUN)
1658
1659     def printErrors(self):
1660         """
1661         Print errors from running the test case
1662         """
1663         if len(self.errors) > 0 or len(self.failures) > 0:
1664             self.stream.writeln()
1665             self.printErrorList('ERROR', self.errors)
1666             self.printErrorList('FAIL', self.failures)
1667
1668         # ^^ that is the last output from unittest before summary
1669         if not self.runner.print_summary:
1670             devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1671             self.stream = devnull
1672             self.runner.stream = devnull
1673
1674     def printErrorList(self, flavour, errors):
1675         """
1676         Print error list to the output stream together with error type
1677         and test case description.
1678
1679         :param flavour: error type
1680         :param errors: iterable errors
1681
1682         """
1683         for test, err in errors:
1684             self.stream.writeln(double_line_delim)
1685             self.stream.writeln("%s: %s" %
1686                                 (flavour, self.getDescription(test)))
1687             self.stream.writeln(single_line_delim)
1688             self.stream.writeln("%s" % err)
1689
1690
1691 class VppTestRunner(unittest.TextTestRunner):
1692     """
1693     A basic test runner implementation which prints results to standard error.
1694     """
1695
1696     @property
1697     def resultclass(self):
1698         """Class maintaining the results of the tests"""
1699         return VppTestResult
1700
1701     def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1702                  result_pipe=None, failfast=False, buffer=False,
1703                  resultclass=None, print_summary=True, **kwargs):
1704         # ignore stream setting here, use hard-coded stdout to be in sync
1705         # with prints from VppTestCase methods ...
1706         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1707                                             verbosity, failfast, buffer,
1708                                             resultclass, **kwargs)
1709         KeepAliveReporter.pipe = keep_alive_pipe
1710
1711         self.orig_stream = self.stream
1712         self.resultclass.test_framework_result_pipe = result_pipe
1713
1714         self.print_summary = print_summary
1715
1716     def _makeResult(self):
1717         return self.resultclass(self.stream,
1718                                 self.descriptions,
1719                                 self.verbosity,
1720                                 self)
1721
1722     def run(self, test):
1723         """
1724         Run the tests
1725
1726         :param test:
1727
1728         """
1729         faulthandler.enable()  # emit stack trace to stderr if killed by signal
1730
1731         result = super(VppTestRunner, self).run(test)
1732         if not self.print_summary:
1733             self.stream = self.orig_stream
1734             result.stream = self.orig_stream
1735         return result
1736
1737
1738 class Worker(Thread):
1739     def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1740         super(Worker, self).__init__(*args, **kwargs)
1741         self.logger = logger
1742         self.args = executable_args
1743         if hasattr(self, 'testcase') and self.testcase.debug_all:
1744             if self.testcase.debug_gdbserver:
1745                 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1746                              .format(port=self.testcase.gdbserver_port)] + args
1747             elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1748                 self.args.append(self.wait_for_gdb)
1749         self.app_bin = executable_args[0]
1750         self.app_name = os.path.basename(self.app_bin)
1751         if hasattr(self, 'role'):
1752             self.app_name += ' {role}'.format(role=self.role)
1753         self.process = None
1754         self.result = None
1755         env = {} if env is None else env
1756         self.env = copy.deepcopy(env)
1757
1758     def wait_for_enter(self):
1759         if not hasattr(self, 'testcase'):
1760             return
1761         if self.testcase.debug_all and self.testcase.debug_gdbserver:
1762             print()
1763             print(double_line_delim)
1764             print("Spawned GDB Server for '{app}' with PID: {pid}"
1765                   .format(app=self.app_name, pid=self.process.pid))
1766         elif self.testcase.debug_all and self.testcase.debug_gdb:
1767             print()
1768             print(double_line_delim)
1769             print("Spawned '{app}' with PID: {pid}"
1770                   .format(app=self.app_name, pid=self.process.pid))
1771         else:
1772             return
1773         print(single_line_delim)
1774         print("You can debug '{app}' using:".format(app=self.app_name))
1775         if self.testcase.debug_gdbserver:
1776             print("sudo gdb " + self.app_bin +
1777                   " -ex 'target remote localhost:{port}'"
1778                   .format(port=self.testcase.gdbserver_port))
1779             print("Now is the time to attach gdb by running the above "
1780                   "command, set up breakpoints etc., then resume from "
1781                   "within gdb by issuing the 'continue' command")
1782             self.testcase.gdbserver_port += 1
1783         elif self.testcase.debug_gdb:
1784             print("sudo gdb " + self.app_bin +
1785                   " -ex 'attach {pid}'".format(pid=self.process.pid))
1786             print("Now is the time to attach gdb by running the above "
1787                   "command and set up breakpoints etc., then resume from"
1788                   " within gdb by issuing the 'continue' command")
1789         print(single_line_delim)
1790         input("Press ENTER to continue running the testcase...")
1791
1792     def run(self):
1793         executable = self.args[0]
1794         if not os.path.exists(executable) or not os.access(
1795                 executable, os.F_OK | os.X_OK):
1796             # Exit code that means some system file did not exist,
1797             # could not be opened, or had some other kind of error.
1798             self.result = os.EX_OSFILE
1799             raise EnvironmentError(
1800                 "executable '%s' is not found or executable." % executable)
1801         self.logger.debug("Running executable '{app}': '{cmd}'"
1802                           .format(app=self.app_name,
1803                                   cmd=' '.join(self.args)))
1804         env = os.environ.copy()
1805         env.update(self.env)
1806         env["CK_LOG_FILE_NAME"] = "-"
1807         self.process = subprocess.Popen(
1808             ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1809             preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1810             stderr=subprocess.PIPE)
1811         self.wait_for_enter()
1812         out, err = self.process.communicate()
1813         self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1814         self.logger.info("Return code is `%s'" % self.process.returncode)
1815         self.logger.info(single_line_delim)
1816         self.logger.info("Executable `{app}' wrote to stdout:"
1817                          .format(app=self.app_name))
1818         self.logger.info(single_line_delim)
1819         self.logger.info(out.decode('utf-8'))
1820         self.logger.info(single_line_delim)
1821         self.logger.info("Executable `{app}' wrote to stderr:"
1822                          .format(app=self.app_name))
1823         self.logger.info(single_line_delim)
1824         self.logger.info(err.decode('utf-8'))
1825         self.logger.info(single_line_delim)
1826         self.result = self.process.returncode
1827
1828
1829 if __name__ == '__main__':
1830     pass