make test: improve stability 20/5120/2
authorKlement Sekera <ksekera@cisco.com>
Tue, 14 Feb 2017 01:55:31 +0000 (02:55 +0100)
committerKlement Sekera <ksekera@cisco.com>
Tue, 14 Feb 2017 02:38:37 +0000 (03:38 +0100)
Disable automatic garbage collection and run it manually before
running each test case to minimize stalls. Improve vpp subprocess
cleanup. Reduce helper thread count to one and properly clean that
thread once it's not needed.

Change-Id: I3ea78ed9628552b5ef3ff29cc7bcf2d3fc42f2c3
Signed-off-by: Klement Sekera <ksekera@cisco.com>
test/Makefile
test/framework.py
test/util.py
test/vpp_object.py
test/vpp_papi_provider.py
test/vpp_pg_interface.py

index 5c0d48f..65b5a9b 100644 (file)
@@ -5,8 +5,14 @@ ifndef VPP_PYTHON_PREFIX
        $(error VPP_PYTHON_PREFIX is not set)
 endif
 
+UNITTEST_EXTRA_OPTS=""
+
+ifeq ($(FAILFAST),1)
+UNITTEST_EXTRA_OPTS="-f"
+endif
+
 PYTHON_VENV_PATH=$(VPP_PYTHON_PREFIX)/virtualenv
-PYTHON_DEPENDS=scapy==2.3.3 pexpect
+PYTHON_DEPENDS=scapy==2.3.3 pexpect subprocess32
 SCAPY_SOURCE=$(PYTHON_VENV_PATH)/lib/python2.7/site-packages/
 BUILD_COV_DIR = $(BR)/test-cov
 
@@ -35,7 +41,7 @@ $(PAPI_INSTALL_DONE): $(PIP_PATCH_DONE)
        @touch $@
 
 define retest-func
-       @bash -c "source $(PYTHON_VENV_PATH)/bin/activate && python run_tests.py discover -p test_\"*.py\""
+       @bash -c "source $(PYTHON_VENV_PATH)/bin/activate && python run_tests.py discover $(UNITTEST_EXTRA_OPTS) -p test_\"*.py\""
 endef
 
 test: reset verify-python-path $(PAPI_INSTALL_DONE)
@@ -103,6 +109,7 @@ help:
        @echo ""
        @echo "Arguments controlling test runs:"
        @echo " V=[0|1|2]            - set test verbosity level"
+       @echo " FAILFAST=[0|1]       - fail fast if 1, complete all tests if 0"
        @echo " DEBUG=<type>         - set VPP debugging kind"
        @echo "    DEBUG=core        - detect coredump and load it in gdb on crash"
        @echo "    DEBUG=gdb         - allow easy debugging by printing VPP PID "
index 6a0ec96..8dd61aa 100644 (file)
@@ -1,23 +1,33 @@
 #!/usr/bin/env python
 
-import subprocess
+from __future__ import print_function
+import gc
+import sys
+import os
+import select
 import unittest
 import tempfile
 import time
 import resource
 from collections import deque
-from threading import Thread
+from threading import Thread, Event
 from inspect import getdoc
 from traceback import format_exception
+from logging import FileHandler, DEBUG, Formatter
+from scapy.packet import Raw
 from hook import StepHook, PollHook
 from vpp_pg_interface import VppPGInterface
 from vpp_sub_interface import VppSubInterface
 from vpp_lo_interface import VppLoInterface
 from vpp_papi_provider import VppPapiProvider
-from scapy.packet import Raw
-from logging import FileHandler, DEBUG
 from log import *
 from vpp_object import VppObjectRegistry
