tests: Use errno value rather than a specific int
[vpp.git] / test / framework.py
index d7a0026..6ff03d8 100644 (file)
@@ -1,36 +1,49 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 from __future__ import print_function
-import gc
+import logging
 import sys
 import os
 import select
+import signal
+import subprocess
 import unittest
-import tempfile
+import re
 import time
-import resource
+import faulthandler
+import random
+import copy
+import platform
+import shutil
 from collections import deque
 from threading import Thread, Event
-from inspect import getdoc
+from inspect import getdoc, isclass
 from traceback import format_exception
 from logging import FileHandler, DEBUG, Formatter
-from scapy.packet import Raw
-from hook import StepHook, PollHook
+from enum import Enum
+from abc import ABC, abstractmethod
+from struct import pack, unpack
+
+import scapy.compat
+from scapy.packet import Raw, Packet
 from vpp_pg_interface import VppPGInterface
 from vpp_sub_interface import VppSubInterface
 from vpp_lo_interface import VppLoInterface
+from vpp_bvi_interface import VppBviInterface
 from vpp_papi_provider import VppPapiProvider
-from log import *
+from vpp_papi import VppEnum
+import vpp_papi
 from vpp_object import VppObjectRegistry
-if os.name == 'posix' and sys.version_info[0] < 3:
-    # using subprocess32 is recommended by python official documentation
-    # @ https://docs.python.org/2/library/subprocess.html
-    import subprocess32 as subprocess
-else:
-    import subprocess
+from util import ppp, is_core_present
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
+from scapy.layers.inet6 import ICMPv6EchoReply
+from vpp_running import use_running
+from asfframework import VppAsfTestCase
+
 
 """
-  Test framework module.
+  Packet Generator / Scapy Test framework module.
 
   The module provides a set of tools for constructing and running tests and
   representing the results.
@@ -43,6 +56,7 @@ class _PacketInfo(object):
     Help process information about the next packet.
     Set variables to default values.
     """
+
     #: Store the index of the packet.
     index = -1
     #: Store the index of the source packet generator interface of the packet.
@@ -65,33 +79,8 @@ class _PacketInfo(object):
         return index and src and dst and data
 
 
-def pump_output(testclass):
-    """ pump output from vpp stdout/stderr to proper queues """
-    while not testclass.pump_thread_stop_flag.wait(0):
-        readable = select.select([testclass.vpp.stdout.fileno(),
-                                  testclass.vpp.stderr.fileno(),
-                                  testclass.pump_thread_wakeup_pipe[0]],
-                                 [], [])[0]
-        if testclass.vpp.stdout.fileno() in readable:
-            read = os.read(testclass.vpp.stdout.fileno(), 1024)
-            testclass.vpp_stdout_deque.append(read)
-        if testclass.vpp.stderr.fileno() in readable:
-            read = os.read(testclass.vpp.stderr.fileno(), 1024)
-            testclass.vpp_stderr_deque.append(read)
-        # ignoring the dummy pipe here intentionally - the flag will take care
-        # of properly terminating the loop
-
-
-def running_extended_tests():
-    try:
-        s = os.getenv("EXTENDED_TESTS")
-        return True if s.lower() in ("y", "yes", "1") else False
-    except:
-        return False
-    return False
-
-
-class VppTestCase(unittest.TestCase):
+@use_running
+class VppTestCase(VppAsfTestCase):
     """This subclass is a base class for VPP test cases that are implemented as
     classes. It provides methods to create and run test case.
     """
@@ -109,328 +98,66 @@ class VppTestCase(unittest.TestCase):
         else:
             return 0
 
