tests: replace pycodestyle with black
[vpp.git] / test / framework.py
1 #!/usr/bin/env python3
2
3 from __future__ import print_function
4 import logging
5 import sys
6 import os
7 import select
8 import signal
9 import subprocess
10 import unittest
11 import re
12 import time
13 import faulthandler
14 import random
15 import copy
16 import platform
17 import shutil
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
23 from enum import Enum
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
26
27 import scapy.compat
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
37 import vpp_papi
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import (
41     RED,
42     GREEN,
43     YELLOW,
44     double_line_delim,
45     single_line_delim,
46     get_logger,
47     colorize,
48 )
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54
55
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())

# Integer codes; presumably they identify test outcomes/events for result
# reporting - NOTE(review): no consumer is visible in this part of the file.
PASS = 0
FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
SKIP_CPU_SHORTAGE = 5


# debug_internal is imported only when framework debugging is enabled in the
# configuration; tearDownClass() uses it under the same condition.
if config.debug_framework:
    import debug_internal

# NOTE: this string literal comes after the imports, so it is a plain
# expression statement and not the module's __doc__.
"""
  Test framework module.

  The module provides a set of tools for constructing and running tests and
  representing the results.
"""
79
80
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died.

    :param rv: return code of the subprocess, if known. A negative value is
        looked up as signal number ``-rv`` to fill in ``signal_name``.
    :param testcase: name of the test case that was running, if any.
    :param method_name: name of the test method that was running, if any.

    The rendered message contains the test location when either ``testcase``
    or ``method_name`` is given, the return code when it is non-zero, and the
    signal name in square brackets only when the code maps to a known signal.
    """

    # Maps signal number -> signal name ("SIGSEGV", ...). Built from the
    # ``signal`` module namespace; the ``SIG_*`` handler constants are skipped.
    signals_by_value = {
        v: k
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        self.rv = rv
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        try:
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            # rv is None / not negatable, or it does not match any signal
            pass

        if testcase is None and method_name is None:
            in_msg = ""
        else:
            in_msg = " while running %s.%s" % (testcase, method_name)

        if self.rv:
            # Only mention the signal when there is one; previously an
            # unknown code produced a dangling " []" in the message.
            if self.signal_name is None:
                signal_msg = ""
            else:
                signal_msg = " [%s]" % self.signal_name
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                in_msg,
                self.rv,
                signal_msg,
            )
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
116
117
class _PacketInfo(object):
    """Private class to create packet info object.

    Help process information about the next packet.
    Set variables to default values.

    Equality compares ``index``, ``src``, ``dst`` and ``data`` only; ``ip``
    and ``proto`` do not take part in the comparison. Because ``__eq__`` is
    defined without ``__hash__``, instances are not hashable.
    """

    #: Store the index of the packet.
    index = -1
    #: Store the index of the source packet generator interface of the packet.
    src = -1
    #: Store the index of the destination packet generator interface
    #: of the packet.
    dst = -1
    #: Store expected ip version
    ip = -1
    #: Store expected upper protocol
    proto = -1
    #: Store the copy of the former packet.
    data = None

    def __eq__(self, other):
        """Compare with another packet-info-like object.

        Returns ``NotImplemented`` when *other* lacks any of the compared
        attributes (e.g. ``None``), so ``==`` evaluates to ``False`` and
        ``!=`` to ``True`` instead of raising ``AttributeError``.
        """
        try:
            index = self.index == other.index
            src = self.src == other.src
            dst = self.dst == other.dst
            data = self.data == other.data
        except AttributeError:
            return NotImplemented
        return index and src and dst and data
145
146
147 def pump_output(testclass):
148     """pump output from vpp stdout/stderr to proper queues"""
149     stdout_fragment = ""
150     stderr_fragment = ""
151     while not testclass.pump_thread_stop_flag.is_set():
152         readable = select.select(
153             [
154                 testclass.vpp.stdout.fileno(),
155                 testclass.vpp.stderr.fileno(),
156                 testclass.pump_thread_wakeup_pipe[0],
157             ],
158             [],
159             [],
160         )[0]
161         if testclass.vpp.stdout.fileno() in readable:
162             read = os.read(testclass.vpp.stdout.fileno(), 102400)
163             if len(read) > 0:
164                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
165                 if len(stdout_fragment) > 0:
166                     split[0] = "%s%s" % (stdout_fragment, split[0])
167                 if len(split) > 0 and split[-1].endswith("\n"):
168                     limit = None
169                 else:
170                     limit = -1
171                     stdout_fragment = split[-1]
172                 testclass.vpp_stdout_deque.extend(split[:limit])
173                 if not config.cache_vpp_output:
174                     for line in split[:limit]:
175                         testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
176         if testclass.vpp.stderr.fileno() in readable:
177             read = os.read(testclass.vpp.stderr.fileno(), 102400)
178             if len(read) > 0:
179                 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
180                 if len(stderr_fragment) > 0:
181                     split[0] = "%s%s" % (stderr_fragment, split[0])
182                 if len(split) > 0 and split[-1].endswith("\n"):
183                     limit = None
184                 else:
185                     limit = -1
186                     stderr_fragment = split[-1]
187
188                 testclass.vpp_stderr_deque.extend(split[:limit])
189                 if not config.cache_vpp_output:
190                     for line in split[:limit]:
191                         testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
192                         # ignoring the dummy pipe here intentionally - the
193                         # flag will take care of properly terminating the loop
194
195
def _is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports an aarch64 machine."""
    machine = platform.machine()
    return "aarch64" == machine


# Evaluated once, when this module is imported.
is_platform_aarch64 = _is_platform_aarch64()
201
202
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    Every instance shares one attribute dictionary (``_shared_state``), so
    the pipe configured through one instance is visible through all others.
    """

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        # Initialise the shared pipe slot only once. Unconditionally
        # assigning None here would wipe a pipe configured earlier through
        # another instance each time a new reporter is created.
        self._shared_state.setdefault("_pipe", None)

    @property
    def pipe(self):
        """The keep-alive pipe, or None when none has been configured."""
        return self._pipe

    @pipe.setter
    def pipe(self, pipe):
        """Set the keep-alive pipe.

        :raises Exception: if a pipe (other than None) was already set.
        """
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: a test case class or a test instance. For a class the
            description is ``"<desc> (<class name>)"``; for an instance the
            passed *desc* is ignored and ``test.id()`` is used instead.
        :param desc: free-form description, only used when *test* is a class.

        The message sent is the tuple
        ``(desc, config.vpp, test.tempdir, test.vpp.pid)``.
        """
        if self.pipe is None:
            # if not running forked..
            return

        if isclass(test):
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        else:
            desc = test.id()

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
238
239
class TestCaseTag(Enum):
    """Tags that can be attached to test suites."""

    # suite has to run at the end, with just a single test runner
    RUN_SOLO = 1
    # suite is broken when VPP runs with multiple workers
    FIXME_VPP_WORKERS = 2
    # suite is broken when ASan is enabled
    FIXME_ASAN = 3
248
249
def create_tag_decorator(e):
    """Create a class decorator which adds tag *e* to ``cls.test_tags``.

    :param e: the tag to attach (any value; compared with ``in`` by readers).
    :returns: a decorator that returns the decorated class itself.

    The decorated class always gets its own ``test_tags`` list: tags visible
    on the class before decoration (including inherited ones) are copied and
    *e* is added at the end. The list of a base class is never modified, so
    tagging a subclass does not tag its parent or its siblings.
    """

    def decorator(cls):
        # Rebind instead of append(): the existing list may belong to a
        # base class and be shared with other subclasses.
        cls.test_tags = list(getattr(cls, "test_tags", ())) + [e]
        return cls

    return decorator