+if os.name == 'posix' and sys.version_info[0] < 3:
+    # using subprocess32 is recommended by python official documentation
+    # @ https://docs.python.org/2/library/subprocess.html
+    import subprocess32 as subprocess
+else:
+    import subprocess
 
 """
   Test framework module.
@@ -51,9 +61,21 @@ class _PacketInfo(object):
         return index and src and dst and data
 
 
-def pump_output(out, deque):
-    for line in iter(out.readline, b''):
-        deque.append(line)
+def pump_output(testclass):
+    """ pump output from vpp stdout/stderr to proper queues """
+    while not testclass.pump_thread_stop_flag.wait(0):
+        readable = select.select([testclass.vpp.stdout.fileno(),
+                                  testclass.vpp.stderr.fileno(),
+                                  testclass.pump_thread_wakeup_pipe[0]],
+                                 [], [])[0]
+        if testclass.vpp.stdout.fileno() in readable:
+            read = os.read(testclass.vpp.stdout.fileno(), 1024)
+            testclass.vpp_stdout_deque.append(read)
+        if testclass.vpp.stderr.fileno() in readable:
+            read = os.read(testclass.vpp.stderr.fileno(), 1024)
+            testclass.vpp_stderr_deque.append(read)
+        # ignoring the dummy pipe here intentionally - the flag will take care
+        # of properly terminating the loop
 
 
 class VppTestCase(unittest.TestCase):
@@ -181,10 +203,14 @@ class VppTestCase(unittest.TestCase):
         Perform class setup before running the testcase
         Remove shared memory files, start vpp and connect the vpp-api
         """
+        gc.collect()  # run garbage collection first
         cls.logger = getLogger(cls.__name__)
         cls.tempdir = tempfile.mkdtemp(
             prefix='vpp-unittest-' + cls.__name__ + '-')
         file_handler = FileHandler("%s/log.txt" % cls.tempdir)
+        file_handler.setFormatter(
+            Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
+                      datefmt="%H:%M:%S"))
         file_handler.setLevel(DEBUG)
         cls.logger.addHandler(file_handler)
         cls.shm_prefix = cls.tempdir.split("/")[-1]
@@ -206,20 +232,18 @@ class VppTestCase(unittest.TestCase):
         try:
             cls.run_vpp()
             cls.vpp_stdout_deque = deque()
-            cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
-                cls.vpp.stdout, cls.vpp_stdout_deque))
-            cls.vpp_stdout_reader_thread.start()
             cls.vpp_stderr_deque = deque()
-            cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
-                cls.vpp.stderr, cls.vpp_stderr_deque))
-            cls.vpp_stderr_reader_thread.start()
+            cls.pump_thread_stop_flag = Event()
+            cls.pump_thread_wakeup_pipe = os.pipe()
+            cls.pump_thread = Thread(target=pump_output, args=(cls,))
+            cls.pump_thread.start()
             cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
             if cls.step:
                 hook = StepHook(cls)
             else:
                 hook = PollHook(cls)
             cls.vapi.register_hook(hook)
-            time.sleep(0.1)
+            cls.sleep(0.1, "after vpp startup, before initial poll")
             hook.poll_vpp()
             try:
                 cls.vapi.connect()
@@ -251,12 +275,25 @@ class VppTestCase(unittest.TestCase):
                 raw_input("When done debugging, press ENTER to kill the "
                           "process and finish running the testcase...")
 
+        os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+        cls.pump_thread_stop_flag.set()
+        if hasattr(cls, 'pump_thread'):
+            cls.logger.debug("Waiting for pump thread to stop")
+            cls.pump_thread.join()
+        if hasattr(cls, 'vpp_stderr_reader_thread'):
+            cls.logger.debug("Waiting for stdderr pump to stop")
+            cls.vpp_stderr_reader_thread.join()
+
         if hasattr(cls, 'vpp'):
             if hasattr(cls, 'vapi'):
                 cls.vapi.disconnect()
+                del cls.vapi
             cls.vpp.poll()
             if cls.vpp.returncode is None:
+                cls.logger.debug("Sending TERM to vpp")
                 cls.vpp.terminate()
+                cls.logger.debug("Waiting for vpp to die")
+                cls.vpp.communicate()
             del cls.vpp
 
         if hasattr(cls, 'vpp_stdout_deque'):
@@ -306,7 +343,7 @@ class VppTestCase(unittest.TestCase):
                            self._testMethodDoc))
         if self.vpp_dead:
             raise Exception("VPP is dead when setting up the test")
