8 from asfframework import VppTestCase, VppTestRunner
9 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
# Probe for the QAT device node "/dev/qat_dev_processes"; `r` is True when
# that path exists on this machine.
# NOTE(review): presumably this is the body of the checkQat() helper that is
# called elsewhere in this file -- confirm against the enclosing def.
13 r = os.path.exists("/dev/qat_dev_processes")
17 # print("NO QAT! EXIT!")
# Inspect the OpenSSL installation named by the OPENSSL_ROOT_DIR environment
# variable: build the "<root>/bin/openssl version" command, read what it
# prints, and extract the leading (major) version number from that output.
21 def checkOpenSSLVersion():
# True only when OPENSSL_ROOT_DIR is present in the environment.
23 r = "OPENSSL_ROOT_DIR" in os.environ
# Command line is a single string: "<OPENSSL_ROOT_DIR>/bin/openssl version".
25 ssl = os.environ["OPENSSL_ROOT_DIR"] + "/bin/openssl version"
# The command string is handed to a shell (shell=True) with stdin and stdout
# piped.
# NOTE(review): no text/universal_newlines/encoding argument is passed here,
# so the stdout pipe presumably yields bytes rather than str -- confirm.
27 ssl, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True
# Everything the command wrote to stdout.
# NOTE(review): presumably `p` is the process started with the arguments
# above -- confirm.
30 output = p.stdout.read()
35 # print("openssl version error!")
# findall() returns only the captured group: the digits that precede the
# first literal dot of a version-like token, i.e. the major version.
# NOTE(review): if `output` is bytes, applying this str pattern raises
# TypeError in Python 3; decode `output` (or open the pipe in text mode)
# before matching -- confirm.
# NOTE(review): the "." in ".+" between the last two digit groups is
# unescaped, so it matches one or more of any character, not just dots.
37 ssl_ver_src = re.findall(r"(\d+)\.+\d+.+\d+", output)
# Major version of the first match as an int; raises IndexError when the
# pattern matched nothing.
38 ssl_ver = int(ssl_ver_src[0])
44 # print("NO OPENSSL_ROOT_DIR!")
# Combine both prerequisite probes. `&` is the bitwise operator, so both
# functions are always called; unlike `and`, a falsy checkQat() result does
# not skip the call to checkOpenSSLVersion().
51 ret = checkQat() & checkOpenSSLVersion()
# Test case that configures the OpenSSL "qat" engine with async mode enabled
# and then runs the CLI "test echo" server and client against a tls:// URI
# on a loopback address.
55 class TestTLS(VppTestCase):
56 """TLS Qat Test Case."""
# Delegate class-level setup to the parent class.
60 super(TestTLS, cls).setUpClass()
# Delegate class-level teardown to the parent class.
63 def tearDownClass(cls):
64 super(TestTLS, cls).tearDownClass()
# Per-test setup starts with the parent's setUp.
67 super(TestTLS, self).setUp()
# Enable sessions through the API and create two loopback interfaces.
69 self.vapi.session_enable_disable(is_enable=1)
70 self.create_loopback_interfaces(2)
# Walk the loopbacks and bind each one to the IPv4 table `table_id`.
# NOTE(review): presumably table_id differs per interface so that loop0 and
# loop1 end up in separate tables (0 and 1) -- confirm.
74 for i in self.lo_interfaces:
78 tbl = VppIpTable(self, table_id)
81 i.set_table_ip4(table_id)
# App namespace "0" is tied to loop0 and namespace "1" to loop1, each through
# the interface's sw_if_index.
85 # Configure namespaces
86 self.vapi.app_namespace_add_del_v4(
87 namespace_id="0", sw_if_index=self.loop0.sw_if_index
89 self.vapi.app_namespace_add_del_v4(
90 namespace_id="1", sw_if_index=self.loop1.sw_if_index
# Per-test cleanup: iterate the loopbacks again, then disable sessions before
# handing over to the parent's tearDown.
94 for i in self.lo_interfaces:
98 self.vapi.session_enable_disable(is_enable=0)
99 super(TestTLS, self).tearDown()
# The skip condition is an argument to the decorator, so checkAll() runs when
# this class body is executed (at import time), not when the test starts.
101 @unittest.skipUnless(checkAll(), "QAT or OpenSSL not satisfied,skip.")
102 def test_tls_transfer(self):
103 """TLS qat echo client/server transfer"""
# Two routes cross-connect the tables: the one for loop1's address uses
# next-hop table 1 and the one for loop0's address uses next-hop table 0.
# NOTE(review): 0xFFFFFFFF is presumably the "no interface" index expected by
# VppRoutePath -- confirm.
105 # Add inter-table routes
108 self.loop1.local_ip4,
110 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
115 self.loop0.local_ip4,
117 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
120 ip_t01.add_vpp_config()
121 ip_t10.add_vpp_config()
# Select the "qat" engine with async_enable=1 for the listed algorithm and
# cipher strings.
123 # Enable QAT engine and TLS async
124 r = self.vapi.tls_openssl_set_engine(
125 async_enable=1, engine="qat", algorithm="RSA,PKEY_CRYPTO", ciphers="RSA"
# Only verifies that a reply object came back; its contents are not inspected
# on this line.
127 self.assertIsNotNone(r, "No response msg ")
# Both CLI commands share one URI of the form "tls://<loop0 IPv4>/1234".
129 # Start builtin server and client
130 uri = "tls://" + self.loop0.local_ip4 + "/1234"
# Server side: app namespace 0, fifo-size 4, tls-engine 1.
131 error = self.vapi.cli(
132 "test echo server appns 0 fifo-size 4 tls-engine 1 uri " + uri
# Text returned by the CLI is logged at critical level and must not contain
# the substring "failed".
135 self.logger.critical(error)
136 self.assertNotIn("failed", error)
# Client side: app namespace 1, "mbytes 10", with test-bytes and a
# syn-timeout of 2; the adjacent string literals are concatenated into one
# command line.
138 error = self.vapi.cli(
139 "test echo client mbytes 10 appns 1 "
140 "fifo-size 4 no-output test-bytes "
142 "syn-timeout 2 uri " + uri
# Same check as for the server command: log the text, fail on "failed".
145 self.logger.critical(error)
146 self.assertNotIn("failed", error)
# Remove the two routes added at the start of the test.
148 # Delete inter-table routes
149 ip_t01.remove_vpp_config()
150 ip_t10.remove_vpp_config()
# When the file is executed directly, hand control to unittest using
# VppTestRunner as the test runner.
153 if __name__ == "__main__":
154 unittest.main(testRunner=VppTestRunner)