259
260
# Class decorators, one per TestCaseTag member; each adds its tag to the
# decorated class's ``test_tags``.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
264
265
class DummyVpp:
    """Stand-in for the VPP process handle.

    Offers the ``pid``, ``returncode``, ``poll()`` and ``terminate()`` members
    that the framework uses on ``cls.vpp``, without any process behind them.
    """

    # Placeholder process id.
    pid = 0xCAFEBAFE
    # Stays None; the framework treats None as "still running".
    returncode = None

    def poll(self):
        """Do nothing."""
        return None

    def terminate(self):
        """Do nothing."""
        return None
275
276
class CPUInterface(ABC):
    """Interface of classes which need CPUs assigned before they run."""

    # Flag, False by default. NOTE(review): presumably set by the runner
    # when the class is skipped for lack of CPUs - no writer is visible here.
    skipped_due_to_cpu_lack = False
    # CPUs assigned to the class; replaced as a whole by assign_cpus().
    cpus = []

    @classmethod
    @abstractmethod
    def get_cpus_required(cls):
        """Return how many CPUs the class needs (to be implemented)."""
        pass

    @classmethod
    def assign_cpus(cls, cpus):
        """Store *cpus* on the class as its assigned CPUs."""
        setattr(cls, "cpus", cpus)
289
290
class VppTestCase(CPUInterface, unittest.TestCase):
    """This subclass is a base class for VPP test cases that are implemented as
    classes. It provides methods to create and run test case.
    """

    # Extra text placed inside the "statseg { ... }" section of the VPP
    # command line built by setUpConstants().
    extra_vpp_statseg_config = ""
    # Extra arguments added at the very end of the VPP command line.
    # NOTE: class-level list, shared by subclasses unless they rebind it.
    extra_vpp_punt_config = []
    # Extra arguments placed inside the "plugins { ... }" section.
    # NOTE: class-level list, shared by subclasses unless they rebind it.
    extra_vpp_plugin_config = []
    # Placeholder logger; setUpClass() replaces it with a per-class logger.
    logger = null_logger
    # Passed to VppPapiProvider in setUpClass(), which forces it to 0 in the
    # gdb/gdbserver/attach debug modes. Presumably seconds - TODO confirm.
    vapi_response_timeout = 5
    # When true, tearDown() removes the VPP config held by cls.registry.
    remove_configured_vpp_objects_on_tear_down = True
302
    @property
    def packet_infos(self):
        """Packet infos stored in ``_packet_infos`` (read-only property)."""
        return self._packet_infos
307
308     @classmethod
309     def get_packet_count_for_if_idx(cls, dst_if_index):
310         """Get the number of packet info for specified destination if index"""
311         if dst_if_index in cls._packet_count_for_dst_if_idx:
312             return cls._packet_count_for_dst_if_idx[dst_if_index]
313         else:
314             return 0
315
316     @classmethod
317     def has_tag(cls, tag):
318         """if the test case has a given tag - return true"""
319         try:
320             return tag in cls.test_tags
321         except AttributeError:
322             pass
323         return False
324
    @classmethod
    def is_tagged_run_solo(cls):
        """if the test case class is timing-sensitive - return true

        That is, return True when the class carries TestCaseTag.RUN_SOLO.
        """
        return cls.has_tag(TestCaseTag.RUN_SOLO)
329
330     @classmethod
331     def skip_fixme_asan(cls):
332         """if @tag_fixme_asan & ASan is enabled - mark for skip"""
333         if cls.has_tag(TestCaseTag.FIXME_ASAN):
334             vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
335             if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
336                 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
337
    @classmethod
    def instance(cls):
        """Return the instance of this testcase

        This is the ``test_instance`` class attribute, which setUp() rebinds
        to the test instance it is running for.
        """
        return cls.test_instance
342
343     @classmethod
344     def set_debug_flags(cls, d):
345         cls.gdbserver_port = 7777
346         cls.debug_core = False
347         cls.debug_gdb = False
348         cls.debug_gdbserver = False
349         cls.debug_all = False
350         cls.debug_attach = False
351         if d is None:
352             return
353         dl = d.lower()
354         if dl == "core":
355             cls.debug_core = True
356         elif dl == "gdb" or dl == "gdb-all":
357             cls.debug_gdb = True
358         elif dl == "gdbserver" or dl == "gdbserver-all":
359             cls.debug_gdbserver = True
360         elif dl == "attach":
361             cls.debug_attach = True
362         else:
363             raise Exception("Unrecognized DEBUG option: '%s'" % d)
364         if dl == "gdb-all" or dl == "gdbserver-all":
365             cls.debug_all = True
366
367     @classmethod
368     def get_vpp_worker_count(cls):
369         if not hasattr(cls, "vpp_worker_count"):
370             if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
371                 cls.vpp_worker_count = 0
372             else:
373                 cls.vpp_worker_count = config.vpp_worker_count
374         return cls.vpp_worker_count
375
376     @classmethod
377     def get_cpus_required(cls):
378         return 1 + cls.get_vpp_worker_count()
379
380     @classmethod
381     def setUpConstants(cls):
382         """Set-up the test case class based on environment variables"""
383         cls.step = config.step
384         cls.plugin_path = ":".join(config.vpp_plugin_dir)
385         cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
386         cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
387         debug_cli = ""
388         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
389             debug_cli = "cli-listen localhost:5002"
390         size = re.search(r"\d+[gG]", config.coredump_size)
391         if size:
392             coredump_size = f"coredump-size {config.coredump_size}".lower()
393         else:
394             coredump_size = "coredump-size unlimited"
395         default_variant = config.variant
396         if default_variant is not None:
397             default_variant = "defaults { %s 100 }" % default_variant
398         else:
399             default_variant = ""
400
401         api_fuzzing = config.api_fuzz
402         if api_fuzzing is None:
403             api_fuzzing = "off"
404
405         cls.vpp_cmdline = [
406             config.vpp,
407             "unix",
408             "{",
409             "nodaemon",
410             debug_cli,
411             "full-coredump",
412             coredump_size,
413             "runtime-dir",
414             cls.tempdir,
415             "}",
416             "api-trace",
417             "{",
418             "on",
419             "}",
420             "api-segment",
421             "{",
422             "prefix",
423             cls.get_api_segment_prefix(),
424             "}",
425             "cpu",
426             "{",
427             "main-core",
428             str(cls.cpus[0]),
429         ]
430         if cls.extern_plugin_path not in (None, ""):
431             cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
432         if cls.get_vpp_worker_count():
433             cls.vpp_cmdline.extend(
434                 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
435             )
436         cls.vpp_cmdline.extend(
437             [
438                 "}",
439                 "physmem",
440                 "{",
441                 "max-size",
442                 "32m",
443                 "}",
444                 "statseg",
445                 "{",
446                 "socket-name",
447                 cls.get_stats_sock_path(),
448                 cls.extra_vpp_statseg_config,
449                 "}",
450                 "socksvr",
451                 "{",
452                 "socket-name",
453                 cls.get_api_sock_path(),
454                 "}",
455                 "node { ",
456                 default_variant,
457                 "}",
458                 "api-fuzz {",
459                 api_fuzzing,
460                 "}",
461                 "plugins",
462                 "{",
463                 "plugin",
464                 "dpdk_plugin.so",
465                 "{",
466                 "disable",
467                 "}",
468                 "plugin",
469                 "rdma_plugin.so",
470                 "{",
471                 "disable",
472                 "}",
473                 "plugin",
474                 "lisp_unittest_plugin.so",
475                 "{",
476                 "enable",
477                 "}",
478                 "plugin",
479                 "unittest_plugin.so",
480                 "{",
481                 "enable",
482                 "}",
483             ]
484             + cls.extra_vpp_plugin_config
485             + [
486                 "}",
487             ]
488         )
489
490         if cls.extra_vpp_punt_config is not None:
491             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
492
493         if not cls.debug_attach:
494             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
495             cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
496
497     @classmethod
498     def wait_for_enter(cls):
499         if cls.debug_gdbserver:
500             print(double_line_delim)
501             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
502         elif cls.debug_gdb:
503             print(double_line_delim)
504             print("Spawned VPP with PID: %d" % cls.vpp.pid)
505         else:
506             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
507             return
508         print(single_line_delim)
509         print("You can debug VPP using:")
510         if cls.debug_gdbserver:
511             print(
512                 f"sudo gdb {config.vpp} "
513                 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
514             )
515             print(
516                 "Now is the time to attach gdb by running the above "
517                 "command, set up breakpoints etc., then resume VPP from "
518                 "within gdb by issuing the 'continue' command"
519             )
520             cls.gdbserver_port += 1
521         elif cls.debug_gdb:
522             print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
523             print(
524                 "Now is the time to attach gdb by running the above "
525                 "command and set up breakpoints etc., then resume VPP from"
526                 " within gdb by issuing the 'continue' command"
527             )
528         print(single_line_delim)
529         input("Press ENTER to continue running the testcase...")
530
    @classmethod
    def attach_vpp(cls):
        """Install a DummyVpp as ``cls.vpp``; no VPP process is started."""
        cls.vpp = DummyVpp()
