perf: add TCP Nginx+LDPRELOAD suites
[csit.git] / resources / libraries / python / KubernetesUtils.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Library to control Kubernetes kubectl."""
15
16 from functools import reduce
17 from io import open
18 from time import sleep
19
20 from resources.libraries.python.Constants import Constants
21 from resources.libraries.python.CpuUtils import CpuUtils
22 from resources.libraries.python.ssh import SSH, exec_cmd_no_error
23 from resources.libraries.python.topology import NodeType
24 from resources.libraries.python.VppConfigGenerator import VppConfigGenerator
25
26 __all__ = [u"KubernetesUtils"]
27
28 # Maximum number of retries to check if PODs are running or deleted.
29 MAX_RETRY = 48
30
31
32 class KubernetesUtils:
33     """Kubernetes utilities class."""
34
35     def __init__(self):
36         """Initialize KubernetesUtils class."""
37
38     @staticmethod
39     def load_docker_image_on_node(node, image_path):
40         """Load Docker container image from file on node.
41
42         :param node: DUT node.
43         :param image_path: Container image path.
44         :type node: dict
45         :type image_path: str
46         :raises RuntimeError: If loading image failed on node.
47         """
48         command = f"docker load -i {image_path}"
49         message = f"Failed to load Docker image on {node[u'host']}."
50         exec_cmd_no_error(
51             node, command, timeout=240, sudo=True, message=message
52         )
53
54         command = u"docker rmi $(sudo docker images -f 'dangling=true' -q)"
55         message = f"Failed to clean Docker images on {node[u'host']}."
56         try:
57             exec_cmd_no_error(
58                 node, command, timeout=240, sudo=True, message=message
59             )
60         except RuntimeError:
61             pass
62
63     @staticmethod
64     def load_docker_image_on_all_duts(nodes, image_path):
65         """Load Docker container image from file on all DUTs.
66
67         :param nodes: Topology nodes.
68         :param image_path: Container image path.
69         :type nodes: dict
70         :type image_path: str
71         """
72         for node in nodes.values():
73             if node[u"type"] == NodeType.DUT:
74                 KubernetesUtils.load_docker_image_on_node(node, image_path)
75
76     @staticmethod
77     def setup_kubernetes_on_node(node):
78         """Set up Kubernetes on node.
79
80         :param node: DUT node.
81         :type node: dict
82         :raises RuntimeError: If Kubernetes setup failed on node.
83         """
84         ssh = SSH()
85         ssh.connect(node)
86
87         cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_LIB_SH}/" \
88             f"k8s_setup.sh deploy_calico"
89         ret_code, _, _ = ssh.exec_command(cmd, timeout=240)
90         if int(ret_code) != 0:
91             raise RuntimeError(
92                 "Failed to setup Kubernetes on {node[u'host']}."
93             )
94
95         KubernetesUtils.wait_for_kubernetes_pods_on_node(
96             node, nspace=u"kube-system"
97         )
98
99     @staticmethod
100     def setup_kubernetes_on_all_duts(nodes):
101         """Set up Kubernetes on all DUTs.
102
103         :param nodes: Topology nodes.
104         :type nodes: dict
105         """
106         for node in nodes.values():
107             if node[u"type"] == NodeType.DUT:
108                 KubernetesUtils.setup_kubernetes_on_node(node)
109
110     @staticmethod
111     def destroy_kubernetes_on_node(node):
112         """Destroy Kubernetes on node.
113
114         :param node: DUT node.
115         :type node: dict
116         :raises RuntimeError: If destroying Kubernetes failed.
117         """
118         ssh = SSH()
119         ssh.connect(node)
120
121         cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_LIB_SH}/" \
122             f"k8s_setup.sh destroy"
123
124         ret_code, _, _ = ssh.exec_command(cmd, timeout=120)
125         if int(ret_code) != 0:
126             raise RuntimeError(
127                 f"Failed to destroy Kubernetes on {node[u'host']}."
128             )
129
130     @staticmethod
131     def destroy_kubernetes_on_all_duts(nodes):
132         """Destroy Kubernetes on all DUTs.
133
134         :param nodes: Topology nodes.
135         :type nodes: dict
136         """
137         for node in nodes.values():
138             if node[u"type"] == NodeType.DUT:
139                 KubernetesUtils.destroy_kubernetes_on_node(node)
140
141     @staticmethod
142     def apply_kubernetes_resource_on_node(node, yaml_file, **kwargs):
143         """Apply Kubernetes resource on node.
144
145         :param node: DUT node.
146         :param yaml_file: YAML configuration file.
147         :param kwargs: Key-value pairs to replace in YAML template.
148         :type node: dict
149         :type yaml_file: str
150         :type kwargs: dict
151         :raises RuntimeError: If applying Kubernetes template failed.
152         """
153         ssh = SSH()
154         ssh.connect(node)
155
156         fqn_file = f"{Constants.RESOURCES_TPL_K8S}/{yaml_file}"
157         with open(fqn_file, 'r') as src_file:
158             stream = src_file.read()
159             data = reduce(
160                 lambda a, kv: a.replace(*kv), list(kwargs.items()), stream
161             )
162             cmd = f"cat <<EOF | kubectl apply -f - \n{data}\nEOF"
163
164             ret_code, _, _ = ssh.exec_command_sudo(cmd)
165             if int(ret_code) != 0:
166                 raise RuntimeError(
167                     f"Failed to apply Kubernetes template {yaml_file} "
168                     f"on {node[u'host']}."
169                 )
170
171     @staticmethod
172     def apply_kubernetes_resource_on_all_duts(nodes, yaml_file, **kwargs):
173         """Apply Kubernetes resource on all DUTs.
174
175         :param nodes: Topology nodes.
176         :param yaml_file: YAML configuration file.
177         :param kwargs: Key-value pairs to replace in YAML template.
178         :type nodes: dict
179         :type yaml_file: str
180         :type kwargs: dict
181         """
182         for node in nodes.values():
183             if node[u"type"] == NodeType.DUT:
184                 KubernetesUtils.apply_kubernetes_resource_on_node(
185                     node, yaml_file, **kwargs
186                 )
187
188     @staticmethod
189     def create_kubernetes_cm_from_file_on_node(node, nspace, name, **kwargs):
190         """Create Kubernetes ConfigMap from file on node.
191
192         :param node: DUT node.
193         :param nspace: Kubernetes namespace.
194         :param name: ConfigMap name.
195         :param kwargs: Named parameters.
196         :type node: dict
197         :type nspace: str
198         :type name: str
199         :param kwargs: dict
200         :raises RuntimeError: If creating Kubernetes ConfigMap failed.
201         """
202         ssh = SSH()
203         ssh.connect(node)
204
205         nspace = f"-n {nspace}" if nspace else u""
206         from_file = u" ".join(
207             f"--from-file={key}={kwargs[key]} " for key in kwargs
208         )
209         cmd = f"kubectl create {nspace} configmap {name} {from_file}"
210
211         ret_code, _, _ = ssh.exec_command_sudo(cmd)
212         if int(ret_code) != 0:
213             raise RuntimeError(
214                 f"Failed to create Kubernetes ConfigMap on {node[u'host']}."
215             )
216
217     @staticmethod
218     def create_kubernetes_cm_from_file_on_all_duts(
219             nodes, nspace, name, **kwargs):
220         """Create Kubernetes ConfigMap from file on all DUTs.
221
222         :param nodes: Topology nodes.
223         :param nspace: Kubernetes namespace.
224         :param name: ConfigMap name.
225         :param kwargs: Named parameters.
226         :type nodes: dict
227         :type nspace: str
228         :type name: str
229         :param kwargs: dict
230         """
231         for node in nodes.values():
232             if node[u"type"] == NodeType.DUT:
233                 KubernetesUtils.create_kubernetes_cm_from_file_on_node(
234                     node, nspace, name, **kwargs
235                 )
236
237     @staticmethod
238     def delete_kubernetes_resource_on_node(
239             node, nspace, name=None, rtype=u"po,cm,deploy,rs,rc,svc"):
240         """Delete Kubernetes resource on node.
241
242         :param node: DUT node.
243         :param nspace: Kubernetes namespace.
244         :param rtype: Kubernetes resource type.
245         :param name: Name of resource (Default: all).
246         :type node: dict
247         :type nspace: str
248         :type rtype: str
249         :type name: str
250         :raises RuntimeError: If retrieving or deleting Kubernetes resource
251             failed.
252         """
253         ssh = SSH()
254         ssh.connect(node)
255
256         name = f"{name}" if name else u"--all"
257         nspace = f"-n {nspace}" if nspace else u""
258         cmd = f"kubectl delete {nspace} {rtype} {name}"
259
260         ret_code, _, _ = ssh.exec_command_sudo(cmd, timeout=120)
261         if int(ret_code) != 0:
262             raise RuntimeError(
263                 f"Failed to delete Kubernetes resources on {node[u'host']}."
264             )
265
266         cmd = f"kubectl get {nspace} pods --no-headers"
267         for _ in range(MAX_RETRY):
268             ret_code, stdout, stderr = ssh.exec_command_sudo(cmd)
269             if int(ret_code) != 0:
270                 raise RuntimeError(
271                     f"Failed to retrieve Kubernetes resources "
272                     f"on {node[u'host']}."
273                 )
274             if name == u"--all":
275                 ready = False
276                 for line in stderr.splitlines():
277                     if u"No resources found." in line:
278                         ready = True
279                 if ready:
280                     break
281             else:
282                 ready = False
283                 for line in stdout.splitlines():
284                     try:
285                         state = line.split()[1].split(u"/")
286                         ready = bool(
287                             u"Running" in line and state == state[::-1]
288                         )
289                         if not ready:
290                             break
291                     except (ValueError, IndexError):
292                         ready = False
293                 if ready:
294                     break
295             sleep(5)
296         else:
297             raise RuntimeError(
298                 f"Failed to delete Kubernetes resources on {node[u'host']}."
299             )
300
301     @staticmethod
302     def delete_kubernetes_resource_on_all_duts(
303             nodes, nspace, name=None, rtype=u"po,cm,deploy,rs,rc,svc"):
304         """Delete all Kubernetes resource on all DUTs.
305
306         :param nodes: Topology nodes.
307         :param nspace: Kubernetes namespace.
308         :param rtype: Kubernetes resource type.
309         :param name: Name of resource.
310         :type nodes: dict
311         :type nspace: str
312         :type rtype: str
313         :type name: str
314         """
315         for node in nodes.values():
316             if node[u"type"] == NodeType.DUT:
317                 KubernetesUtils.delete_kubernetes_resource_on_node(
318                     node, nspace, name, rtype
319                 )
320
321     @staticmethod
322     def describe_kubernetes_resource_on_node(node, nspace):
323         """Describe all Kubernetes PODs in namespace on node.
324
325         :param node: DUT node.
326         :param nspace: Kubernetes namespace.
327         :type node: dict
328         :type nspace: str
329         """
330         ssh = SSH()
331         ssh.connect(node)
332
333         nspace = f"-n {nspace}" if nspace else u""
334         cmd = f"kubectl describe {nspace} all"
335
336         ssh.exec_command_sudo(cmd)
337
338     @staticmethod
339     def describe_kubernetes_resource_on_all_duts(nodes, nspace):
340         """Describe all Kubernetes PODs in namespace on all DUTs.
341
342         :param nodes: Topology nodes.
343         :param nspace: Kubernetes namespace.
344         :type nodes: dict
345         :type nspace: str
346         """
347         for node in nodes.values():
348             if node[u"type"] == NodeType.DUT:
349                 KubernetesUtils.describe_kubernetes_resource_on_node(
350                     node, nspace
351                 )
352
353     @staticmethod
354     def get_kubernetes_logs_on_node(node, nspace):
355         """Get Kubernetes logs from all PODs in namespace on node.
356
357         :param node: DUT node.
358         :param nspace: Kubernetes namespace.
359         :type node: dict
360         :type nspace: str
361         """
362         ssh = SSH()
363         ssh.connect(node)
364
365         nspace = f"-n {nspace}" if nspace else u""
366         cmd = f"for p in $(kubectl get pods {nspace} " \
367             f"-o jsonpath='{{.items[*].metadata.name}}'); do echo $p; " \
368             f"kubectl logs {nspace} $p; done"
369
370         ssh.exec_command(cmd)
371
372         cmd = f"kubectl exec {nspace} etcdv3 -- etcdctl " \
373             f"--endpoints \"localhost:22379\" get \"/\" --prefix=true"
374
375         ssh.exec_command(cmd)
376
377     @staticmethod
378     def get_kubernetes_logs_on_all_duts(nodes, nspace):
379         """Get Kubernetes logs from all PODs in namespace on all DUTs.
380
381         :param nodes: Topology nodes.
382         :param nspace: Kubernetes namespace.
383         :type nodes: dict
384         :type nspace: str
385         """
386         for node in nodes.values():
387             if node[u"type"] == NodeType.DUT:
388                 KubernetesUtils.get_kubernetes_logs_on_node(node, nspace)
389
390     @staticmethod
391     def wait_for_kubernetes_pods_on_node(node, nspace):
392         """Wait for Kubernetes PODs to become ready on node.
393
394         :param node: DUT node.
395         :param nspace: Kubernetes namespace.
396         :type node: dict
397         :type nspace: str
398         :raises RuntimeError: If Kubernetes PODs are not in Running state.
399         """
400         ssh = SSH()
401         ssh.connect(node)
402
403         nspace = f"-n {nspace}" if nspace else u"--all-namespaces"
404         cmd = f"kubectl get {nspace} pods --no-headers"
405
406         for _ in range(MAX_RETRY):
407             ret_code, stdout, _ = ssh.exec_command_sudo(cmd)
408             if int(ret_code) == 0:
409                 ready = False
410                 for line in stdout.splitlines():
411                     try:
412                         state = line.split()[1].split(u"/")
413                         ready = bool(
414                             u"Running" in line and state == state[::-1]
415                         )
416                         if not ready:
417                             break
418                     except (ValueError, IndexError):
419                         ready = False
420                 if ready:
421                     break
422             sleep(5)
423         else:
424             raise RuntimeError(
425                 f"Kubernetes PODs are not running on {node[u'host']}."
426             )
427
428     @staticmethod
429     def wait_for_kubernetes_pods_on_all_duts(nodes, nspace):
430         """Wait for Kubernetes to become ready on all DUTs.
431
432         :param nodes: Topology nodes.
433         :param nspace: Kubernetes namespace.
434         :type nodes: dict
435         :type nspace: str
436         """
437         for node in nodes.values():
438             if node[u"type"] == NodeType.DUT:
439                 KubernetesUtils.wait_for_kubernetes_pods_on_node(node, nspace)
440
441     @staticmethod
442     def set_kubernetes_pods_affinity_on_node(node):
443         """Set affinity for all Kubernetes PODs except VPP on node.
444
445         :param node: DUT node.
446         :type node: dict
447         """
448         ssh = SSH()
449         ssh.connect(node)
450
451         cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_LIB_SH}/" \
452             f"k8s_setup.sh affinity_non_vpp"
453
454         ssh.exec_command(cmd)
455
456     @staticmethod
457     def set_kubernetes_pods_affinity_on_all_duts(nodes):
458         """Set affinity for all Kubernetes PODs except VPP on all DUTs.
459
460         :param nodes: Topology nodes.
461         :type nodes: dict
462         """
463         for node in nodes.values():
464             if node[u"type"] == NodeType.DUT:
465                 KubernetesUtils.set_kubernetes_pods_affinity_on_node(node)
466
467     @staticmethod
468     def create_kubernetes_vswitch_startup_config(**kwargs):
469         """Create Kubernetes VSWITCH startup configuration.
470
471         :param kwargs: Key-value pairs used to create configuration.
472         :param kwargs: dict
473         """
474         smt_used = CpuUtils.is_smt_enabled(kwargs[u"node"][u"cpuinfo"])
475
476         cpuset_cpus = CpuUtils.cpu_slice_of_list_per_node(
477             node=kwargs[u"node"], cpu_node=kwargs[u"cpu_node"], skip_cnt=2,
478             cpu_cnt=kwargs[u"phy_cores"], smt_used=smt_used
479         )
480         cpuset_main = CpuUtils.cpu_slice_of_list_per_node(
481             node=kwargs[u"node"], cpu_node=kwargs[u"cpu_node"], skip_cnt=1,
482             cpu_cnt=1, smt_used=smt_used
483         )
484
485         # Create config instance
486         vpp_config = VppConfigGenerator()
487         vpp_config.set_node(kwargs[u"node"])
488         vpp_config.add_unix_cli_listen(value=u"0.0.0.0:5002")
489         vpp_config.add_unix_nodaemon()
490         vpp_config.add_socksvr()
491         vpp_config.add_heapsize(u"4G")
492         vpp_config.add_ip_heap_size(u"4G")
493         vpp_config.add_ip6_heap_size(u"4G")
494         vpp_config.add_ip6_hash_buckets(u"2000000")
495         if not kwargs[u"jumbo"]:
496             vpp_config.add_dpdk_no_multi_seg()
497         vpp_config.add_dpdk_no_tx_checksum_offload()
498         vpp_config.add_dpdk_dev_default_rxq(kwargs[u"rxq_count_int"])
499         vpp_config.add_dpdk_dev(kwargs[u"if1"], kwargs[u"if2"])
500         vpp_config.add_buffers_per_numa(kwargs[u"buffers_per_numa"])
501         # We will pop first core from list to be main core
502         vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
503         # if this is not only core in list, the rest will be used as workers.
504         if cpuset_cpus:
505             corelist_workers = u",".join(str(cpu) for cpu in cpuset_cpus)
506             vpp_config.add_cpu_corelist_workers(corelist_workers)
507         vpp_config.write_config(filename=kwargs[u"filename"])
508
509     @staticmethod
510     def create_kubernetes_vnf_startup_config(**kwargs):
511         """Create Kubernetes VNF startup configuration.
512
513         :param kwargs: Key-value pairs used to create configuration.
514         :param kwargs: dict
515         """
516         smt_used = CpuUtils.is_smt_enabled(kwargs[u"node"][u"cpuinfo"])
517         skip_cnt = kwargs[u"cpu_skip"] + (kwargs[u"i"] - 1) * \
518             (kwargs[u"phy_cores"] - 1)
519         cpuset_cpus = CpuUtils.cpu_slice_of_list_per_node(
520             node=kwargs[u"node"], cpu_node=kwargs[u"cpu_node"],
521             skip_cnt=skip_cnt, cpu_cnt=kwargs[u"phy_cores"]-1, smt_used=smt_used
522         )
523         cpuset_main = CpuUtils.cpu_slice_of_list_per_node(
524             node=kwargs[u"node"], cpu_node=kwargs[u"cpu_node"], skip_cnt=1,
525             cpu_cnt=1, smt_used=smt_used
526         )
527         # Create config instance
528         vpp_config = VppConfigGenerator()
529         vpp_config.set_node(kwargs[u"node"])
530         vpp_config.add_unix_cli_listen(value=u"0.0.0.0:5002")
531         vpp_config.add_unix_nodaemon()
532         vpp_config.add_socksvr()
533         # We will pop first core from list to be main core
534         vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
535         # if this is not only core in list, the rest will be used as workers.
536         if cpuset_cpus:
537             corelist_workers = u",".join(str(cpu) for cpu in cpuset_cpus)
538             vpp_config.add_cpu_corelist_workers(corelist_workers)
539         vpp_config.add_plugin(u"disable", [u"dpdk_plugin.so"])
540         vpp_config.write_config(filename=kwargs[u"filename"])