-    @classmethod
-    def instance(cls):
-        """Return the instance of this testcase"""
-        return cls.test_instance
-
-    @classmethod
-    def set_debug_flags(cls, d):
-        cls.debug_core = False
-        cls.debug_gdb = False
-        cls.debug_gdbserver = False
-        if d is None:
-            return
-        dl = d.lower()
-        if dl == "core":
-            cls.debug_core = True
-        elif dl == "gdb":
-            cls.debug_gdb = True
-        elif dl == "gdbserver":
-            cls.debug_gdbserver = True
-        else:
-            raise Exception("Unrecognized DEBUG option: '%s'" % d)
-
-    @classmethod
-    def setUpConstants(cls):
-        """ Set-up the test case class based on environment variables """
-        try:
-            s = os.getenv("STEP")
-            cls.step = True if s.lower() in ("y", "yes", "1") else False
-        except:
-            cls.step = False
-        try:
-            d = os.getenv("DEBUG")
-        except:
-            d = None
-        cls.set_debug_flags(d)
-        cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
-        cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
-        debug_cli = ""
-        if cls.step or cls.debug_gdb or cls.debug_gdbserver:
-            debug_cli = "cli-listen localhost:5002"
-        coredump_size = None
-        try:
-            size = os.getenv("COREDUMP_SIZE")
-            if size is not None:
-                coredump_size = "coredump-size %s" % size
-        except:
-            pass
-        if coredump_size is None:
-            coredump_size = "coredump-size unlimited"
-        cls.vpp_cmdline = [cls.vpp_bin, "unix",
-                           "{", "nodaemon", debug_cli, coredump_size, "}",
-                           "api-trace", "{", "on", "}",
-                           "api-segment", "{", "prefix", cls.shm_prefix, "}",
-                           "plugins", "{", "plugin", "dpdk_plugin.so", "{",
-                           "disable", "}", "}"]
-        if cls.plugin_path is not None:
-            cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
-        cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
-
-    @classmethod
-    def wait_for_enter(cls):
-        if cls.debug_gdbserver:
-            print(double_line_delim)
-            print("Spawned GDB server with PID: %d" % cls.vpp.pid)
-        elif cls.debug_gdb:
-            print(double_line_delim)
-            print("Spawned VPP with PID: %d" % cls.vpp.pid)
-        else:
-            cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
-            return
-        print(single_line_delim)
-        print("You can debug the VPP using e.g.:")
-        if cls.debug_gdbserver:
-            print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
-            print("Now is the time to attach a gdb by running the above "
-                  "command, set up breakpoints etc. and then resume VPP from "
-                  "within gdb by issuing the 'continue' command")
-        elif cls.debug_gdb:
-            print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
-            print("Now is the time to attach a gdb by running the above "
-                  "command and set up breakpoints etc.")
-        print(single_line_delim)
-        raw_input("Press ENTER to continue running the testcase...")
-
-    @classmethod
-    def run_vpp(cls):
-        cmdline = cls.vpp_cmdline
-
-        if cls.debug_gdbserver:
-            gdbserver = '/usr/bin/gdbserver'
-            if not os.path.isfile(gdbserver) or \
-                    not os.access(gdbserver, os.X_OK):
-                raise Exception("gdbserver binary '%s' does not exist or is "
-                                "not executable" % gdbserver)
-
-            cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
-            cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
-
-        try:
-            cls.vpp = subprocess.Popen(cmdline,
-                                       stdout=subprocess.PIPE,
-                                       stderr=subprocess.PIPE,
-                                       bufsize=1)
-        except Exception as e:
-            cls.logger.critical("Couldn't start vpp: %s" % e)
-            raise
-
-        cls.wait_for_enter()
-
     @classmethod
     def setUpClass(cls):
-        """
-        Perform class setup before running the testcase
-        Remove shared memory files, start vpp and connect the vpp-api
-        """
-        gc.collect()  # run garbage collection first
-        cls.logger = getLogger(cls.__name__)
-        cls.tempdir = tempfile.mkdtemp(
-            prefix='vpp-unittest-' + cls.__name__ + '-')
-        file_handler = FileHandler("%s/log.txt" % cls.tempdir)
-        file_handler.setFormatter(
-            Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
-                      datefmt="%H:%M:%S"))
-        file_handler.setLevel(DEBUG)
-        cls.logger.addHandler(file_handler)
-        cls.shm_prefix = cls.tempdir.split("/")[-1]
-        os.chdir(cls.tempdir)
-        cls.logger.info("Temporary dir is %s, shm prefix is %s",
-                        cls.tempdir, cls.shm_prefix)
-        cls.setUpConstants()
+        super(VppTestCase, cls).setUpClass()
         cls.reset_packet_infos()