534
535     @classmethod
536     def run_vpp(cls):
537         cls.logger.debug(f"Assigned cpus: {cls.cpus}")
538         cmdline = cls.vpp_cmdline
539
540         if cls.debug_gdbserver:
541             gdbserver = "/usr/bin/gdbserver"
542             if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
543                 raise Exception(
544                     "gdbserver binary '%s' does not exist or is "
545                     "not executable" % gdbserver
546                 )
547
548             cmdline = [
549                 gdbserver,
550                 "localhost:{port}".format(port=cls.gdbserver_port),
551             ] + cls.vpp_cmdline
552             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
553
554         try:
555             cls.vpp = subprocess.Popen(
556                 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
557             )
558         except subprocess.CalledProcessError as e:
559             cls.logger.critical(
560                 "Subprocess returned with non-0 return code: (%s)", e.returncode
561             )
562             raise
563         except OSError as e:
564             cls.logger.critical(
565                 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
566             )
567             raise
568         except Exception as e:
569             cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
570             raise
571
572         cls.wait_for_enter()
573
574     @classmethod
575     def wait_for_coredump(cls):
576         corefile = cls.tempdir + "/core"
577         if os.path.isfile(corefile):
578             cls.logger.error("Waiting for coredump to complete: %s", corefile)
579             curr_size = os.path.getsize(corefile)
580             deadline = time.time() + 60
581             ok = False
582             while time.time() < deadline:
583                 cls.sleep(1)
584                 size = curr_size
585                 curr_size = os.path.getsize(corefile)
586                 if size == curr_size:
587                     ok = True
588                     break
589             if not ok:
590                 cls.logger.error(
591                     "Timed out waiting for coredump to complete: %s", corefile
592                 )
593             else:
594                 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
595
596     @classmethod
597     def get_stats_sock_path(cls):
598         return "%s/stats.sock" % cls.tempdir
599
600     @classmethod
601     def get_api_sock_path(cls):
602         return "%s/api.sock" % cls.tempdir
603
604     @classmethod
605     def get_api_segment_prefix(cls):
606         return os.path.basename(cls.tempdir)  # Only used for VAPI
607
608     @classmethod
609     def get_tempdir(cls):
610         if cls.debug_attach:
611             tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
612         else:
613             tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
614             if config.wipe_tmp_dir:
615                 shutil.rmtree(tmpdir, ignore_errors=True)
616             os.mkdir(tmpdir)
617         return tmpdir
618
619     @classmethod
620     def create_file_handler(cls):
621         if config.log_dir is None:
622             cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
623             return
624
625         logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
626         if config.wipe_tmp_dir:
627             shutil.rmtree(logdir, ignore_errors=True)
628         os.mkdir(logdir)
629         cls.file_handler = FileHandler(f"{logdir}/log.txt")
630
    @classmethod
    def setUpClass(cls):
        """
        Perform class setup before running the testcase

        Sets up the per-class logger and temporary directory, builds the VPP
        command line, starts VPP (or attaches to a running one in "attach"
        debug mode), starts the output pump thread and connects the vpp-api.

        Any failure while starting or connecting makes the method call
        cls.quit() before the exception is re-raised.
        """
        super(VppTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            # with a parallel handler attached, records are not propagated
            # to the ancestor loggers
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        )
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        # from here on the process works inside the temporary directory
        os.chdir(cls.tempdir)
        cls.logger.info(
            "Temporary dir is %s, api socket is %s",
            cls.tempdir,
            cls.get_api_sock_path(),
        )
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.setUpConstants()
        cls.reset_packet_infos()
        cls._pcaps = []
        cls._old_pcaps = []
        cls.verbose = 0
        cls.vpp_dead = False
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        try:
            if cls.debug_attach:
                cls.attach_vpp()
            else:
                cls.run_vpp()
            cls.reporter.send_keep_alive(cls, "setUpClass")
            VppTestResult.current_test_case_info = TestCaseInfo(
                cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
            )
            cls.vpp_stdout_deque = deque()
            cls.vpp_stderr_deque = deque()
            if not cls.debug_attach:
                # pump VPP's stdout/stderr in a daemon thread; the pipe is
                # used by quit() to wake the thread up from select()
                cls.pump_thread_stop_flag = Event()
                cls.pump_thread_wakeup_pipe = os.pipe()
                cls.pump_thread = Thread(target=pump_output, args=(cls,))
                cls.pump_thread.daemon = True
                cls.pump_thread.start()
            if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
                # NOTE(review): 0 presumably disables the response timeout
                # while a human is debugging - confirm in VppPapiProvider
                cls.vapi_response_timeout = 0
            cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            if cls.step:
                hook = hookmodule.StepHook(cls)
            else:
                hook = hookmodule.PollHook(cls)
            cls.vapi.register_hook(hook)
            cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            try:
                hook.poll_vpp()
            except VppDiedError:
                cls.vpp_startup_failed = True
                cls.logger.critical(
                    "VPP died shortly after startup, check the"
                    " output to standard error for possible cause"
                )
                raise
            try:
                cls.vapi.connect()
            except (vpp_papi.VPPIOError, Exception) as e:
                cls.logger.debug("Exception connecting to vapi: %s" % e)
                cls.vapi.disconnect()

                if cls.debug_gdbserver:
                    print(
                        colorize(
                            "You're running VPP inside gdbserver but "
                            "VPP-API connection failed, did you forget "
                            "to 'continue' VPP from within gdb?",
                            RED,
                        )
                    )
                raise e
            if cls.debug_attach:
                # take the worker count from the second-to-last line of the
                # "show thread" output of the VPP we attached to
                last_line = cls.vapi.cli("show thread").split("\n")[-2]
                cls.vpp_worker_count = int(last_line.split(" ")[0])
                print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
            cls.quit()
            raise e
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
            cls.quit()
            raise e
