nat: TCP state tracking based on RFC 7857/RFC 6146
[vpp.git] / test / framework.py
old mode 100755 (executable)
new mode 100644 (file)
index b700461..8c0df28
@@ -23,9 +23,10 @@ from traceback import format_exception
 from logging import FileHandler, DEBUG, Formatter
 from enum import Enum
 from abc import ABC, abstractmethod
+from struct import pack, unpack
 
 import scapy.compat
-from scapy.packet import Raw
+from scapy.packet import Raw, Packet
 import hook as hookmodule
 from vpp_pg_interface import VppPGInterface
 from vpp_sub_interface import VppSubInterface
@@ -414,18 +415,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         c = os.getenv("CACHE_OUTPUT", "1")
         cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
         cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
-        cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
-        cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
-        cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
-        plugin_path = None
-        if cls.plugin_path is not None:
-            if cls.extern_plugin_path is not None:
-                plugin_path = "%s:%s" % (
-                    cls.plugin_path, cls.extern_plugin_path)
-            else:
-                plugin_path = cls.plugin_path
-        elif cls.extern_plugin_path is not None:
-            plugin_path = cls.extern_plugin_path
+        extern_plugin_path = os.getenv('EXTERN_PLUGINS')
         debug_cli = ""
         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
             debug_cli = "cli-listen localhost:5002"
@@ -453,6 +443,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             "api-trace", "{", "on", "}",
             "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
             "cpu", "{", "main-core", str(cls.cpus[0]), ]
+        if extern_plugin_path is not None:
+            cls.extra_vpp_plugin_config.append(
+                "add-path %s" % extern_plugin_path)
         if cls.get_vpp_worker_count():
             cls.vpp_cmdline.extend([
                 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
@@ -472,10 +465,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
 
         if cls.extra_vpp_punt_config is not None:
             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
-        if plugin_path is not None:
-            cls.vpp_cmdline.extend(["plugin_path", plugin_path])
-        if cls.test_plugin_path is not None:
-            cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
 
         if not cls.debug_attach:
             cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
@@ -1042,8 +1031,10 @@ class VppTestCase(CPUInterface, unittest.TestCase):
 
         :returns: string containing serialized data from packet info
         """
-        return "%d %d %d %d %d" % (info.index, info.src, info.dst,
-                                   info.ip, info.proto)
+
+        # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+        return pack('iiiih', info.index, info.src,
+                    info.dst, info.ip, info.proto)
 
     @staticmethod
     def payload_to_info(payload, payload_field='load'):
@@ -1058,13 +1049,18 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         :returns: _PacketInfo object containing de-serialized data from payload
 
         """
-        numbers = getattr(payload, payload_field).split()
+
+        # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+        payload_b = getattr(payload, payload_field)[:18]
+
         info = _PacketInfo()
-        info.index = int(numbers[0])
-        info.src = int(numbers[1])
-        info.dst = int(numbers[2])
-        info.ip = int(numbers[3])
-        info.proto = int(numbers[4])
+        info.index, info.src, info.dst, info.ip, info.proto \
+            = unpack('iiiih', payload_b)
+
+        # some SRv6 TCs depend on get an exception if bad values are detected
+        if info.index > 0x4000:
+            raise ValueError('Index value is invalid')
+
         return info
 
     def get_next_packet_info(self, info):
@@ -1288,12 +1284,17 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             "Finished sleep (%s) - slept %es (wanted %es)",
             remark, after - before, timeout)
 
+    def virtual_sleep(self, timeout, remark=None):
+        self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
+        self.vapi.cli("set clock adjust %s" % timeout)
+
     def pg_send(self, intf, pkts, worker=None, trace=True):
         intf.add_stream(pkts, worker=worker)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start(trace=trace)
 
-    def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+    def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
+                                   trace=True):
         self.pg_send(intf, pkts)
         if not timeout:
             timeout = 1
@@ -1301,13 +1302,17 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             i.get_capture(0, timeout=timeout)
             i.assert_nothing_captured(remark=remark)
             timeout = 0.1
+        if trace:
+            self.logger.debug(self.vapi.cli("show trace"))
 
     def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
                         trace=True):
         if not n_rx:
-            n_rx = len(pkts)
+            n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
         self.pg_send(intf, pkts, worker=worker, trace=trace)
         rx = output.get_capture(n_rx)
+        if trace:
+            self.logger.debug(self.vapi.cli("show trace"))
         return rx
 
     def send_and_expect_only(self, intf, pkts, output, timeout=None):