-        cls._captures = []
-        cls._zombie_captures = []
-        cls.verbose = 0
-        cls.vpp_dead = False
-        cls.registry = VppObjectRegistry()
-        # need to catch exceptions here because if we raise, then the cleanup
-        # doesn't get called and we might end with a zombie vpp
-        try:
-            cls.run_vpp()
-            cls.vpp_stdout_deque = deque()
-            cls.vpp_stderr_deque = deque()
-            cls.pump_thread_stop_flag = Event()
-            cls.pump_thread_wakeup_pipe = os.pipe()
-            cls.pump_thread = Thread(target=pump_output, args=(cls,))
-            cls.pump_thread.daemon = True
-            cls.pump_thread.start()
-            cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
-            if cls.step:
-                hook = StepHook(cls)
-            else:
-                hook = PollHook(cls)
-            cls.vapi.register_hook(hook)
-            cls.sleep(0.1, "after vpp startup, before initial poll")
-            hook.poll_vpp()
-            try:
-                cls.vapi.connect()
-            except:
-                if cls.debug_gdbserver:
-                    print(colorize("You're running VPP inside gdbserver but "
-                                   "VPP-API connection failed, did you forget "
-                                   "to 'continue' VPP from within gdb?", RED))
-                raise
-        except:
-            t, v, tb = sys.exc_info()
-            try:
-                cls.quit()
-            except:
-                pass
-            raise t, v, tb
-
-    @classmethod
-    def quit(cls):
-        """
-        Disconnect vpp-api, kill vpp and cleanup shared memory files
-        """
-        if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
-            cls.vpp.poll()
-            if cls.vpp.returncode is None:
-                print(double_line_delim)
-                print("VPP or GDB server is still running")
-                print(single_line_delim)
-                raw_input("When done debugging, press ENTER to kill the "
-                          "process and finish running the testcase...")
-
-        os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
-        cls.pump_thread_stop_flag.set()
-        if hasattr(cls, 'pump_thread'):
-            cls.logger.debug("Waiting for pump thread to stop")
-            cls.pump_thread.join()
-        if hasattr(cls, 'vpp_stderr_reader_thread'):
-            cls.logger.debug("Waiting for stdderr pump to stop")
-            cls.vpp_stderr_reader_thread.join()
-
-        if hasattr(cls, 'vpp'):
-            if hasattr(cls, 'vapi'):
-                cls.vapi.disconnect()
-                del cls.vapi
-            cls.vpp.poll()
-            if cls.vpp.returncode is None:
-                cls.logger.debug("Sending TERM to vpp")
-                cls.vpp.terminate()
-                cls.logger.debug("Waiting for vpp to die")
-                cls.vpp.communicate()
-            del cls.vpp
-
-        if hasattr(cls, 'vpp_stdout_deque'):
-            cls.logger.info(single_line_delim)
-            cls.logger.info('VPP output to stdout while running %s:',
-                            cls.__name__)
-            cls.logger.info(single_line_delim)
-            f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
-            vpp_output = "".join(cls.vpp_stdout_deque)
-            f.write(vpp_output)
-            cls.logger.info('\n%s', vpp_output)
-            cls.logger.info(single_line_delim)
-
-        if hasattr(cls, 'vpp_stderr_deque'):
-            cls.logger.info(single_line_delim)
-            cls.logger.info('VPP output to stderr while running %s:',
-                            cls.__name__)
-            cls.logger.info(single_line_delim)
-            f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
-            vpp_output = "".join(cls.vpp_stderr_deque)
-            f.write(vpp_output)
-            cls.logger.info('\n%s', vpp_output)
-            cls.logger.info(single_line_delim)
+        cls._pcaps = []
+        cls._old_pcaps = []
 
     @classmethod
     def tearDownClass(cls):
-        """ Perform final cleanup after running all tests in this test-case """
-        cls.quit()
-
-    def tearDown(self):
-        """ Show various debug prints after each test """
-        self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
-                          (self.__class__.__name__, self._testMethodName,
-                           self._testMethodDoc))
-        if not self.vpp_dead:
-            self.logger.debug(self.vapi.cli("show trace"))
-            self.logger.info(self.vapi.ppcli("show int"))
-            self.logger.info(self.vapi.ppcli("show hardware"))
-            self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show run"))
-            self.registry.remove_vpp_config(self.logger)
-            # Save/Dump VPP api trace log
-            api_trace = "vpp_api_trace.%s.log" % self._testMethodName
-            tmp_api_trace = "/tmp/%s" % api_trace
-            vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
-            self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
-            self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
-                                                    vpp_api_trace_log))
-            os.rename(tmp_api_trace, vpp_api_trace_log)
-            self.logger.info(self.vapi.ppcli("api trace dump %s" %
-                                             vpp_api_trace_log))
-        else:
-            self.registry.unregister_all(self.logger)
-
-    def setUp(self):
-        """ Clear trace before running each test"""
-        self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
-                          (self.__class__.__name__, self._testMethodName,
-                           self._testMethodDoc))
-        if self.vpp_dead:
-            raise Exception("VPP is dead when setting up the test")
-        self.sleep(.1, "during setUp")
-        self.vpp_stdout_deque.append(
-            "--- test setUp() for %s.%s(%s) starts here ---\n" %
-            (self.__class__.__name__, self._testMethodName,
-             self._testMethodDoc))
-        self.vpp_stderr_deque.append(
-            "--- test setUp() for %s.%s(%s) starts here ---\n" %
-            (self.__class__.__name__, self._testMethodName,
-             self._testMethodDoc))
-        self.vapi.cli("clear trace")
-        # store the test instance inside the test class - so that objects
-        # holding the class can access instance methods (like assertEqual)
-        type(self).test_instance = self
+        cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+        cls.reset_packet_infos()
+        super(VppTestCase, cls).tearDownClass()
 
     @classmethod
-    def pg_enable_capture(cls, interfaces):
+    def pg_enable_capture(cls, interfaces=None):
         """
         Enable capture on packet-generator interfaces
 
-        :param interfaces: iterable interface indexes
+        :param interfaces: iterable interface indexes (if None,
+                           use self.pg_interfaces)
 
         """
+        if interfaces is None:
+            interfaces = cls.pg_interfaces
         for i in interfaces:
             i.enable_capture()
 
     @classmethod