-        time.sleep(.1)
+        self.sleep(.1, "during setUp")
         self.vpp_stdout_deque.append(
             "--- test setUp() for %s.%s(%s) starts here ---\n" %
             (self.__class__.__name__, self._testMethodName,
@@ -351,9 +388,7 @@ class VppTestCase(unittest.TestCase):
         for stamp, cap_name in cls._zombie_captures:
             wait = stamp + capture_ttl - now
             if wait > 0:
-                cls.logger.debug("Waiting for %ss before deleting capture %s",
-                                 wait, cap_name)
-                time.sleep(wait)
+                cls.sleep(wait, "before deleting capture %s" % cap_name)
                 now = time.time()
             cls.logger.debug("Removing zombie capture %s" % cap_name)
             cls.vapi.cli('packet-generator delete %s' % cap_name)
@@ -552,8 +587,10 @@ class VppTestCase(unittest.TestCase):
                 name, real_value, expected_min, expected_max)
         self.assertTrue(expected_min <= real_value <= expected_max, msg)
 
-    def sleep(self, timeout):
-        self.logger.debug("Sleeping for %ss" % timeout)
+    @classmethod
+    def sleep(cls, timeout, remark=None):
+        if hasattr(cls, 'logger'):
+            cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
         time.sleep(timeout)
 
 
@@ -817,6 +854,7 @@ class VppTestRunner(unittest.TextTestRunner):
         :param test:
 
         """
+        gc.disable()  # disable garbage collection, we'll do that manually
         print("Running tests using custom test runner")  # debug message
         filter_file, filter_class, filter_func = self.parse_test_option()
         print("Active filters: file=%s, class=%s, function=%s" % (
index 24e9af4..a648490 100644 (file)
@@ -1,3 +1,5 @@
+""" test framework utilities """
+
 import socket
 import sys
 from abc import abstractmethod, ABCMeta
index 1997bf5..0d74baa 100644 (file)
@@ -1,3 +1,5 @@
+""" abstract vpp object and object registry """
+
 from abc import ABCMeta, abstractmethod
 
 
@@ -5,9 +7,6 @@ class VppObject(object):
     """ Abstract vpp object """
     __metaclass__ = ABCMeta
 
-    def __init__(self):
-        VppObjectRegistry().register(self)
-
     @abstractmethod
     def add_vpp_config(self):
         """ Add the configuration for this object to vpp. """
@@ -42,13 +41,13 @@ class VppObjectRegistry(object):
         if not hasattr(self, "_object_dict"):
             self._object_dict = dict()
 
-    def register(self, o, logger):
+    def register(self, obj, logger):
         """ Register an object in the registry. """
-        if not o.object_id() in self._object_dict:
-            self._object_registry.append(o)
-            self._object_dict[o.object_id()] = o
+        if obj.object_id() not in self._object_dict:
+            self._object_registry.append(obj)
+            self._object_dict[obj.object_id()] = obj
         else:
-            logger.debug("REG: duplicate add, ignoring (%s)" % o)
+            logger.debug("REG: duplicate add, ignoring (%s)" % obj)
 
     def remove_vpp_config(self, logger):
         """
@@ -60,23 +59,23 @@ class VppObjectRegistry(object):
             return
         logger.info("REG: Removing VPP configuration for registered objects")
         # remove the config in reverse order as there might be dependencies
-        for o in reversed(self._object_registry):
-            if o.query_vpp_config():
-                logger.info("REG: Removing configuration for %s" % o)
-                o.remove_vpp_config()
+        for obj in reversed(self._object_registry):
+            if obj.query_vpp_config():
+                logger.info("REG: Removing configuration for %s" % obj)
+                obj.remove_vpp_config()
             else:
                 logger.info(
                     "REG: Skipping removal for %s, configuration not present" %
-                    o)
+                    obj)
         failed = []
