efeda69c92553b84892133830734ad8698326dec
[csit.git] / resources / libraries / python / KubernetesUtils.py
1 # Copyright (c) 2017 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Library to control Kubernetes kubectl."""
15
16 from time import sleep
17
18 from resources.libraries.python.constants import Constants
19 from resources.libraries.python.topology import NodeType
20 from resources.libraries.python.ssh import SSH
21 from resources.libraries.python.CpuUtils import CpuUtils
22 from resources.libraries.python.VppConfigGenerator import VppConfigGenerator
23
24 __all__ = ["KubernetesUtils"]
25
26 # Maximum number of retries to check if PODs are running or deleted.
27 MAX_RETRY = 48
28
29 class KubernetesUtils(object):
30     """Kubernetes utilities class."""
31
32     def __init__(self):
33         """Initialize KubernetesUtils class."""
34         pass
35
36     @staticmethod
37     def setup_kubernetes_on_node(node):
38         """Set up Kubernetes on node.
39
40         :param node: DUT node.
41         :type node: dict
42         :raises RuntimeError: If Kubernetes setup failed on node.
43         """
44         ssh = SSH()
45         ssh.connect(node)
46
47         cmd = '{dir}/{lib}/k8s_setup.sh deploy_calico'\
48             .format(dir=Constants.REMOTE_FW_DIR,
49                     lib=Constants.RESOURCES_LIB_SH)
50         (ret_code, _, _) = ssh.exec_command(cmd, timeout=240)
51         if int(ret_code) != 0:
52             raise RuntimeError('Failed to setup Kubernetes on {node}.'
53                                .format(node=node['host']))
54
55         KubernetesUtils.wait_for_kubernetes_pods_on_node(node,
56                                                          nspace='kube-system')
57
58     @staticmethod
59     def setup_kubernetes_on_all_duts(nodes):
60         """Set up Kubernetes on all DUTs.
61
62         :param nodes: Topology nodes.
63         :type nodes: dict
64         """
65         for node in nodes.values():
66             if node['type'] == NodeType.DUT:
67                 KubernetesUtils.setup_kubernetes_on_node(node)
68
69     @staticmethod
70     def destroy_kubernetes_on_node(node):
71         """Destroy Kubernetes on node.
72
73         :param node: DUT node.
74         :type node: dict
75         :raises RuntimeError: If destroying Kubernetes failed.
76         """
77         ssh = SSH()
78         ssh.connect(node)
79
80         cmd = '{dir}/{lib}/k8s_setup.sh destroy'\
81             .format(dir=Constants.REMOTE_FW_DIR,
82                     lib=Constants.RESOURCES_LIB_SH)
83         (ret_code, _, _) = ssh.exec_command(cmd, timeout=120)
84         if int(ret_code) != 0:
85             raise RuntimeError('Failed to destroy Kubernetes on {node}.'
86                                .format(node=node['host']))
87
88     @staticmethod
89     def destroy_kubernetes_on_all_duts(nodes):
90         """Destroy Kubernetes on all DUTs.
91
92         :param nodes: Topology nodes.
93         :type nodes: dict
94         """
95         for node in nodes.values():
96             if node['type'] == NodeType.DUT:
97                 KubernetesUtils.destroy_kubernetes_on_node(node)
98
99     @staticmethod
100     def apply_kubernetes_resource_on_node(node, yaml_file, **kwargs):
101         """Apply Kubernetes resource on node.
102
103         :param node: DUT node.
104         :param yaml_file: YAML configuration file.
105         :param kwargs: Key-value pairs to replace in YAML template.
106         :type node: dict
107         :type yaml_file: str
108         :type kwargs: dict
109         :raises RuntimeError: If applying Kubernetes template failed.
110         """
111         ssh = SSH()
112         ssh.connect(node)
113
114         fqn_file = '{tpl}/{yaml}'.format(tpl=Constants.RESOURCES_TPL_K8S,
115                                          yaml=yaml_file)
116         with open(fqn_file, 'r') as src_file:
117             stream = src_file.read()
118             data = reduce(lambda a, kv: a.replace(*kv), kwargs.iteritems(),
119                           stream)
120             cmd = 'cat <<EOF | kubectl apply -f - \n{data}\nEOF'.format(
121                 data=data)
122             (ret_code, _, _) = ssh.exec_command_sudo(cmd)
123             if int(ret_code) != 0:
124                 raise RuntimeError('Failed to apply Kubernetes template {yaml} '
125                                    'on {node}.'.format(yaml=yaml_file,
126                                                        node=node['host']))
127
128     @staticmethod
129     def apply_kubernetes_resource_on_all_duts(nodes, yaml_file, **kwargs):
130         """Apply Kubernetes resource on all DUTs.
131
132         :param nodes: Topology nodes.
133         :param yaml_file: YAML configuration file.
134         :param kwargs: Key-value pairs to replace in YAML template.
135         :type nodes: dict
136         :type yaml_file: str
137         :type kwargs: dict
138         """
139         for node in nodes.values():
140             if node['type'] == NodeType.DUT:
141                 KubernetesUtils.apply_kubernetes_resource_on_node(node,
142                                                                   yaml_file,
143                                                                   **kwargs)
144
145     @staticmethod
146     def create_kubernetes_cm_from_file_on_node(node, nspace, name, **kwargs):
147         """Create Kubernetes ConfigMap from file on node.
148
149         :param node: DUT node.
150         :param nspace: Kubernetes namespace.
151         :param name: ConfigMap name.
152         :param kwargs: Named parameters.
153         :type node: dict
154         :type nspace: str
155         :type name: str
156         :param kwargs: dict
157         :raises RuntimeError: If creating Kubernetes ConfigMap failed.
158         """
159         ssh = SSH()
160         ssh.connect(node)
161
162         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
163
164         from_file = '{0}'.format(' '.join('--from-file={0}={1} '\
165             .format(key, kwargs[key]) for key in kwargs))
166
167         cmd = 'kubectl create {nspace} configmap {name} {from_file}'\
168             .format(nspace=nspace, name=name, from_file=from_file)
169         (ret_code, _, _) = ssh.exec_command_sudo(cmd)
170         if int(ret_code) != 0:
171             raise RuntimeError('Failed to create Kubernetes ConfigMap '
172                                'on {node}.'.format(node=node['host']))
173
174     @staticmethod
175     def create_kubernetes_cm_from_file_on_all_duts(nodes, nspace, name,
176                                                    **kwargs):
177         """Create Kubernetes ConfigMap from file on all DUTs.
178
179         :param nodes: Topology nodes.
180         :param nspace: Kubernetes namespace.
181         :param name: ConfigMap name.
182         :param kwargs: Named parameters.
183         :type nodes: dict
184         :type nspace: str
185         :type name: str
186         :param kwargs: dict
187         """
188         for node in nodes.values():
189             if node['type'] == NodeType.DUT:
190                 KubernetesUtils.create_kubernetes_cm_from_file_on_node(node,
191                                                                        nspace,
192                                                                        name,
193                                                                        **kwargs)
194
195     @staticmethod
196     def delete_kubernetes_resource_on_node(node, nspace, name=None,
197                                            rtype='po,cm,deploy,rs,rc,svc'):
198         """Delete Kubernetes resource on node.
199
200         :param node: DUT node.
201         :param nspace: Kubernetes namespace.
202         :param rtype: Kubernetes resource type.
203         :param name: Name of resource (Default: all).
204         :type node: dict
205         :type nspace: str
206         :type rtype: str
207         :type name: str
208         :raises RuntimeError: If retrieving or deleting Kubernetes resource
209         failed.
210         """
211         ssh = SSH()
212         ssh.connect(node)
213
214         name = '{name}'.format(name=name) if name else '--all'
215         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
216
217         cmd = 'kubectl delete {nspace} {rtype} {name}'\
218             .format(nspace=nspace, rtype=rtype, name=name)
219         (ret_code, _, _) = ssh.exec_command_sudo(cmd)
220         if int(ret_code) != 0:
221             raise RuntimeError('Failed to delete Kubernetes resources '
222                                'on {node}.'.format(node=node['host']))
223
224         cmd = 'kubectl get {nspace} pods -a --no-headers'\
225             .format(nspace=nspace)
226         for _ in range(MAX_RETRY):
227             (ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd)
228             if int(ret_code) != 0:
229                 raise RuntimeError('Failed to retrieve Kubernetes resources on '
230                                    '{node}.'.format(node=node['host']))
231             if name == '--all':
232                 ready = False
233                 for line in stderr.splitlines():
234                     if 'No resources found.' in line:
235                         ready = True
236                 if ready:
237                     break
238             else:
239                 ready = False
240                 for line in stdout.splitlines():
241                     try:
242                         state = line.split()[1].split('/')
243                         ready = True if 'Running' in line and\
244                             state == state[::-1] else False
245                         if not ready:
246                             break
247                     except (ValueError, IndexError):
248                         ready = False
249                 if ready:
250                     break
251             sleep(5)
252         else:
253             raise RuntimeError('Failed to delete Kubernetes resources on '
254                                '{node}.'.format(node=node['host']))
255
256     @staticmethod
257     def delete_kubernetes_resource_on_all_duts(nodes, nspace, name=None,
258                                                rtype='po,cm,deploy,rs,rc,svc'):
259         """Delete all Kubernetes resource on all DUTs.
260
261         :param nodes: Topology nodes.
262         :param nspace: Kubernetes namespace.
263         :param rtype: Kubernetes resource type.
264         :param name: Name of resource.
265         :type nodes: dict
266         :type nspace: str
267         :type rtype: str
268         :type name: str
269         """
270         for node in nodes.values():
271             if node['type'] == NodeType.DUT:
272                 KubernetesUtils.delete_kubernetes_resource_on_node(node, nspace,
273                                                                    name, rtype)
274
275     @staticmethod
276     def describe_kubernetes_resource_on_node(node, nspace):
277         """Describe all Kubernetes PODs in namespace on node.
278
279         :param node: DUT node.
280         :param nspace: Kubernetes namespace.
281         :type node: dict
282         :type nspace: str
283         """
284         ssh = SSH()
285         ssh.connect(node)
286
287         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
288
289         cmd = 'kubectl describe {nspace} all'.format(nspace=nspace)
290         ssh.exec_command_sudo(cmd)
291
292     @staticmethod
293     def describe_kubernetes_resource_on_all_duts(nodes, nspace):
294         """Describe all Kubernetes PODs in namespace on all DUTs.
295
296         :param nodes: Topology nodes.
297         :param nspace: Kubernetes namespace.
298         :type nodes: dict
299         :type nspace: str
300         """
301         for node in nodes.values():
302             if node['type'] == NodeType.DUT:
303                 KubernetesUtils.describe_kubernetes_resource_on_node(node,
304                                                                      nspace)
305
306     @staticmethod
307     def get_kubernetes_logs_on_node(node, nspace):
308         """Get Kubernetes logs from all PODs in namespace on node.
309
310         :param node: DUT node.
311         :param nspace: Kubernetes namespace.
312         :type node: dict
313         :type nspace: str
314         """
315         ssh = SSH()
316         ssh.connect(node)
317
318         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
319
320         cmd = "for p in $(kubectl get pods {nspace} -o jsonpath="\
321             "'{{.items[*].metadata.name}}'); do echo $p; kubectl logs "\
322             "{nspace} $p; done".format(nspace=nspace)
323         ssh.exec_command(cmd)
324
325         cmd = "kubectl exec {nspace} etcdv3 -- etcdctl --endpoints "\
326             "\"localhost:22379\" get \"/\" --prefix=true".format(nspace=nspace)
327         ssh.exec_command(cmd)
328
329     @staticmethod
330     def get_kubernetes_logs_on_all_duts(nodes, nspace):
331         """Get Kubernetes logs from all PODs in namespace on all DUTs.
332
333         :param nodes: Topology nodes.
334         :param nspace: Kubernetes namespace.
335         :type nodes: dict
336         :type nspace: str
337         """
338         for node in nodes.values():
339             if node['type'] == NodeType.DUT:
340                 KubernetesUtils.get_kubernetes_logs_on_node(node, nspace)
341
342     @staticmethod
343     def wait_for_kubernetes_pods_on_node(node, nspace):
344         """Wait for Kubernetes PODs to become ready on node.
345
346         :param node: DUT node.
347         :param nspace: Kubernetes namespace.
348         :type node: dict
349         :type nspace: str
350         :raises RuntimeError: If Kubernetes PODs are not in Running state.
351         """
352         ssh = SSH()
353         ssh.connect(node)
354
355         nspace = '-n {nspace}'.format(nspace=nspace) if nspace \
356             else '--all-namespaces'
357
358         cmd = 'kubectl get {nspace} pods -a --no-headers' \
359             .format(nspace=nspace)
360         for _ in range(MAX_RETRY):
361             (ret_code, stdout, _) = ssh.exec_command_sudo(cmd)
362             if int(ret_code) == 0:
363                 ready = False
364                 for line in stdout.splitlines():
365                     try:
366                         state = line.split()[1].split('/')
367                         ready = True if 'Running' in line and \
368                             state == state[::-1] else False
369                         if not ready:
370                             break
371                     except (ValueError, IndexError):
372                         ready = False
373                 if ready:
374                     break
375             sleep(5)
376         else:
377             raise RuntimeError('Kubernetes PODs are not running on {node}.'
378                                .format(node=node['host']))
379
380     @staticmethod
381     def wait_for_kubernetes_pods_on_all_duts(nodes, nspace):
382         """Wait for Kubernetes to become ready on all DUTs.
383
384         :param nodes: Topology nodes.
385         :param nspace: Kubernetes namespace.
386         :type nodes: dict
387         :type nspace: str
388         """
389         for node in nodes.values():
390             if node['type'] == NodeType.DUT:
391                 KubernetesUtils.wait_for_kubernetes_pods_on_node(node, nspace)
392
393     @staticmethod
394     def set_kubernetes_pods_affinity_on_node(node):
395         """Set affinity for all Kubernetes PODs except VPP on node.
396
397         :param node: DUT node.
398         :type node: dict
399         """
400         ssh = SSH()
401         ssh.connect(node)
402
403         cmd = '{dir}/{lib}/k8s_setup.sh affinity_non_vpp'\
404             .format(dir=Constants.REMOTE_FW_DIR,
405                     lib=Constants.RESOURCES_LIB_SH)
406         ssh.exec_command(cmd)
407
408     @staticmethod
409     def set_kubernetes_pods_affinity_on_all_duts(nodes):
410         """Set affinity for all Kubernetes PODs except VPP on all DUTs.
411
412         :param nodes: Topology nodes.
413         :type nodes: dict
414         """
415         for node in nodes.values():
416             if node['type'] == NodeType.DUT:
417                 KubernetesUtils.set_kubernetes_pods_affinity_on_node(node)
418
419     @staticmethod
420     def create_kubernetes_vswitch_startup_config(**kwargs):
421         """Create Kubernetes VSWITCH startup configuration.
422
423         :param kwargs: Key-value pairs used to create configuration.
424         :param kwargs: dict
425         """
426         cpuset_cpus = \
427             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
428                                                 cpu_node=kwargs['cpu_node'],
429                                                 skip_cnt=kwargs['cpu_skip'],
430                                                 cpu_cnt=kwargs['cpu_cnt'],
431                                                 smt_used=kwargs['smt_used'])
432
433         # Create config instance
434         vpp_config = VppConfigGenerator()
435         vpp_config.set_node(kwargs['node'])
436         vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
437         vpp_config.add_unix_nodaemon()
438         vpp_config.add_dpdk_socketmem('1024,1024')
439         vpp_config.add_heapsize('3G')
440         vpp_config.add_ip6_hash_buckets('2000000')
441         vpp_config.add_ip6_heap_size('3G')
442         if kwargs['framesize'] < 1522:
443             vpp_config.add_dpdk_no_multi_seg()
444         vpp_config.add_dpdk_dev_default_rxq(kwargs['rxq'])
445         vpp_config.add_dpdk_dev(kwargs['if1'], kwargs['if2'])
446         # We will pop first core from list to be main core
447         vpp_config.add_cpu_main_core(str(cpuset_cpus.pop(0)))
448         # if this is not only core in list, the rest will be used as workers.
449         if cpuset_cpus:
450             corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
451             vpp_config.add_cpu_corelist_workers(corelist_workers)
452         vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)
453
454     @staticmethod
455     def create_kubernetes_vnf_startup_config(**kwargs):
456         """Create Kubernetes VNF startup configuration.
457
458         :param kwargs: Key-value pairs used to create configuration.
459         :param kwargs: dict
460         """
461         skip_cnt = kwargs['cpu_skip'] + (kwargs['i'] - 1) * \
462             (kwargs['cpu_cnt'] - 1)
463         cpuset_cpus = \
464             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
465                                                 cpu_node=kwargs['cpu_node'],
466                                                 skip_cnt=skip_cnt,
467                                                 cpu_cnt=kwargs['cpu_cnt']-1,
468                                                 smt_used=kwargs['smt_used'])
469         cpuset_main = \
470             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
471                                                 cpu_node=kwargs['cpu_node'],
472                                                 skip_cnt=1,
473                                                 cpu_cnt=1,
474                                                 smt_used=kwargs['smt_used'])
475         # Create config instance
476         vpp_config = VppConfigGenerator()
477         vpp_config.set_node(kwargs['node'])
478         vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
479         vpp_config.add_unix_nodaemon()
480         # We will pop first core from list to be main core
481         vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
482         # if this is not only core in list, the rest will be used as workers.
483         if cpuset_cpus:
484             corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
485             vpp_config.add_cpu_corelist_workers(corelist_workers)
486         vpp_config.add_plugin('disable', 'dpdk_plugin.so')
487         vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)