-    def register_capture(cls, cap_name):
-        """ Register a capture in the testclass """
+    def register_pcap(cls, intf, worker):
+        """Register a pcap in the testclass"""
         # add to the list of captures with current timestamp
-        cls._captures.append((time.time(), cap_name))
-        # filter out from zombies
-        cls._zombie_captures = [(stamp, name)
-                                for (stamp, name) in cls._zombie_captures
-                                if name != cap_name]
+        cls._pcaps.append((intf, worker))
 
     @classmethod
-    def pg_start(cls):
-        """ Remove any zombie captures and enable the packet generator """
-        # how long before capture is allowed to be deleted - otherwise vpp
-        # crashes - 100ms seems enough (this shouldn't be needed at all)
-        capture_ttl = 0.1
-        now = time.time()
-        for stamp, cap_name in cls._zombie_captures:
-            wait = stamp + capture_ttl - now
-            if wait > 0:
-                cls.sleep(wait, "before deleting capture %s" % cap_name)
-                now = time.time()
-            cls.logger.debug("Removing zombie capture %s" % cap_name)
-            cls.vapi.cli('packet-generator delete %s' % cap_name)
-
-        cls.vapi.cli("trace add pg-input 50")  # 50 is maximum
-        cls.vapi.cli('packet-generator enable')
-        cls._zombie_captures = cls._captures
-        cls._captures = []
+    def pg_start(cls, trace=True, traceFilter=False):
+        """Enable the PG, wait till it is done, then clean up"""
+        for intf, worker in cls._old_pcaps:
+            intf.remove_old_pcap_file(intf.get_in_path(worker))
+        cls._old_pcaps = []
+        if trace:
+            cls.vapi.cli("clear trace")
+            cls.vapi.cli("trace add pg-input 1000" + (" filter" if traceFilter else ""))
+        cls.vapi.cli("packet-generator enable")
+        # PG, when starts, runs to completion -
+        # so let's avoid a race condition,
+        # and wait a little till it's done.
+        # Then clean it up  - and then be gone.
+        deadline = time.time() + 300
+        while cls.vapi.cli("show packet-generator").find("Yes") != -1:
+            cls.sleep(0.01)  # yield
+            if time.time() > deadline:
+                cls.logger.error("Timeout waiting for pg to stop")
+                break
+        for intf, worker in cls._pcaps:
+            cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
+        cls._old_pcaps = cls._pcaps
+        cls._pcaps = []
 
     @classmethod
-    def create_pg_interfaces(cls, interfaces):
+    def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
         """
         Create packet-generator interfaces.
 
@@ -440,46 +167,106 @@ class VppTestCase(unittest.TestCase):
         """
         result = []
         for i in interfaces:
-            intf = VppPGInterface(cls, i)
+            intf = VppPGInterface(cls, i, gso, gso_size, mode)
             setattr(cls, intf.name, intf)
             result.append(intf)
         cls.pg_interfaces = result
         return result
 
     @classmethod
-    def create_loopback_interfaces(cls, interfaces):
+    def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+        if not hasattr(cls, "vpp"):
+            cls.pg_interfaces = []
+            return cls.pg_interfaces
+        pgmode = VppEnum.vl_api_pg_interface_mode_t
+        return cls.create_pg_interfaces_internal(
+            interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
+        )
+
+    @classmethod
+    def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+        if not hasattr(cls, "vpp"):
+            cls.pg_interfaces = []
+            return cls.pg_interfaces
+        pgmode = VppEnum.vl_api_pg_interface_mode_t
+        return cls.create_pg_interfaces_internal(
+            interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
+        )
+
+    @classmethod
+    def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+        if not hasattr(cls, "vpp"):
+            cls.pg_interfaces = []
+            return cls.pg_interfaces
+        pgmode = VppEnum.vl_api_pg_interface_mode_t
+        return cls.create_pg_interfaces_internal(
+            interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
+        )
+
+    @classmethod
+    def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+        if not hasattr(cls, "vpp"):
+            cls.pg_interfaces = []
+            return cls.pg_interfaces
+        pgmode = VppEnum.vl_api_pg_interface_mode_t
+        return cls.create_pg_interfaces_internal(
+            interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
+        )
+
+    @classmethod
+    def create_loopback_interfaces(cls, count):
         """
         Create loopback interfaces.
 
-        :param interfaces: iterable indexes of the interfaces.
+        :param count: number of interfaces created.
         :returns: List of created interfaces.
         """
-        result = []
-        for i in interfaces:
-            intf = VppLoInterface(cls, i)
+        if not hasattr(cls, "vpp"):
+            cls.lo_interfaces = []
+            return cls.lo_interfaces
+        result = [VppLoInterface(cls) for i in range(count)]
+        for intf in result:
             setattr(cls, intf.name, intf)
-            result.append(intf)
         cls.lo_interfaces = result
         return result
 