-        for o in self._object_registry:
-            if o.query_vpp_config():
-                failed.append(o)
+        for obj in self._object_registry:
+            if obj.query_vpp_config():
+                failed.append(obj)
         self._object_registry = []
         self._object_dict = dict()
         if failed:
             logger.error("REG: Couldn't remove configuration for object(s):")
-            for x in failed:
-                logger.error(repr(x))
+            for obj in failed:
+                logger.error(repr(obj))
             raise Exception("Couldn't remove configuration for object(s): %s" %
                             (", ".join(str(x) for x in failed)))
index 39efa9e..7a508a4 100644 (file)
@@ -87,6 +87,12 @@ class VppPapiProvider(object):
 
     def wait_for_event(self, timeout, name=None):
         """ Wait for and return next event. """
+        if name:
+            self.test_class.logger.debug("Expecting event within %ss",
+                                         timeout)
+        else:
+            self.test_class.logger.debug("Expecting event '%s' within %ss",
+                                         name, timeout)
         if self._events:
             self.test_class.logger.debug("Not waiting, event already queued")
         limit = time.time() + timeout
@@ -101,8 +107,6 @@ class VppPapiProvider(object):
                                              (name, e))
                 return e
             time.sleep(0)  # yield
-        if name is not None:
-            raise Exception("Event %s did not occur within timeout" % name)
         raise Exception("Event did not occur within timeout")
 
     def __call__(self, name, event):
index d6e6684..4707f0b 100644 (file)
@@ -16,6 +16,11 @@ from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ismaddr
 from scapy.utils import inet_pton, inet_ntop
 
 
+class CaptureTimeoutError(Exception):
+    """ Exception raised if capture or packet doesn't appear within timeout """
+    pass
+
+
 def is_ipv6_misc(p):
     """ Is packet one of uninteresting IPv6 broadcasts? """
     if p.haslayer(ICMPv6ND_RA):
@@ -103,13 +108,15 @@ class VppPGInterface(VppInterface):
         """ Enable capture on this packet-generator interface"""
         try:
             if os.path.isfile(self.out_path):
-                os.rename(self.out_path,
-                          "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %
-                          (self.test.tempdir,
-                           time.time(),
-                           self.name,
-                           self.out_history_counter,
-                           self._out_file))
+                name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" % \
+                    (self.test.tempdir,
+                     time.time(),
+                     self.name,
+                     self.out_history_counter,
+                     self._out_file)
+                self.test.logger.debug("Renaming %s->%s" %
+                                       (self.out_path, name))
+                os.rename(self.out_path, name)
         except:
             pass
         # FIXME this should be an API, but no such exists atm
@@ -125,13 +132,15 @@ class VppPGInterface(VppInterface):
         """
         try:
             if os.path.isfile(self.in_path):
-                os.rename(self.in_path,
-                          "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %
-                          (self.test.tempdir,
-                           time.time(),
-                           self.name,
-                           self.in_history_counter,
-                           self._in_file))
+                name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %\
+                    (self.test.tempdir,
+                     time.time(),
+                     self.name,
+                     self.in_history_counter,
+                     self._in_file)
+                self.test.logger.debug("Renaming %s->%s" %
+                                       (self.in_path, name))
+                os.rename(self.in_path, name)
         except:
             pass
         wrpcap(self.in_path, pkts)
@@ -263,57 +272,50 @@ class VppPGInterface(VppInterface):
 
         :returns: True/False if the file is present or appears within timeout
         """
-        limit = time.time() + timeout
+        deadline = time.time() + timeout
         if not os.path.isfile(self.out_path):
             self.test.logger.debug("Waiting for capture file %s to appear, "
                                    "timeout is %ss" % (self.out_path, timeout))
         else:
-            self.test.logger.debug(
-                "Capture file %s already exists" %
-                self.out_path)
+            self.test.logger.debug("Capture file %s already exists" %
+                                   self.out_path)
             return True
-        while time.time() < limit:
+        while time.time() < deadline:
             if os.path.isfile(self.out_path):
                 break
             time.sleep(0)  # yield
         if os.path.isfile(self.out_path):
             self.test.logger.debug("Capture file appeared after %fs" %
-                                   (time.time() - (limit - timeout)))
+                                   (time.time() - (deadline - timeout)))
         else:
             self.test.logger.debug("Timeout - capture file still nowhere")
             return False
         return True
 
