Add test suites for crypto sw scheduler engine
[csit.git] / resources / libraries / python / IPsecUtil.py
index 99a470d..2bc10d3 100644 (file)
@@ -320,6 +320,30 @@ class IPsecUtil:
         with PapiSocketExecutor(node) as papi_exec:
             papi_exec.add(cmd, **args).get_reply(err_msg)
 
+    @staticmethod
+    def vpp_ipsec_crypto_sw_scheduler_set_worker(
+            node, worker_index, crypto_enable=False):
+        """Enable or disable crypto on specific vpp worker threads.
+
+        :param node: VPP node to enable or disable crypto for worker threads.
+        :param worker_index: VPP worker thread index.
+        :param crypto_enable: Disable or enable crypto work.
+        :type node: dict
+        :type worker_index: int
+        :type crypto_enable: bool
+        :raises RuntimeError: If failed to enable or disable crypto for worker
+            thread or if no API reply received.
+        """
+        cmd = u"crypto_sw_scheduler_set_worker"
+        err_msg = f"Failed to disable/enable crypto for worker thread " \
+            f"on host {node[u'host']}"
+        args = dict(
+            worker_index=worker_index,
+            crypto_enable=crypto_enable
+        )
+        with PapiSocketExecutor(node) as papi_exec:
+            papi_exec.add(cmd, **args).get_reply(err_msg)
+
     @staticmethod
     def vpp_ipsec_add_sad_entry(
             node, sad_id, spi, crypto_alg, crypto_key, integ_alg=None,