733
734     @classmethod
735     def _debug_quit(cls):
736         if cls.debug_gdbserver or cls.debug_gdb:
737             try:
738                 cls.vpp.poll()
739
740                 if cls.vpp.returncode is None:
741                     print()
742                     print(double_line_delim)
743                     print("VPP or GDB server is still running")
744                     print(single_line_delim)
745                     input(
746                         "When done debugging, press ENTER to kill the "
747                         "process and finish running the testcase..."
748                     )
749             except AttributeError:
750                 pass
751
752     @classmethod
753     def quit(cls):
754         """
755         Disconnect vpp-api, kill vpp and cleanup shared memory files
756         """
757         cls._debug_quit()
758
759         # first signal that we want to stop the pump thread, then wake it up
760         if hasattr(cls, "pump_thread_stop_flag"):
761             cls.pump_thread_stop_flag.set()
762         if hasattr(cls, "pump_thread_wakeup_pipe"):
763             os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
764         if hasattr(cls, "pump_thread"):
765             cls.logger.debug("Waiting for pump thread to stop")
766             cls.pump_thread.join()
767         if hasattr(cls, "vpp_stderr_reader_thread"):
768             cls.logger.debug("Waiting for stderr pump to stop")
769             cls.vpp_stderr_reader_thread.join()
770
771         if hasattr(cls, "vpp"):
772             if hasattr(cls, "vapi"):
773                 cls.logger.debug(cls.vapi.vpp.get_stats())
774                 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
775                 cls.vapi.disconnect()
776                 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
777                 del cls.vapi
778             cls.vpp.poll()
779             if not cls.debug_attach and cls.vpp.returncode is None:
780                 cls.wait_for_coredump()
781                 cls.logger.debug("Sending TERM to vpp")
782                 cls.vpp.terminate()
783                 cls.logger.debug("Waiting for vpp to die")
784                 try:
785                     outs, errs = cls.vpp.communicate(timeout=5)
786                 except subprocess.TimeoutExpired:
787                     cls.vpp.kill()
788                     outs, errs = cls.vpp.communicate()
789             cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
790             if not cls.debug_attach:
791                 cls.vpp.stdout.close()
792                 cls.vpp.stderr.close()
793             del cls.vpp
794
795         if cls.vpp_startup_failed:
796             stdout_log = cls.logger.info
797             stderr_log = cls.logger.critical
798         else:
799             stdout_log = cls.logger.info
800             stderr_log = cls.logger.info
801
802         if hasattr(cls, "vpp_stdout_deque"):
803             stdout_log(single_line_delim)
804             stdout_log("VPP output to stdout while running %s:", cls.__name__)
805             stdout_log(single_line_delim)
806             vpp_output = "".join(cls.vpp_stdout_deque)
807             with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
808                 f.write(vpp_output)
809             stdout_log("\n%s", vpp_output)
810             stdout_log(single_line_delim)
811
812         if hasattr(cls, "vpp_stderr_deque"):
813             stderr_log(single_line_delim)
814             stderr_log("VPP output to stderr while running %s:", cls.__name__)
815             stderr_log(single_line_delim)
816             vpp_output = "".join(cls.vpp_stderr_deque)
817             with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
818                 f.write(vpp_output)
819             stderr_log("\n%s", vpp_output)
820             stderr_log(single_line_delim)
821
    @classmethod
    def tearDownClass(cls):
        """Perform final cleanup after running all tests in this test-case

        Sends a keep-alive, shuts everything down via quit(), closes the log
        file handler and resets the packet infos. With
        ``config.debug_framework`` set, debug_internal is notified last.
        """
        cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
        # report liveness before shutting VPP down
        cls.reporter.send_keep_alive(cls, "tearDownClass")
        cls.quit()
        # quit() still logs through this handler, so close it afterwards
        cls.file_handler.close()
        cls.reset_packet_infos()
        if config.debug_framework:
            debug_internal.on_tear_down_class(cls)
832
    def show_commands_at_teardown(self):
        """Allow subclass specific teardown logging additions.

        Called by tearDown() while VPP is alive; this default implementation
        only logs that no test specific show commands were provided.
        """
        self.logger.info("--- No test specific show commands provided. ---")