-    def wait_for_packet_data(self, deadline):
+    def verify_enough_packet_data_in_pcap(self):
         """
-        Wait until enough data is available in the file handled by internal
-        pcap reader so that a whole packet can be read.
+        Check if enough data is available in file handled by internal pcap
+        reader so that a whole packet can be read.
 
-        :param deadline: timestamp by which the data must arrive
-        :raises Exception: if not enough data by deadline
+        :returns: True if enough data present, else False
         """
         orig_pos = self._pcap_reader.f.tell()  # save file position
         enough_data = False
-        while time.time() < deadline:
-            # read packet header from pcap
-            hdr = self._pcap_reader.f.read(16)
-            if len(hdr) < 16:
-                time.sleep(0)  # yield
-                continue  # cannot read full header, continue looping
-            # find the capture length - caplen
+        # read packet header from pcap
+        packet_header_size = 16
+        caplen = None
+        end_pos = None
+        hdr = self._pcap_reader.f.read(packet_header_size)
+        if len(hdr) == packet_header_size:
+            # parse the capture length - caplen
             sec, usec, caplen, wirelen = struct.unpack(
                 self._pcap_reader.endian + "IIII", hdr)
             self._pcap_reader.f.seek(0, 2)  # seek to end of file
             end_pos = self._pcap_reader.f.tell()  # get position at end
             if end_pos >= orig_pos + len(hdr) + caplen:
                 enough_data = True  # yay, we have enough data
-                break
-            self.test.logger.debug("Partial packet data in pcap")
-            time.sleep(0)  # yield
         self._pcap_reader.f.seek(orig_pos, 0)  # restore original position
-        if not enough_data:
-            raise Exception(
-                "Not enough data to read a full packet within deadline")
+        return enough_data
 
     def wait_for_packet(self, timeout, filter_out_fn=is_ipv6_misc):
         """
@@ -327,8 +329,8 @@ class VppPGInterface(VppInterface):
         deadline = time.time() + timeout
         if self._pcap_reader is None:
             if not self.wait_for_capture_file(timeout):
-                raise Exception("Capture file %s did not appear within "
-                                "timeout" % self.out_path)
+                raise CaptureTimeoutError("Capture file %s did not appear "
+                                          "within timeout" % self.out_path)
             while time.time() < deadline:
                 try:
                     self._pcap_reader = PcapReader(self.out_path)
@@ -338,12 +340,20 @@ class VppPGInterface(VppInterface):
                         "Exception in scapy.PcapReader(%s): %s" %
                         (self.out_path, format_exc()))
         if not self._pcap_reader:
-            raise Exception("Capture file %s did not appear within "
-                            "timeout" % self.out_path)
+            raise CaptureTimeoutError("Capture file %s did not appear within "
+                                      "timeout" % self.out_path)
 
-        self.test.logger.debug("Waiting for packet")
-        while time.time() < deadline:
-            self.wait_for_packet_data(deadline)
+        poll = False
+        if timeout > 0:
+            self.test.logger.debug("Waiting for packet")
+        else:
+            poll = True
+            self.test.logger.debug("Polling for packet")
+        while time.time() < deadline or poll:
+            if not self.verify_enough_packet_data_in_pcap():
+                time.sleep(0)  # yield
+                poll = False
+                continue
             p = self._pcap_reader.recv()
             if p is not None:
                 if filter_out_fn is not None and filter_out_fn(p):
@@ -356,8 +366,9 @@ class VppPGInterface(VppInterface):
                         (time.time() - (deadline - timeout)))
                     return p
             time.sleep(0)  # yield
+            poll = False
         self.test.logger.debug("Timeout - no packets received")
-        raise Exception("Packet didn't arrive within timeout")
+        raise CaptureTimeoutError("Packet didn't arrive within timeout")
 
     def create_arp_req(self):
         """Create ARP request applicable for this interface"""