+    @classmethod
+    def create_bvi_interfaces(cls, count):
+        """
+        Create BVI interfaces.
+
+        :param count: number of interfaces created.
+        :returns: List of created interfaces.
+        """
+        if not hasattr(cls, "vpp"):
+            cls.bvi_interfaces = []
+            return cls.bvi_interfaces
+        result = [VppBviInterface(cls) for i in range(count)]
+        for intf in result:
+            setattr(cls, intf.name, intf)
+        cls.bvi_interfaces = result
+        return result
+
     @staticmethod
-    def extend_packet(packet, size):
+    def extend_packet(packet, size, padding=" "):
         """
-        Extend packet to given size by padding with spaces
+        Extend packet to given size by padding with spaces or custom padding
         NOTE: Currently works only when Raw layer is present.
 
         :param packet: packet
         :param size: target size
+        :param padding: padding used to extend the payload
 
         """
         packet_len = len(packet) + 4
         extend = size - packet_len
         if extend > 0:
-            packet[Raw].load += ' ' * extend
+            num = (extend // len(padding)) + 1
+            packet[Raw].load += (padding * num)[:extend].encode("ascii")
 
     @classmethod
     def reset_packet_infos(cls):
-        """ Reset the list of packet info objects and packet counts to zero """
+        """Reset the list of packet info objects and packet counts to zero"""
         cls._packet_infos = {}
         cls._packet_count_for_dst_if_idx = {}
 
@@ -519,26 +306,34 @@ class VppTestCase(unittest.TestCase):
 
         :returns: string containing serialized data from packet info
         """
-        return "%d %d %d %d %d" % (info.index, info.src, info.dst,
-                                   info.ip, info.proto)
+
+        # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+        return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
 
     @staticmethod
-    def payload_to_info(payload):
+    def payload_to_info(payload, payload_field="load"):
         """
         Convert packet payload to _PacketInfo object
 
         :param payload: packet payload
-
+        :type payload:  <class 'scapy.packet.Raw'>
+        :param payload_field: packet fieldname of payload "load" for
+                <class 'scapy.packet.Raw'>
+        :type payload_field: str
         :returns: _PacketInfo object containing de-serialized data from payload
 
         """
-        numbers = payload.split()
+
+        # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+        payload_b = getattr(payload, payload_field)[:18]
+
         info = _PacketInfo()
-        info.index = int(numbers[0])
-        info.src = int(numbers[1])
-        info.dst = int(numbers[2])
-        info.ip = int(numbers[3])
-        info.proto = int(numbers[4])
+        info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
+
+        # some SRv6 TCs depend on get an exception if bad values are detected
+        if info.index > 0x4000:
+            raise ValueError("Index value is invalid")
+
         return info
 
     def get_next_packet_info(self, info):
@@ -594,325 +389,222 @@ class VppTestCase(unittest.TestCase):
             if info.dst == dst_index:
                 return info
 
-    def assert_equal(self, real_value, expected_value, name_or_class=None):
-        if name_or_class is None:
-            self.assertEqual(real_value, expected_value)
+    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
+        received = packet.__class__(scapy.compat.raw(packet))
+        udp_layers = ["UDP", "UDPerror"]
+        checksum_fields = ["cksum", "chksum"]
+        checksums = []
+        counter = 0
+        temp = received.__class__(scapy.compat.raw(received))
+        while True:
+            layer = temp.getlayer(counter)
+            if layer:
+                layer = layer.copy()
+                layer.remove_payload()
+                for cf in checksum_fields:
+                    if hasattr(layer, cf):
+                        if (
+                            ignore_zero_udp_checksums
+                            and 0 == getattr(layer, cf)
+                            and layer.name in udp_layers
+                        ):
+                            continue
+                        delattr(temp.getlayer(counter), cf)
+                        checksums.append((counter, cf))
+            else:
+                break
+            counter = counter + 1
+        if 0 == len(checksums):
             return
-        try:
-            msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
-            msg = msg % (getdoc(name_or_class).strip(),
-                         real_value, str(name_or_class(real_value)),
-                         expected_value, str(name_or_class(expected_value)))
-        except:
-            msg = "Invalid %s: %s does not match expected value %s" % (
-                name_or_class, real_value, expected_value)
-
-        self.assertEqual(real_value, expected_value, msg)
-
-    def assert_in_range(self,
-                        real_value,
-                        expected_min,
-                        expected_max,
-                        name=None):
-        if name is None:
-            msg = None
-        else:
-            msg = "Invalid %s: %s out of range <%s,%s>" % (
-                name, real_value, expected_min, expected_max)
-        self.assertTrue(expected_min <= real_value <= expected_max, msg)
-
-    @classmethod
-    def sleep(cls, timeout, remark=None):
-        if hasattr(cls, 'logger'):
-            cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
-        time.sleep(timeout)
-
-
-class TestCasePrinter(object):
-    _shared_state = {}
-
-    def __init__(self):
-        self.__dict__ = self._shared_state
-        if not hasattr(self, "_test_case_set"):
-            self._test_case_set = set()
-
-    def print_test_case_heading_if_first_time(self, case):
-        if case.__class__ not in self._test_case_set:
-            print(double_line_delim)
-            print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
-            print(double_line_delim)
-            self._test_case_set.add(case.__class__)
-
-
-class VppTestResult(unittest.TestResult):
-    """
-    @property result_string
-     String variable to store the test case result string.
-    @property errors
-     List variable containing 2-tuples of TestCase instances and strings
-     holding formatted tracebacks. Each tuple represents a test which
-     raised an unexpected exception.
-    @property failures
-     List variable containing 2-tuples of TestCase instances and strings
-     holding formatted tracebacks. Each tuple represents a test where
-     a failure was explicitly signalled using the TestCase.assert*()
-     methods.
-    """
-
-    def __init__(self, stream, descriptions, verbosity):
-        """
-        :param stream File descriptor to store where to report test results.
-            Set to the standard error stream by default.
-        :param descriptions Boolean variable to store information if to use
-            test case descriptions.
-        :param verbosity Integer variable to store required verbosity level.
-        """
-        unittest.TestResult.__init__(self, stream, descriptions, verbosity)
-        self.stream = stream
-        self.descriptions = descriptions
-        self.verbosity = verbosity
-        self.result_string = None
-        self.printer = TestCasePrinter()
-
-    def addSuccess(self, test):
-        """
-        Record a test succeeded result
-
-        :param test:
-
-        """
-        if hasattr(test, 'logger'):
-            test.logger.debug("--- addSuccess() %s.%s(%s) called"
-                              % (test.__class__.__name__,
-                                 test._testMethodName,
-                                 test._testMethodDoc))
-        unittest.TestResult.addSuccess(self, test)
-        self.result_string = colorize("OK", GREEN)
-
-    def addSkip(self, test, reason):
-        """
-        Record a test skipped.
-
-        :param test:
-        :param reason:
-
-        """
-        if hasattr(test, 'logger'):
-            test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
-                              % (test.__class__.__name__,
-                                 test._testMethodName,
-                                 test._testMethodDoc,
-                                 reason))
-        unittest.TestResult.addSkip(self, test, reason)
-        self.result_string = colorize("SKIP", YELLOW)
-
-    def addFailure(self, test, err):
-        """
-        Record a test failed result
-
-        :param test:
-        :param err: error message
-
-        """
-        if hasattr(test, 'logger'):
-            test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
-                              % (test.__class__.__name__,
-                                 test._testMethodName,
-                                 test._testMethodDoc, err))
-            test.logger.debug("formatted exception is:\n%s" %
-                              "".join(format_exception(*err)))
-        unittest.TestResult.addFailure(self, test, err)
-        if hasattr(test, 'tempdir'):
-            self.result_string = colorize("FAIL", RED) + \
-                ' [ temp dir used by test case: ' + test.tempdir + ' ]'
-        else:
-            self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
-
-    def addError(self, test, err):
-        """
-        Record a test error result
-
-        :param test:
-        :param err: error message
-
-        """
-        if hasattr(test, 'logger'):
-            test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
-                              % (test.__class__.__name__,
-                                 test._testMethodName,
-                                 test._testMethodDoc, err))
-            test.logger.debug("formatted exception is:\n%s" %
-                              "".join(format_exception(*err)))
-        unittest.TestResult.addError(self, test, err)
-        if hasattr(test, 'tempdir'):
-            self.result_string = colorize("ERROR", RED) + \
-                ' [ temp dir used by test case: ' + test.tempdir + ' ]'
-        else:
-            self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
-
-    def getDescription(self, test):
-        """
-        Get test description
-
-        :param test:
-        :returns: test description
-
-        """
-        # TODO: if none print warning not raise exception
-        short_description = test.shortDescription()
-        if self.descriptions and short_description:
-            return short_description
-        else:
-            return str(test)
-
-    def startTest(self, test):
-        """
-        Start a test
-
-        :param test:
-
-        """
-        self.printer.print_test_case_heading_if_first_time(test)
-        unittest.TestResult.startTest(self, test)
-        if self.verbosity > 0:
-            self.stream.writeln(
-                "Starting " + self.getDescription(test) + " ...")
-            self.stream.writeln(single_line_delim)
-
-    def stopTest(self, test):
-        """
-        Stop a test
-
-        :param test:
-
-        """
-        unittest.TestResult.stopTest(self, test)
-        if self.verbosity > 0:
-            self.stream.writeln(single_line_delim)
-            self.stream.writeln("%-73s%s" % (self.getDescription(test),
-                                             self.result_string))
-            self.stream.writeln(single_line_delim)
-        else:
-            self.stream.writeln("%-73s%s" % (self.getDescription(test),
-                                             self.result_string))
-
-    def printErrors(self):
-        """
-        Print errors from running the test case
-        """
-        self.stream.writeln()
-        self.printErrorList('ERROR', self.errors)
-        self.printErrorList('FAIL', self.failures)
-
-    def printErrorList(self, flavour, errors):
-        """
-        Print error list to the output stream together with error type
-        and test case description.
-
-        :param flavour: error type
-        :param errors: iterable errors
-
-        """
-        for test, err in errors:
-            self.stream.writeln(double_line_delim)
-            self.stream.writeln("%s: %s" %
-                                (flavour, self.getDescription(test)))
-            self.stream.writeln(single_line_delim)
-            self.stream.writeln("%s" % err)
-
-
-class VppTestRunner(unittest.TextTestRunner):
-    """
-    A basic test runner implementation which prints results to standard error.
-    """
-    @property
-    def resultclass(self):
-        """Class maintaining the results of the tests"""
-        return VppTestResult
-
-    def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
-                 failfast=False, buffer=False, resultclass=None):
-        # ignore stream setting here, use hard-coded stdout to be in sync
-        # with prints from VppTestCase methods ...
-        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
-                                            verbosity, failfast, buffer,
-                                            resultclass)
-
-    test_option = "TEST"
+        temp = temp.__class__(scapy.compat.raw(temp))
+        for layer, cf in reversed(checksums):
+            calc_sum = getattr(temp[layer], cf)
+            self.assert_equal(
+                getattr(received[layer], cf),
+                calc_sum,
+                "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
+            )
+            self.logger.debug(
+                "Checksum field `%s` on `%s` layer has correct value `%s`"
+                % (cf, temp[layer].name, calc_sum)
+            )
+
+    def assert_checksum_valid(
+        self,
+        received_packet,
+        layer,
+        checksum_field_names=["chksum", "cksum"],
+        ignore_zero_checksum=False,
+    ):
+        """Check checksum of received packet on given layer"""
+        layer_copy = received_packet[layer].copy()
+        layer_copy.remove_payload()
+        field_name = None
+        for f in checksum_field_names:
+            if hasattr(layer_copy, f):
+                field_name = f
+                break
+        if field_name is None:
+            raise Exception(
+                f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+            )
+        received_packet_checksum = getattr(received_packet[layer], field_name)
+        if ignore_zero_checksum and 0 == received_packet_checksum:
+            return
+        recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
+        delattr(recalculated[layer], field_name)
+        recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
+        self.assert_equal(
+            received_packet_checksum,
+            getattr(recalculated[layer], field_name),
+            f"packet checksum (field: {field_name}) on layer: %s" % layer,
+        )
+
+    def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
+        self.assert_checksum_valid(
+            received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
+        )
+
+    def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
+        self.assert_checksum_valid(
+            received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
+        )
+
+    def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
+        self.assert_checksum_valid(
+            received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
+        )
+
+    def assert_embedded_icmp_checksum_valid(self, received_packet):
+        if received_packet.haslayer(IPerror):
+            self.assert_checksum_valid(received_packet, "IPerror")
+        if received_packet.haslayer(TCPerror):
+            self.assert_checksum_valid(received_packet, "TCPerror")
+        if received_packet.haslayer(UDPerror):
+            self.assert_checksum_valid(
+                received_packet, "UDPerror", ignore_zero_checksum=True
+            )
+        if received_packet.haslayer(ICMPerror):
+            self.assert_checksum_valid(received_packet, "ICMPerror")
+
+    def assert_icmp_checksum_valid(self, received_packet):
+        self.assert_checksum_valid(received_packet, "ICMP")
+        self.assert_embedded_icmp_checksum_valid(received_packet)
+
+    def assert_icmpv6_checksum_valid(self, pkt):
+        if pkt.haslayer(ICMPv6DestUnreach):
+            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
+            self.assert_embedded_icmp_checksum_valid(pkt)
+        if pkt.haslayer(ICMPv6EchoRequest):
+            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
+        if pkt.haslayer(ICMPv6EchoReply):
+            self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
+
+    def assert_packet_counter_equal(self, counter, expected_value):
+        counter_value = self.get_counter(counter)
+        self.assert_equal(
+            counter_value, expected_value, "packet counter `%s'" % counter
+        )
+
+    def pg_send(self, intf, pkts, worker=None, trace=True):
+        intf.add_stream(pkts, worker=worker)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start(trace=trace)
+
+    def send_and_assert_no_replies(
+        self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
+    ):
+        if stats_diff:
+            stats_snapshot = self.snapshot_stats(stats_diff)
+
+        self.pg_send(intf, pkts)
 
-    def parse_test_option(self):
         try:
-            f = os.getenv(self.test_option)
-        except:
-            f = None
-        filter_file_name = None
-        filter_class_name = None
-        filter_func_name = None
-        if f:
-            if '.' in f:
-                parts = f.split('.')
-                if len(parts) > 3:
-                    raise Exception("Unrecognized %s option: %s" %
-                                    (self.test_option, f))
-                if len(parts) > 2:
-                    if parts[2] not in ('*', ''):
-                        filter_func_name = parts[2]
-                if parts[1] not in ('*', ''):
-                    filter_class_name = parts[1]
-                if parts[0] not in ('*', ''):
-                    if parts[0].startswith('test_'):
-                        filter_file_name = parts[0]
-                    else:
-                        filter_file_name = 'test_%s' % parts[0]
-            else:
-                if f.startswith('test_'):
-                    filter_file_name = f
-                else:
-                    filter_file_name = 'test_%s' % f
-        return filter_file_name, filter_class_name, filter_func_name
-
-    def filter_tests(self, tests, filter_file, filter_class, filter_func):
-        result = unittest.suite.TestSuite()
-        for t in tests:
-            if isinstance(t, unittest.suite.TestSuite):
-                # this is a bunch of tests, recursively filter...
-                x = self.filter_tests(t, filter_file, filter_class,
-                                      filter_func)
-                if x.countTestCases() > 0:
-                    result.addTest(x)
-            elif isinstance(t, unittest.TestCase):
-                # this is a single test
-                parts = t.id().split('.')
-                # t.id() for common cases like this:
-                # test_classifier.TestClassifier.test_acl_ip
-                # apply filtering only if it is so
-                if len(parts) == 3:
-                    if filter_file and filter_file != parts[0]:
-                        continue
-                    if filter_class and filter_class != parts[1]:
-                        continue
-                    if filter_func and filter_func != parts[2]:
-                        continue
-                result.addTest(t)
-            else:
-                # unexpected object, don't touch it
-                result.addTest(t)
-        return result
-
-    def run(self, test):
-        """
-        Run the tests
+            if not timeout:
+                timeout = 1
+            for i in self.pg_interfaces:
+                i.assert_nothing_captured(timeout=timeout, remark=remark)
+                timeout = 0.1
+        finally:
+            if trace:
+                if msg:
+                    self.logger.debug(f"send_and_assert_no_replies: {msg}")
+                self.logger.debug(self.vapi.cli("show trace"))
+
+        if stats_diff:
+            self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
+    def send_and_expect(
+        self,
+        intf,
+        pkts,
+        output,
+        n_rx=None,
+        worker=None,
+        trace=True,
+        msg=None,
+        stats_diff=None,
+    ):
+        if stats_diff:
+            stats_snapshot = self.snapshot_stats(stats_diff)
+
+        if not n_rx:
+            n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
+        self.pg_send(intf, pkts, worker=worker, trace=trace)
+        rx = output.get_capture(n_rx)
+        if trace:
+            if msg:
+                self.logger.debug(f"send_and_expect: {msg}")
+            self.logger.debug(self.vapi.cli("show trace"))
 
-        :param test:
+        if stats_diff:
+            self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
+        return rx
+
+    def send_and_expect_load_balancing(
+        self, input, pkts, outputs, worker=None, trace=True
+    ):
+        self.pg_send(input, pkts, worker=worker, trace=trace)
+        rxs = []
+        for oo in outputs:
+            rx = oo._get_capture(1)
+            self.assertNotEqual(0, len(rx), f"0 != len(rx) ({len(rx)})")
+            rxs.append(rx)
+        if trace:
+            self.logger.debug(self.vapi.cli("show trace"))
+        return rxs
 
-        """
-        gc.disable()  # disable garbage collection, we'll do that manually
-        print("Running tests using custom test runner")  # debug message
-        filter_file, filter_class, filter_func = self.parse_test_option()
-        print("Active filters: file=%s, class=%s, function=%s" % (
-            filter_file, filter_class, filter_func))
-        filtered = self.filter_tests(test, filter_file, filter_class,
-                                     filter_func)
-        print("%s out of %s tests match specified filters" % (
-            filtered.countTestCases(), test.countTestCases()))
-        return super(VppTestRunner, self).run(filtered)
+    def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
+        self.pg_send(intf, pkts, worker=worker, trace=trace)
+        rx = output._get_capture(1)
+        if trace:
+            self.logger.debug(self.vapi.cli("show trace"))
+        self.assertTrue(len(rx) > 0)
+        self.assertTrue(
+            len(rx) <= len(pkts), f"len(rx) ({len(rx)}) > len(pkts) ({len(pkts)})"
+        )
+        return rx
+
+    def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
+        if stats_diff:
+            stats_snapshot = self.snapshot_stats(stats_diff)
+
+        self.pg_send(intf, pkts)
+        rx = output.get_capture(len(pkts))
+        outputs = [output]
+        if not timeout:
+            timeout = 1
+        for i in self.pg_interfaces:
+            if i not in outputs:
+                i.assert_nothing_captured(timeout=timeout)
+                timeout = 0.1
+
+        if stats_diff:
+            self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
+        return rx
+
+
+if __name__ == "__main__":
+    pass