836
837     def tearDown(self):
838         """Show various debug prints after each test"""
839         self.logger.debug(
840             "--- tearDown() for %s.%s(%s) called ---"
841             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
842         )
843
844         try:
845             if not self.vpp_dead:
846                 self.logger.debug(self.vapi.cli("show trace max 1000"))
847                 self.logger.info(self.vapi.ppcli("show interface"))
848                 self.logger.info(self.vapi.ppcli("show hardware"))
849                 self.logger.info(self.statistics.set_errors_str())
850                 self.logger.info(self.vapi.ppcli("show run"))
851                 self.logger.info(self.vapi.ppcli("show log"))
852                 self.logger.info(self.vapi.ppcli("show bihash"))
853                 self.logger.info("Logging testcase specific show commands.")
854                 self.show_commands_at_teardown()
855                 if self.remove_configured_vpp_objects_on_tear_down:
856                     self.registry.remove_vpp_config(self.logger)
857             # Save/Dump VPP api trace log
858             m = self._testMethodName
859             api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
860             tmp_api_trace = "/tmp/%s" % api_trace
861             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
862             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
863             self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
864             os.rename(tmp_api_trace, vpp_api_trace_log)
865         except VppTransportSocketIOError:
866             self.logger.debug(
867                 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
868             )
869             self.vpp_dead = True
870         else:
871             self.registry.unregister_all(self.logger)
872
873     def setUp(self):
874         """Clear trace before running each test"""
875         super(VppTestCase, self).setUp()
876         self.reporter.send_keep_alive(self)
877         if self.vpp_dead:
878             raise VppDiedError(
879                 rv=None,
880                 testcase=self.__class__.__name__,
881                 method_name=self._testMethodName,
882             )
883         self.sleep(0.1, "during setUp")
884         self.vpp_stdout_deque.append(
885             "--- test setUp() for %s.%s(%s) starts here ---\n"
886             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
887         )
888         self.vpp_stderr_deque.append(
889             "--- test setUp() for %s.%s(%s) starts here ---\n"
890             % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
891         )
892         self.vapi.cli("clear trace")
893         # store the test instance inside the test class - so that objects
894         # holding the class can access instance methods (like assertEqual)
895         type(self).test_instance = self
896
897     @classmethod
898     def pg_enable_capture(cls, interfaces=None):
899         """
900         Enable capture on packet-generator interfaces
901
902         :param interfaces: iterable interface indexes (if None,
903                            use self.pg_interfaces)
904
905         """
906         if interfaces is None:
907             interfaces = cls.pg_interfaces
908         for i in interfaces:
909             i.enable_capture()
910
911     @classmethod
912     def register_pcap(cls, intf, worker):
913         """Register a pcap in the testclass"""
914         # add to the list of captures with current timestamp
915         cls._pcaps.append((intf, worker))
916
917     @classmethod
918     def get_vpp_time(cls):
919         # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
920         # returns float("2.190522")
921         timestr = cls.vapi.cli("show clock")
922         head, sep, tail = timestr.partition(",")
923         head, sep, tail = head.partition("Time now")
924         return float(tail)
925
926     @classmethod
927     def sleep_on_vpp_time(cls, sec):
928         """Sleep according to time in VPP world"""
929         # On a busy system with many processes
930         # we might end up with VPP time being slower than real world
931         # So take that into account when waiting for VPP to do something
932         start_time = cls.get_vpp_time()
933         while cls.get_vpp_time() - start_time < sec:
934             cls.sleep(0.1)
935
936     @classmethod
937     def pg_start(cls, trace=True):
938         """Enable the PG, wait till it is done, then clean up"""
939         for (intf, worker) in cls._old_pcaps:
940             intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
941         cls._old_pcaps = []
942         if trace:
943             cls.vapi.cli("clear trace")
944             cls.vapi.cli("trace add pg-input 1000")
945         cls.vapi.cli("packet-generator enable")
946         # PG, when starts, runs to completion -
947         # so let's avoid a race condition,
948         # and wait a little till it's done.
949         # Then clean it up  - and then be gone.
950         deadline = time.time() + 300
951         while cls.vapi.cli("show packet-generator").find("Yes") != -1:
952             cls.sleep(0.01)  # yield
953             if time.time() > deadline:
954                 cls.logger.error("Timeout waiting for pg to stop")
955                 break
956         for intf, worker in cls._pcaps:
957             cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
958         cls._old_pcaps = cls._pcaps
959         cls._pcaps = []
960
961     @classmethod
962     def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
963         """
964         Create packet-generator interfaces.
965
966         :param interfaces: iterable indexes of the interfaces.
967         :returns: List of created interfaces.
968
969         """
970         result = []
971         for i in interfaces:
972             intf = VppPGInterface(cls, i, gso, gso_size, mode)
973             setattr(cls, intf.name, intf)
974             result.append(intf)
975         cls.pg_interfaces = result
976         return result
977
978     @classmethod
979     def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
980         pgmode = VppEnum.vl_api_pg_interface_mode_t
981         return cls.create_pg_interfaces_internal(
982             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
983         )
984
985     @classmethod
986     def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
987         pgmode = VppEnum.vl_api_pg_interface_mode_t
988         return cls.create_pg_interfaces_internal(
989             interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
990         )
991
992     @classmethod
993     def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
994         pgmode = VppEnum.vl_api_pg_interface_mode_t
995         return cls.create_pg_interfaces_internal(
996             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
997         )
998
999     @classmethod
1000     def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1001         pgmode = VppEnum.vl_api_pg_interface_mode_t
1002         return cls.create_pg_interfaces_internal(
1003             interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1004         )
1005
1006     @classmethod
1007     def create_loopback_interfaces(cls, count):
1008         """
1009         Create loopback interfaces.
1010
1011         :param count: number of interfaces created.
1012         :returns: List of created interfaces.
1013         """
1014         result = [VppLoInterface(cls) for i in range(count)]
1015         for intf in result:
1016             setattr(cls, intf.name, intf)
1017         cls.lo_interfaces = result
1018         return result
1019
1020     @classmethod
1021     def create_bvi_interfaces(cls, count):
1022         """
1023         Create BVI interfaces.
1024
1025         :param count: number of interfaces created.
1026         :returns: List of created interfaces.
1027         """
1028         result = [VppBviInterface(cls) for i in range(count)]
1029         for intf in result:
1030             setattr(cls, intf.name, intf)
1031         cls.bvi_interfaces = result
1032         return result
1033
1034     @staticmethod
1035     def extend_packet(packet, size, padding=" "):
1036         """
1037         Extend packet to given size by padding with spaces or custom padding
1038         NOTE: Currently works only when Raw layer is present.
1039
1040         :param packet: packet
1041         :param size: target size
1042         :param padding: padding used to extend the payload
1043
1044         """
1045         packet_len = len(packet) + 4
1046         extend = size - packet_len
1047         if extend > 0:
1048             num = (extend // len(padding)) + 1
1049             packet[Raw].load += (padding * num)[:extend].encode("ascii")
1050
1051     @classmethod
1052     def reset_packet_infos(cls):
1053         """Reset the list of packet info objects and packet counts to zero"""
1054         cls._packet_infos = {}
1055         cls._packet_count_for_dst_if_idx = {}
1056
1057     @classmethod
1058     def create_packet_info(cls, src_if, dst_if):
1059         """
1060         Create packet info object containing the source and destination indexes
1061         and add it to the testcase's packet info list
1062
1063         :param VppInterface src_if: source interface
1064         :param VppInterface dst_if: destination interface
1065
1066         :returns: _PacketInfo object
1067
1068         """
1069         info = _PacketInfo()
1070         info.index = len(cls._packet_infos)
1071         info.src = src_if.sw_if_index
1072         info.dst = dst_if.sw_if_index
1073         if isinstance(dst_if, VppSubInterface):
1074             dst_idx = dst_if.parent.sw_if_index
1075         else:
1076             dst_idx = dst_if.sw_if_index
1077         if dst_idx in cls._packet_count_for_dst_if_idx:
1078             cls._packet_count_for_dst_if_idx[dst_idx] += 1
1079         else:
1080             cls._packet_count_for_dst_if_idx[dst_idx] = 1
1081         cls._packet_infos[info.index] = info
1082         return info
1083
1084     @staticmethod
1085     def info_to_payload(info):
1086         """
1087         Convert _PacketInfo object to packet payload
1088
1089         :param info: _PacketInfo object
1090
1091         :returns: string containing serialized data from packet info
1092         """
1093
1094         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1095         return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1096
1097     @staticmethod
1098     def payload_to_info(payload, payload_field="load"):
1099         """
1100         Convert packet payload to _PacketInfo object
1101
1102         :param payload: packet payload
1103         :type payload:  <class 'scapy.packet.Raw'>
1104         :param payload_field: packet fieldname of payload "load" for
1105                 <class 'scapy.packet.Raw'>
1106         :type payload_field: str
1107         :returns: _PacketInfo object containing de-serialized data from payload
1108
1109         """
1110
1111         # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1112         payload_b = getattr(payload, payload_field)[:18]
1113
1114         info = _PacketInfo()
1115         info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1116
1117         # some SRv6 TCs depend on get an exception if bad values are detected
1118         if info.index > 0x4000:
1119             raise ValueError("Index value is invalid")
1120
1121         return info
1122
1123     def get_next_packet_info(self, info):
1124         """
1125         Iterate over the packet info list stored in the testcase
1126         Start iteration with first element if info is None
1127         Continue based on index in info if info is specified
1128
1129         :param info: info or None
1130         :returns: next info in list or None if no more infos
1131         """
1132         if info is None:
1133             next_index = 0
1134         else:
1135             next_index = info.index + 1
1136         if next_index == len(self._packet_infos):
1137             return None
1138         else:
1139             return self._packet_infos[next_index]
1140
1141     def get_next_packet_info_for_interface(self, src_index, info):
1142         """
1143         Search the packet info list for the next packet info with same source
1144         interface index
1145
1146         :param src_index: source interface index to search for
1147         :param info: packet info - where to start the search
1148         :returns: packet info or None
1149
1150         """
1151         while True:
1152             info = self.get_next_packet_info(info)
1153             if info is None:
1154                 return None
1155             if info.src == src_index:
1156                 return info
1157
1158     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1159         """
1160         Search the packet info list for the next packet info with same source
1161         and destination interface indexes
1162
1163         :param src_index: source interface index to search for
1164         :param dst_index: destination interface index to search for
1165         :param info: packet info - where to start the search
1166         :returns: packet info or None
1167
1168         """
1169         while True:
1170             info = self.get_next_packet_info_for_interface(src_index, info)
1171             if info is None:
1172                 return None
1173             if info.dst == dst_index:
1174                 return info
1175
1176     def assert_equal(self, real_value, expected_value, name_or_class=None):
1177         if name_or_class is None:
1178             self.assertEqual(real_value, expected_value)
1179             return
1180         try:
1181             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1182             msg = msg % (
1183                 getdoc(name_or_class).strip(),
1184                 real_value,
1185                 str(name_or_class(real_value)),
1186                 expected_value,
1187                 str(name_or_class(expected_value)),
1188             )
1189         except Exception:
1190             msg = "Invalid %s: %s does not match expected value %s" % (
1191                 name_or_class,
1192                 real_value,
1193                 expected_value,
1194             )
1195
1196         self.assertEqual(real_value, expected_value, msg)
1197
1198     def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1199         if name is None:
1200             msg = None
1201         else:
1202             msg = "Invalid %s: %s out of range <%s,%s>" % (
1203                 name,
1204                 real_value,
1205                 expected_min,
1206                 expected_max,
1207             )
1208         self.assertTrue(expected_min <= real_value <= expected_max, msg)
1209
    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
        """
        Check every checksum field found in any layer of the packet

        Layers are inspected one by one for a "cksum" or "chksum" field.
        Each such field is deleted on a scratch copy of the packet, the copy
        is rebuilt from its raw bytes and the rebuilt value is compared with
        the value in the packet as received.

        :param packet: packet to check
        :param ignore_zero_udp_checksums: when set, a checksum equal to 0 on
            a layer named "UDP" or "UDPerror" is not checked
        """
        # re-dissect the packet from its raw bytes; `received` keeps the
        # values as they are in the packet
        received = packet.__class__(scapy.compat.raw(packet))
        udp_layers = ["UDP", "UDPerror"]
        checksum_fields = ["cksum", "chksum"]
        # (layer index, field name) pairs which need to be verified
        checksums = []
        counter = 0
        # scratch copy, whose checksum fields get deleted below
        temp = received.__class__(scapy.compat.raw(received))
        while True:
            # walk the layers by index until getlayer() yields nothing
            layer = temp.getlayer(counter)
            if layer:
                # look at this layer alone, without its payload
                layer = layer.copy()
                layer.remove_payload()
                for cf in checksum_fields:
                    if hasattr(layer, cf):
                        if (
                            ignore_zero_udp_checksums
                            and 0 == getattr(layer, cf)
                            and layer.name in udp_layers
                        ):
                            # skips to the next checksum field name
                            continue
                        # the field is removed on the scratch packet itself,
                        # not on the payload-less copy inspected above
                        delattr(temp.getlayer(counter), cf)
                        checksums.append((counter, cf))
            else:
                break
            counter = counter + 1
        if 0 == len(checksums):
            return
        # rebuild the scratch packet from raw bytes; presumably scapy fills
        # in the deleted checksum fields while building - TODO confirm
        temp = temp.__class__(scapy.compat.raw(temp))
        for layer, cf in checksums:
            calc_sum = getattr(temp[layer], cf)
            self.assert_equal(
                getattr(received[layer], cf),
                calc_sum,
                "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
            )
            self.logger.debug(
                "Checksum field `%s` on `%s` layer has correct value `%s`"
                % (cf, temp[layer].name, calc_sum)
            )
1249
1250     def assert_checksum_valid(
1251         self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1252     ):
1253         """Check checksum of received packet on given layer"""
1254         received_packet_checksum = getattr(received_packet[layer], field_name)
1255         if ignore_zero_checksum and 0 == received_packet_checksum:
1256             return
1257         recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1258         delattr(recalculated[layer], field_name)
1259         recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1260         self.assert_equal(
1261             received_packet_checksum,
1262             getattr(recalculated[layer], field_name),
1263             "packet checksum on layer: %s" % layer,
1264         )
1265
1266     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1267         self.assert_checksum_valid(
1268             received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1269         )
1270
1271     def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1272         self.assert_checksum_valid(
1273             received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1274         )
1275
1276     def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1277         self.assert_checksum_valid(
1278             received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1279         )
1280
1281     def assert_embedded_icmp_checksum_valid(self, received_packet):
1282         if received_packet.haslayer(IPerror):
1283             self.assert_checksum_valid(received_packet, "IPerror")
1284         if received_packet.haslayer(TCPerror):
1285             self.assert_checksum_valid(received_packet, "TCPerror")
1286         if received_packet.haslayer(UDPerror):
1287             self.assert_checksum_valid(
1288                 received_packet, "UDPerror", ignore_zero_checksum=True
1289             )
1290         if received_packet.haslayer(ICMPerror):
1291             self.assert_checksum_valid(received_packet, "ICMPerror")
1292
1293     def assert_icmp_checksum_valid(self, received_packet):
1294         self.assert_checksum_valid(received_packet, "ICMP")
1295         self.assert_embedded_icmp_checksum_valid(received_packet)
1296
1297     def assert_icmpv6_checksum_valid(self, pkt):
1298         if pkt.haslayer(ICMPv6DestUnreach):
1299             self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1300             self.assert_embedded_icmp_checksum_valid(pkt)
1301         if pkt.haslayer(ICMPv6EchoRequest):
1302             self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1303         if pkt.haslayer(ICMPv6EchoReply):
1304             self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1305
1306     def get_counter(self, counter):
1307         if counter.startswith("/"):
1308             counter_value = self.statistics.get_counter(counter)
1309         else:
1310             counters = self.vapi.cli("sh errors").split("\n")
1311             counter_value = 0
1312             for i in range(1, len(counters) - 1):
1313                 results = counters[i].split()
1314                 if results[1] == counter:
1315                     counter_value = int(results[0])
1316                     break
1317         return counter_value
1318
1319     def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1320         c = self.get_counter(counter)
1321         if thread is not None:
1322             c = c[thread][index]
1323         else:
1324             c = sum(x[index] for x in c)
1325         self.assert_equal(c, expected_value, "counter `%s'" % counter)
1326
1327     def assert_packet_counter_equal(self, counter, expected_value):
1328         counter_value = self.get_counter(counter)
1329         self.assert_equal(
1330             counter_value, expected_value, "packet counter `%s'" % counter
1331         )
1332
1333     def assert_error_counter_equal(self, counter, expected_value):
1334         counter_value = self.statistics[counter].sum()
1335         self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1336
1337     @classmethod
1338     def sleep(cls, timeout, remark=None):
1339
1340         # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1341         #  * by Guido, only the main thread can be interrupted.
1342         # */
1343         # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
1344         if timeout == 0:
1345             # yield quantum
1346             if hasattr(os, "sched_yield"):
1347                 os.sched_yield()
1348             else:
1349                 time.sleep(0)
1350             return
1351
1352         cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1353         before = time.time()
1354         time.sleep(timeout)
1355         after = time.time()
1356         if after - before > 2 * timeout:
1357             cls.logger.error(
1358                 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1359                 after - before,
1360                 timeout,
1361             )
1362
1363         cls.logger.debug(
1364             "Finished sleep (%s) - slept %es (wanted %es)",
1365             remark,
1366             after - before,
1367             timeout,
1368         )
1369
1370     def virtual_sleep(self, timeout, remark=None):
1371         self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1372         self.vapi.cli("set clock adjust %s" % timeout)
1373
1374     def pg_send(self, intf, pkts, worker=None, trace=True):
1375         intf.add_stream(pkts, worker=worker)
1376         self.pg_enable_capture(self.pg_interfaces)
1377         self.pg_start(trace=trace)
1378
1379     def snapshot_stats(self, stats_diff):
1380         """Return snapshot of interesting stats based on diff dictionary."""
1381         stats_snapshot = {}
1382         for sw_if_index in stats_diff:
1383             for counter in stats_diff[sw_if_index]:
1384                 stats_snapshot[counter] = self.statistics[counter]
1385         self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1386         return stats_snapshot
1387
1388     def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1389         """Assert appropriate difference between current stats and snapshot."""
1390         for sw_if_index in stats_diff:
1391             for cntr, diff in stats_diff[sw_if_index].items():
1392                 if sw_if_index == "err":
1393                     self.assert_equal(
1394                         self.statistics[cntr].sum(),
1395                         stats_snapshot[cntr].sum() + diff,
1396                         f"'{cntr}' counter value (previous value: "
1397                         f"{stats_snapshot[cntr].sum()}, "
1398                         f"expected diff: {diff})",
1399                     )
1400                 else:
1401                     try:
1402                         self.assert_equal(
1403                             self.statistics[cntr][:, sw_if_index].sum(),
1404                             stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1405                             f"'{cntr}' counter value (previous value: "
1406                             f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1407                             f"expected diff: {diff})",
1408                         )
1409                     except IndexError:
1410                         # if diff is 0, then this most probably a case where
1411                         # test declares multiple interfaces but traffic hasn't
1412                         # passed through this one yet - which means the counter
1413                         # value is 0 and can be ignored
1414                         if 0 != diff:
1415                             raise
1416
1417     def send_and_assert_no_replies(
1418         self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1419     ):
1420         if stats_diff:
1421             stats_snapshot = self.snapshot_stats(stats_diff)
1422
1423         self.pg_send(intf, pkts)
1424
1425         try:
1426             if not timeout:
1427                 timeout = 1
1428             for i in self.pg_interfaces:
1429                 i.assert_nothing_captured(timeout=timeout, remark=remark)
1430                 timeout = 0.1
1431         finally:
1432             if trace:
1433                 if msg:
1434                     self.logger.debug(f"send_and_assert_no_replies: {msg}")
1435                 self.logger.debug(self.vapi.cli("show trace"))
1436
1437         if stats_diff:
1438             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1439
1440     def send_and_expect(
1441         self,
1442         intf,
1443         pkts,
1444         output,
1445         n_rx=None,
1446         worker=None,
1447         trace=True,
1448         msg=None,
1449         stats_diff=None,
1450     ):
1451         if stats_diff:
1452             stats_snapshot = self.snapshot_stats(stats_diff)
1453
1454         if not n_rx:
1455             n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1456         self.pg_send(intf, pkts, worker=worker, trace=trace)
1457         rx = output.get_capture(n_rx)
1458         if trace:
1459             if msg:
1460                 self.logger.debug(f"send_and_expect: {msg}")
1461             self.logger.debug(self.vapi.cli("show trace"))
1462
1463         if stats_diff:
1464             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1465
1466         return rx
1467
1468     def send_and_expect_load_balancing(
1469         self, input, pkts, outputs, worker=None, trace=True
1470     ):
1471         self.pg_send(input, pkts, worker=worker, trace=trace)
1472         rxs = []
1473         for oo in outputs:
1474             rx = oo._get_capture(1)
1475             self.assertNotEqual(0, len(rx))
1476             rxs.append(rx)
1477         if trace:
1478             self.logger.debug(self.vapi.cli("show trace"))
1479         return rxs
1480
1481     def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1482         self.pg_send(intf, pkts, worker=worker, trace=trace)
1483         rx = output._get_capture(1)
1484         if trace:
1485             self.logger.debug(self.vapi.cli("show trace"))
1486         self.assertTrue(len(rx) > 0)
1487         self.assertTrue(len(rx) < len(pkts))
1488         return rx
1489
1490     def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1491         if stats_diff:
1492             stats_snapshot = self.snapshot_stats(stats_diff)
1493
1494         self.pg_send(intf, pkts)
1495         rx = output.get_capture(len(pkts))
1496         outputs = [output]
1497         if not timeout:
1498             timeout = 1
1499         for i in self.pg_interfaces:
1500             if i not in outputs:
1501                 i.assert_nothing_captured(timeout=timeout)
1502                 timeout = 0.1
1503
1504         if stats_diff:
1505             self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1506
1507         return rx
1508
1509
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class

    When the class has no docstring, or only an empty one, the name of the
    class is returned instead of failing with AttributeError/IndexError.

    :param test: test instance
    """
    doc_lines = (getdoc(test.__class__) or "").splitlines()
    return doc_lines[0] if doc_lines else test.__class__.__name__
1512
1513
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test

    :param descriptions: whether short descriptions should be used
    :param test: test instance
    :returns: test.shortDescription() if descriptions are wanted and one is
        available, str(test) otherwise
    """
    short_description = test.shortDescription()
    use_short = descriptions and short_description
    return short_description if use_short else str(test)
1520
1521
class TestCaseInfo(object):
    """Plain holder of a test case's logger, temp dir, VPP pid and VPP binary path"""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # not known at construction time, always starts out as None
        self.core_crash_test = None
1529
1530
1531 class VppTestResult(unittest.TestResult):
1532     """
1533     @property result_string
1534      String variable to store the test case result string.
1535     @property errors
1536      List variable containing 2-tuples of TestCase instances and strings
1537      holding formatted tracebacks. Each tuple represents a test which
1538      raised an unexpected exception.
1539     @property failures
1540      List variable containing 2-tuples of TestCase instances and strings
1541      holding formatted tracebacks. Each tuple represents a test where
1542      a failure was explicitly signalled using the TestCase.assert*()
1543      methods.
1544     """
1545
1546     failed_test_cases_info = set()
1547     core_crash_test_cases_info = set()
1548     current_test_case_info = None
1549
1550     def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1551         """
1552         :param stream File descriptor to store where to report test results.
1553             Set to the standard error stream by default.
1554         :param descriptions Boolean variable to store information if to use
1555             test case descriptions.
1556         :param verbosity Integer variable to store required verbosity level.
1557         """
1558         super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1559         self.stream = stream
1560         self.descriptions = descriptions
1561         self.verbosity = verbosity
1562         self.result_string = None
1563         self.runner = runner
1564         self.printed = []
1565
1566     def addSuccess(self, test):
1567         """
1568         Record a test succeeded result
1569
1570         :param test:
1571
1572         """
1573         if self.current_test_case_info:
1574             self.current_test_case_info.logger.debug(
1575                 "--- addSuccess() %s.%s(%s) called"
1576                 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1577             )
1578         unittest.TestResult.addSuccess(self, test)
1579         self.result_string = colorize("OK", GREEN)
1580
1581         self.send_result_through_pipe(test, PASS)
1582
1583     def addSkip(self, test, reason):
1584         """
1585         Record a test skipped.
1586
1587         :param test:
1588         :param reason:
1589
1590         """
1591         if self.current_test_case_info:
1592             self.current_test_case_info.logger.debug(
1593                 "--- addSkip() %s.%s(%s) called, reason is %s"
1594                 % (
1595                     test.__class__.__name__,
1596                     test._testMethodName,
1597                     test._testMethodDoc,
1598                     reason,
1599                 )
1600             )
1601         unittest.TestResult.addSkip(self, test, reason)
1602         self.result_string = colorize("SKIP", YELLOW)
1603
1604         if reason == "not enough cpus":
1605             self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1606         else:
1607             self.send_result_through_pipe(test, SKIP)
1608
1609     def symlink_failed(self):
1610         if self.current_test_case_info:
1611             try:
1612                 failed_dir = config.failed_dir
1613                 link_path = os.path.join(
1614                     failed_dir,
1615                     "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1616                 )
1617
1618                 self.current_test_case_info.logger.debug(
1619                     "creating a link to the failed test"
1620                 )
1621                 self.current_test_case_info.logger.debug(
1622                     "os.symlink(%s, %s)"
1623                     % (self.current_test_case_info.tempdir, link_path)
1624                 )
1625                 if os.path.exists(link_path):
1626                     self.current_test_case_info.logger.debug("symlink already exists")
1627                 else:
1628                     os.symlink(self.current_test_case_info.tempdir, link_path)
1629
1630             except Exception as e:
1631                 self.current_test_case_info.logger.error(e)
1632
1633     def send_result_through_pipe(self, test, result):
1634         if hasattr(self, "test_framework_result_pipe"):
1635             pipe = self.test_framework_result_pipe
1636             if pipe:
1637                 pipe.send((test.id(), result))
1638
1639     def log_error(self, test, err, fn_name):
1640         if self.current_test_case_info:
1641             if isinstance(test, unittest.suite._ErrorHolder):
1642                 test_name = test.description
1643             else:
1644                 test_name = "%s.%s(%s)" % (
1645                     test.__class__.__name__,
1646                     test._testMethodName,
1647                     test._testMethodDoc,
1648                 )
1649             self.current_test_case_info.logger.debug(
1650                 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1651             )
1652             self.current_test_case_info.logger.debug(
1653                 "formatted exception is:\n%s" % "".join(format_exception(*err))
1654             )
1655
1656     def add_error(self, test, err, unittest_fn, error_type):
1657         if error_type == FAIL:
1658             self.log_error(test, err, "addFailure")
1659             error_type_str = colorize("FAIL", RED)
1660         elif error_type == ERROR:
1661             self.log_error(test, err, "addError")
1662             error_type_str = colorize("ERROR", RED)
1663         else:
1664             raise Exception(
1665                 "Error type %s cannot be used to record an "
1666                 "error or a failure" % error_type
1667             )
1668
1669         unittest_fn(self, test, err)
1670         if self.current_test_case_info:
1671             self.result_string = "%s [ temp dir used by test case: %s ]" % (
1672                 error_type_str,
1673                 self.current_test_case_info.tempdir,
1674             )
1675             self.symlink_failed()
1676             self.failed_test_cases_info.add(self.current_test_case_info)
1677             if is_core_present(self.current_test_case_info.tempdir):
1678                 if not self.current_test_case_info.core_crash_test:
1679                     if isinstance(test, unittest.suite._ErrorHolder):
1680                         test_name = str(test)
1681                     else:
1682                         test_name = "'{!s}' ({!s})".format(
1683                             get_testcase_doc_name(test), test.id()
1684                         )
1685                     self.current_test_case_info.core_crash_test = test_name
1686                 self.core_crash_test_cases_info.add(self.current_test_case_info)
1687         else:
1688             self.result_string = "%s [no temp dir]" % error_type_str
1689
1690         self.send_result_through_pipe(test, error_type)
1691
1692     def addFailure(self, test, err):
1693         """
1694         Record a test failed result
1695
1696         :param test:
1697         :param err: error message
1698
1699         """
1700         self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1701
1702     def addError(self, test, err):
1703         """
1704         Record a test error result
1705
1706         :param test:
1707         :param err: error message
1708
1709         """
1710         self.add_error(test, err, unittest.TestResult.addError, ERROR)
1711
1712     def getDescription(self, test):
1713         """
1714         Get test description
1715
1716         :param test:
1717         :returns: test description
1718
1719         """
1720         return get_test_description(self.descriptions, test)
1721
1722     def startTest(self, test):
1723         """
1724         Start a test
1725
1726         :param test:
1727
1728         """
1729
1730         def print_header(test):
1731             if test.__class__ in self.printed:
1732                 return
1733
1734             test_doc = getdoc(test)
1735             if not test_doc:
1736                 raise Exception("No doc string for test '%s'" % test.id())
1737
1738             test_title = test_doc.splitlines()[0].rstrip()
1739             test_title = colorize(test_title, GREEN)
1740             if test.is_tagged_run_solo():
1741                 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1742
1743             # This block may overwrite the colorized title above,
1744             # but we want this to stand out and be fixed
1745             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1746                 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1747
1748             if test.has_tag(TestCaseTag.FIXME_ASAN):
1749                 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1750                 test.skip_fixme_asan()
1751
1752             if hasattr(test, "vpp_worker_count"):
1753                 if test.vpp_worker_count == 0:
1754                     test_title += " [main thread only]"
1755                 elif test.vpp_worker_count == 1:
1756                     test_title += " [1 worker thread]"
1757                 else:
1758                     test_title += f" [{test.vpp_worker_count} worker threads]"
1759
1760             if test.__class__.skipped_due_to_cpu_lack:
1761                 test_title = colorize(
1762                     f"{test_title} [skipped - not enough cpus, "
1763                     f"required={test.__class__.get_cpus_required()}, "
1764                     f"available={max_vpp_cpus}]",
1765                     YELLOW,
1766                 )
1767
1768             print(double_line_delim)
1769             print(test_title)
1770             print(double_line_delim)
1771             self.printed.append(test.__class__)
1772
1773         print_header(test)
1774         self.start_test = time.time()
1775         unittest.TestResult.startTest(self, test)
1776         if self.verbosity > 0:
1777             self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1778             self.stream.writeln(single_line_delim)
1779
1780     def stopTest(self, test):
1781         """
1782         Called when the given test has been run
1783
1784         :param test:
1785
1786         """
1787         unittest.TestResult.stopTest(self, test)
1788
1789         if self.verbosity > 0:
1790             self.stream.writeln(single_line_delim)
1791             self.stream.writeln(
1792                 "%-73s%s" % (self.getDescription(test), self.result_string)
1793             )
1794             self.stream.writeln(single_line_delim)
1795         else:
1796             self.stream.writeln(
1797                 "%-68s %4.2f %s"
1798                 % (
1799                     self.getDescription(test),
1800                     time.time() - self.start_test,
1801                     self.result_string,
1802                 )
1803             )
1804
1805         self.send_result_through_pipe(test, TEST_RUN)
1806
1807     def printErrors(self):
1808         """
1809         Print errors from running the test case
1810         """
1811         if len(self.errors) > 0 or len(self.failures) > 0:
1812             self.stream.writeln()
1813             self.printErrorList("ERROR", self.errors)
1814             self.printErrorList("FAIL", self.failures)
1815
1816         # ^^ that is the last output from unittest before summary
1817         if not self.runner.print_summary:
1818             devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1819             self.stream = devnull
1820             self.runner.stream = devnull
1821
1822     def printErrorList(self, flavour, errors):
1823         """
1824         Print error list to the output stream together with error type
1825         and test case description.
1826
1827         :param flavour: error type
1828         :param errors: iterable errors
1829
1830         """
1831         for test, err in errors:
1832             self.stream.writeln(double_line_delim)
1833             self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1834             self.stream.writeln(single_line_delim)
1835             self.stream.writeln("%s" % err)
1836
1837
class VppTestRunner(unittest.TextTestRunner):
    """
    Text test runner which writes to stdout and produces VppTestResult
    objects.
    """

    @property
    def resultclass(self):
        """Class used to hold the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        """
        Set up the runner.

        :param keep_alive_pipe: pipe stored in KeepAliveReporter.pipe
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when false, output following the error lists
            is redirected to os.devnull while the tests run
        :param kwargs: extra keyword arguments for unittest.TextTestRunner

        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super().__init__(
            stream=sys.stdout,
            descriptions=descriptions,
            verbosity=verbosity,
            failfast=failfast,
            buffer=buffer,
            resultclass=resultclass,
            **kwargs,
        )
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore it after the summary
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner"""
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the given test case or test suite.

        :param test: test case or test suite to run
        :returns: result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        outcome = super().run(test)
        if not self.print_summary:
            # put back the stream which printErrors() replaced
            self.stream = self.orig_stream
            outcome.stream = self.orig_stream
        return outcome
1889
1890
class Worker(Thread):
    """
    Thread which runs an external executable, waits for it to finish and
    logs its return code, stdout and stderr.

    Optional attributes ``testcase``, ``role`` and ``wait_for_gdb`` are
    honoured only if they already exist on the instance (or its class) when
    this constructor runs, e.g. when a subclass sets them before calling it.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line as a sequence; the first item is
            the path of the executable
        :param logger: logger used to report the progress and the output
        :param env: optional dict of extra environment variables for the
            child process; a deep copy of it is stored
        :param args: positional arguments passed on to Thread
        :param kwargs: keyword arguments passed on to Thread

        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # Work on a private copy so that the caller's sequence is never
        # modified by the debug adjustments below.
        self.args = list(executable_args)
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Prefix the real command line with gdbserver. The command
                # line is self.args - ``args`` is the tuple of Thread
                # positional arguments and cannot be added to a list.
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When the test case runs in a gdb or gdbserver debug mode, print
        instructions on how to attach the debugger to the spawned process
        and block until the user presses ENTER; otherwise return at once.
        """
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            # the next worker using gdbserver gets the following port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Start the executable via ``stdbuf -o0 -e0``, wait for it to finish,
        log its output and store its return code in ``self.result``.

        :raises EnvironmentError: if the executable does not exist or is not
            executable; ``self.result`` is set to os.EX_OSFILE in that case

        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        # the child gets the current environment extended by self.env
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        # The output may contain arbitrary bytes; undecodable ones are
        # replaced so that logging cannot fail before the result is stored.
        self.logger.info(out.decode("utf-8", errors="replace"))
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8", errors="replace"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2009
2010
# Nothing is done when this module is executed directly.
if __name__ == "__main__":
    pass