#!/usr/bin/env python
from __future__ import print_function
-
-import copy
import gc
+import sys
import os
-import random
import select
-import six
-import sys
+import unittest
import tempfile
import time
-import unittest
+import faulthandler
+import random
+import copy
+import psutil
from collections import deque
-from inspect import getdoc, isclass
-from logging import FileHandler, DEBUG, Formatter
from threading import Thread, Event
+from inspect import getdoc, isclass
from traceback import format_exception
-
-import faulthandler
-import psutil
-from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
-from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
-from scapy.layers.inet6 import ICMPv6EchoReply
+from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
-from vpp_papi.vpp_stats import VPPStats
-
from hook import StepHook, PollHook, VppDiedError
+from vpp_pg_interface import VppPGInterface
+from vpp_sub_interface import VppSubInterface
+from vpp_lo_interface import VppLoInterface
+from vpp_papi_provider import VppPapiProvider
+from vpp_papi.vpp_stats import VPPStats
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
get_logger, colorize
-from util import ppp, is_core_present
-from vpp_lo_interface import VppLoInterface
from vpp_object import VppObjectRegistry
-from vpp_papi_provider import VppPapiProvider
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
-
+from util import ppp, is_core_present
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
+from scapy.layers.inet6 import ICMPv6EchoReply
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
else:
raise Exception("Unrecognized DEBUG option: '%s'" % d)
- @classmethod
- def get_least_used_cpu(self):
+ @staticmethod
+ def get_least_used_cpu():
cpu_usage_list = [set(range(psutil.cpu_count()))]
vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
if 'vpp_main' == p.info['name']]
return random.choice(tuple(min_usage_set))
- @staticmethod
+ @classmethod
def print_header(cls):
if not hasattr(cls, '_header_printed'):
print(double_line_delim)
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- six.input("Press ENTER to continue running the testcase...")
+ raw_input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
"""
gc.collect() # run garbage collection first
random.seed()
- cls.print_header(cls)
+ cls.print_header()
cls.logger = get_logger(cls.__name__)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- six.input("When done debugging, press ENTER to kill the "
+ raw_input("When done debugging, press ENTER to kill the "
"process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
:param test:
"""
- test.print_header(test.__class__)
+ test.print_header()
unittest.TestResult.startTest(self, test)
if self.verbosity > 0: