def _is_skip_aarch64_set():
return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
+
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
return platform.machine() == 'aarch64'
+
is_platform_aarch64 = _is_platform_aarch64()
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+
running_extended_tests = _running_extended_tests()
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+
running_on_centos = _running_on_centos
cls.set_debug_flags(d)
cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
+ cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
+ if cls.test_plugin_path is not None:
+ cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
+
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
if not ok:
cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
+ @classmethod
+ def wait_for_coredump(cls):
+ corefile = cls.tempdir + "/core"
+ if os.path.isfile(corefile):
+ cls.logger.error("Waiting for coredump to complete: %s", corefile)
+ curr_size = os.path.getsize(corefile)
+ deadline = time.time() + 60
+ ok = False
+ while time.time() < deadline:
+ cls.sleep(1)
+ size = curr_size
+ curr_size = os.path.getsize(corefile)
+ if size == curr_size:
+ ok = True
+ break
+ if not ok:
+ cls.logger.error("Timed out waiting for coredump to complete:"
+ " %s", corefile)
+ else:
+ cls.logger.error("Coredump complete: %s, size %d",
+ corefile, curr_size)
+
@classmethod
def setUpClass(cls):
"""
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
+ cls.wait_for_coredump()
cls.logger.debug("Sending TERM to vpp")
cls.vpp.terminate()
cls.logger.debug("Waiting for vpp to die")
packet_len = len(packet) + 4
extend = size - packet_len
if extend > 0:
- num = (extend / len(padding)) + 1
- packet[Raw].load += (padding * num)[:extend]
+ num = (extend // len(padding)) + 1
+ packet[Raw].load += (padding * num)[:extend].encode("ascii")
@classmethod
def reset_packet_infos(cls):
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
- def assert_packet_counter_equal(self, counter, expected_value):
+ def get_packet_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
else:
counters = self.vapi.cli("sh errors").split('\n')
- counter_value = -1
+ counter_value = 0
for i in range(1, len(counters) - 1):
results = counters[i].split()
if results[1] == counter:
counter_value = int(results[0])
break
+ return counter_value
+
+ def assert_packet_counter_equal(self, counter, expected_value):
+ counter_value = self.get_packet_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+
+ def assert_error_counter_equal(self, counter, expected_value):
+ counter_value = self.statistics.get_err_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "error counter `%s'" % counter)
@classmethod
def sleep(cls, timeout, remark=None):
self.logger.info(single_line_delim)
self.result = self.process.returncode
+
if __name__ == '__main__':
pass