89c5bd2a57221de12ae7ccd7aa349046ba0a5ca5
[csit.git] / resources / libraries / python / KubernetesUtils.py
1 # Copyright (c) 2017 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Library to control Kubernetes kubectl."""
15
16 from time import sleep
17
18 from resources.libraries.python.constants import Constants
19 from resources.libraries.python.topology import NodeType
20 from resources.libraries.python.ssh import SSH
21 from resources.libraries.python.CpuUtils import CpuUtils
22 from resources.libraries.python.VppConfigGenerator import VppConfigGenerator
23
24 __all__ = ["KubernetesUtils"]
25
26 # Maximum number of retries to check if PODs are running or deleted.
27 MAX_RETRY = 48
28
29 class KubernetesUtils(object):
30     """Kubernetes utilities class."""
31
32     def __init__(self):
33         """Initialize KubernetesUtils class."""
34         pass
35
36     @staticmethod
37     def setup_kubernetes_on_node(node):
38         """Set up Kubernetes on node.
39
40         :param node: DUT node.
41         :type node: dict
42         :raises RuntimeError: If Kubernetes setup failed on node.
43         """
44         ssh = SSH()
45         ssh.connect(node)
46
47         cmd = '{dir}/{lib}/k8s_setup.sh deploy_calico'\
48             .format(dir=Constants.REMOTE_FW_DIR,
49                     lib=Constants.RESOURCES_LIB_SH)
50         (ret_code, _, _) = ssh.exec_command(cmd, timeout=240)
51         if int(ret_code) != 0:
52             raise RuntimeError('Failed to setup Kubernetes on {node}.'
53                                .format(node=node['host']))
54
55         KubernetesUtils.wait_for_kubernetes_pods_on_node(node,
56                                                          nspace='kube-system')
57
58     @staticmethod
59     def setup_kubernetes_on_all_duts(nodes):
60         """Set up Kubernetes on all DUTs.
61
62         :param nodes: Topology nodes.
63         :type nodes: dict
64         """
65         for node in nodes.values():
66             if node['type'] == NodeType.DUT:
67                 KubernetesUtils.setup_kubernetes_on_node(node)
68
69     @staticmethod
70     def destroy_kubernetes_on_node(node):
71         """Destroy Kubernetes on node.
72
73         :param node: DUT node.
74         :type node: dict
75         :raises RuntimeError: If destroying Kubernetes failed.
76         """
77         ssh = SSH()
78         ssh.connect(node)
79
80         cmd = '{dir}/{lib}/k8s_setup.sh destroy'\
81             .format(dir=Constants.REMOTE_FW_DIR,
82                     lib=Constants.RESOURCES_LIB_SH)
83         (ret_code, _, _) = ssh.exec_command(cmd, timeout=120)
84         if int(ret_code) != 0:
85             raise RuntimeError('Failed to destroy Kubernetes on {node}.'
86                                .format(node=node['host']))
87
88     @staticmethod
89     def destroy_kubernetes_on_all_duts(nodes):
90         """Destroy Kubernetes on all DUTs.
91
92         :param nodes: Topology nodes.
93         :type nodes: dict
94         """
95         for node in nodes.values():
96             if node['type'] == NodeType.DUT:
97                 KubernetesUtils.destroy_kubernetes_on_node(node)
98
99     @staticmethod
100     def apply_kubernetes_resource_on_node(node, yaml_file, **kwargs):
101         """Apply Kubernetes resource on node.
102
103         :param node: DUT node.
104         :param yaml_file: YAML configuration file.
105         :param kwargs: Key-value pairs to replace in YAML template.
106         :type node: dict
107         :type yaml_file: str
108         :type kwargs: dict
109         :raises RuntimeError: If applying Kubernetes template failed.
110         """
111         ssh = SSH()
112         ssh.connect(node)
113
114         fqn_file = '{tpl}/{yaml}'.format(tpl=Constants.RESOURCES_TPL_K8S,
115                                          yaml=yaml_file)
116         with open(fqn_file, 'r') as src_file:
117             stream = src_file.read()
118             data = reduce(lambda a, kv: a.replace(*kv), kwargs.iteritems(),
119                           stream)
120             cmd = 'cat <<EOF | kubectl apply -f - \n{data}\nEOF'.format(
121                 data=data)
122             (ret_code, _, _) = ssh.exec_command_sudo(cmd)
123             if int(ret_code) != 0:
124                 raise RuntimeError('Failed to apply Kubernetes template {yaml} '
125                                    'on {node}.'.format(yaml=yaml_file,
126                                                        node=node['host']))
127
128     @staticmethod
129     def apply_kubernetes_resource_on_all_duts(nodes, yaml_file, **kwargs):
130         """Apply Kubernetes resource on all DUTs.
131
132         :param nodes: Topology nodes.
133         :param yaml_file: YAML configuration file.
134         :param kwargs: Key-value pairs to replace in YAML template.
135         :type nodes: dict
136         :type yaml_file: str
137         :type kwargs: dict
138         """
139         for node in nodes.values():
140             if node['type'] == NodeType.DUT:
141                 KubernetesUtils.apply_kubernetes_resource_on_node(node,
142                                                                   yaml_file,
143                                                                   **kwargs)
144
145     @staticmethod
146     def create_kubernetes_cm_from_file_on_node(node, nspace, name, **kwargs):
147         """Create Kubernetes ConfigMap from file on node.
148
149         :param node: DUT node.
150         :param nspace: Kubernetes namespace.
151         :param name: ConfigMap name.
152         :param kwargs: Named parameters.
153         :type node: dict
154         :type nspace: str
155         :type name: str
156         :param kwargs: dict
157         :raises RuntimeError: If creating Kubernetes ConfigMap failed.
158         """
159         ssh = SSH()
160         ssh.connect(node)
161
162         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
163
164         from_file = '{0}'.format(' '.join('--from-file={0}={1} '\
165             .format(key, kwargs[key]) for key in kwargs))
166
167         cmd = 'kubectl create {nspace} configmap {name} {from_file}'\
168             .format(nspace=nspace, name=name, from_file=from_file)
169         (ret_code, _, _) = ssh.exec_command_sudo(cmd)
170         if int(ret_code) != 0:
171             raise RuntimeError('Failed to create Kubernetes ConfigMap '
172                                'on {node}.'.format(node=node['host']))
173
174     @staticmethod
175     def create_kubernetes_cm_from_file_on_all_duts(nodes, nspace, name,
176                                                    **kwargs):
177         """Create Kubernetes ConfigMap from file on all DUTs.
178
179         :param nodes: Topology nodes.
180         :param nspace: Kubernetes namespace.
181         :param name: ConfigMap name.
182         :param kwargs: Named parameters.
183         :type nodes: dict
184         :type nspace: str
185         :type name: str
186         :param kwargs: dict
187         """
188         for node in nodes.values():
189             if node['type'] == NodeType.DUT:
190                 KubernetesUtils.create_kubernetes_cm_from_file_on_node(node,
191                                                                        nspace,
192                                                                        name,
193                                                                        **kwargs)
194
195     @staticmethod
196     def delete_kubernetes_resource_on_node(node, nspace, name=None,
197                                            rtype='po,cm,deploy,rs,rc,svc'):
198         """Delete Kubernetes resource on node.
199
200         :param node: DUT node.
201         :param nspace: Kubernetes namespace.
202         :param rtype: Kubernetes resource type.
203         :param name: Name of resource (Default: all).
204         :type node: dict
205         :type nspace: str
206         :type rtype: str
207         :type name: str
208         :raises RuntimeError: If retrieving or deleting Kubernetes resource
209         failed.
210         """
211         ssh = SSH()
212         ssh.connect(node)
213
214         name = '{name}'.format(name=name) if name else '--all'
215         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
216
217         cmd = 'kubectl delete {nspace} {rtype} {name}'\
218             .format(nspace=nspace, rtype=rtype, name=name)
219         (ret_code, _, _) = ssh.exec_command_sudo(cmd)
220         if int(ret_code) != 0:
221             raise RuntimeError('Failed to delete Kubernetes resources '
222                                'on {node}.'.format(node=node['host']))
223
224         cmd = 'kubectl get {nspace} pods -a --no-headers'\
225             .format(nspace=nspace)
226         for _ in range(MAX_RETRY):
227             (ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd)
228             if int(ret_code) != 0:
229                 raise RuntimeError('Failed to retrieve Kubernetes resources on '
230                                    '{node}.'.format(node=node['host']))
231             if name == '--all':
232                 ready = False
233                 for line in stderr.splitlines():
234                     if 'No resources found.' in line:
235                         ready = True
236                 if ready:
237                     break
238             else:
239                 ready = False
240                 for line in stdout.splitlines():
241                     try:
242                         state = line.split()[1].split('/')
243                         ready = True if 'Running' in line and\
244                             state == state[::-1] else False
245                         if not ready:
246                             break
247                     except (ValueError, IndexError):
248                         ready = False
249                 if ready:
250                     break
251             sleep(5)
252         else:
253             raise RuntimeError('Failed to delete Kubernetes resources on '
254                                '{node}.'.format(node=node['host']))
255
256     @staticmethod
257     def delete_kubernetes_resource_on_all_duts(nodes, nspace, name=None,
258                                                rtype='po,cm,deploy,rs,rc,svc'):
259         """Delete all Kubernetes resource on all DUTs.
260
261         :param nodes: Topology nodes.
262         :param nspace: Kubernetes namespace.
263         :param rtype: Kubernetes resource type.
264         :param name: Name of resource.
265         :type nodes: dict
266         :type nspace: str
267         :type rtype: str
268         :type name: str
269         """
270         for node in nodes.values():
271             if node['type'] == NodeType.DUT:
272                 KubernetesUtils.delete_kubernetes_resource_on_node(node, nspace,
273                                                                    name, rtype)
274
275     @staticmethod
276     def describe_kubernetes_resource_on_node(node, nspace):
277         """Describe all Kubernetes PODs in namespace on node.
278
279         :param node: DUT node.
280         :param nspace: Kubernetes namespace.
281         :type node: dict
282         :type nspace: str
283         """
284         ssh = SSH()
285         ssh.connect(node)
286
287         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
288
289         cmd = 'kubectl describe {nspace} all'.format(nspace=nspace)
290         ssh.exec_command_sudo(cmd)
291
292     @staticmethod
293     def describe_kubernetes_resource_on_all_duts(nodes, nspace):
294         """Describe all Kubernetes PODs in namespace on all DUTs.
295
296         :param nodes: Topology nodes.
297         :param nspace: Kubernetes namespace.
298         :type nodes: dict
299         :type nspace: str
300         """
301         for node in nodes.values():
302             if node['type'] == NodeType.DUT:
303                 KubernetesUtils.describe_kubernetes_resource_on_node(node,
304                                                                      nspace)
305
306     @staticmethod
307     def get_kubernetes_logs_on_node(node, nspace):
308         """Get Kubernetes logs from all PODs in namespace on node.
309
310         :param node: DUT node.
311         :param nspace: Kubernetes namespace.
312         :type node: dict
313         :type nspace: str
314         """
315         ssh = SSH()
316         ssh.connect(node)
317
318         nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
319
320         cmd = "for p in $(kubectl get pods {nspace} -o jsonpath="\
321             "'{{.items[*].metadata.name}}'); do echo $p; kubectl logs "\
322             "{nspace} $p; done".format(nspace=nspace)
323         ssh.exec_command(cmd)
324
325     @staticmethod
326     def get_kubernetes_logs_on_all_duts(nodes, nspace):
327         """Get Kubernetes logs from all PODs in namespace on all DUTs.
328
329         :param nodes: Topology nodes.
330         :param nspace: Kubernetes namespace.
331         :type nodes: dict
332         :type nspace: str
333         """
334         for node in nodes.values():
335             if node['type'] == NodeType.DUT:
336                 KubernetesUtils.get_kubernetes_logs_on_node(node, nspace)
337
338     @staticmethod
339     def wait_for_kubernetes_pods_on_node(node, nspace):
340         """Wait for Kubernetes PODs to become ready on node.
341
342         :param node: DUT node.
343         :param nspace: Kubernetes namespace.
344         :type node: dict
345         :type nspace: str
346         :raises RuntimeError: If Kubernetes PODs are not in Running state.
347         """
348         ssh = SSH()
349         ssh.connect(node)
350
351         nspace = '-n {nspace}'.format(nspace=nspace) if nspace \
352             else '--all-namespaces'
353
354         cmd = 'kubectl get {nspace} pods -a --no-headers' \
355             .format(nspace=nspace)
356         for _ in range(MAX_RETRY):
357             (ret_code, stdout, _) = ssh.exec_command_sudo(cmd)
358             if int(ret_code) == 0:
359                 ready = False
360                 for line in stdout.splitlines():
361                     try:
362                         state = line.split()[1].split('/')
363                         ready = True if 'Running' in line and \
364                             state == state[::-1] else False
365                         if not ready:
366                             break
367                     except (ValueError, IndexError):
368                         ready = False
369                 if ready:
370                     break
371             sleep(5)
372         else:
373             raise RuntimeError('Kubernetes PODs are not running on {node}.'
374                                .format(node=node['host']))
375
376     @staticmethod
377     def wait_for_kubernetes_pods_on_all_duts(nodes, nspace):
378         """Wait for Kubernetes to become ready on all DUTs.
379
380         :param nodes: Topology nodes.
381         :param nspace: Kubernetes namespace.
382         :type nodes: dict
383         :type nspace: str
384         """
385         for node in nodes.values():
386             if node['type'] == NodeType.DUT:
387                 KubernetesUtils.wait_for_kubernetes_pods_on_node(node, nspace)
388
389     @staticmethod
390     def set_kubernetes_pods_affinity_on_node(node):
391         """Set affinity for all Kubernetes PODs except VPP on node.
392
393         :param node: DUT node.
394         :type node: dict
395         """
396         ssh = SSH()
397         ssh.connect(node)
398
399         cmd = '{dir}/{lib}/k8s_setup.sh affinity_non_vpp'\
400             .format(dir=Constants.REMOTE_FW_DIR,
401                     lib=Constants.RESOURCES_LIB_SH)
402         ssh.exec_command(cmd)
403
404     @staticmethod
405     def set_kubernetes_pods_affinity_on_all_duts(nodes):
406         """Set affinity for all Kubernetes PODs except VPP on all DUTs.
407
408         :param nodes: Topology nodes.
409         :type nodes: dict
410         """
411         for node in nodes.values():
412             if node['type'] == NodeType.DUT:
413                 KubernetesUtils.set_kubernetes_pods_affinity_on_node(node)
414
415     @staticmethod
416     def create_kubernetes_vswitch_startup_config(**kwargs):
417         """Create Kubernetes VSWITCH startup configuration.
418
419         :param kwargs: Key-value pairs used to create configuration.
420         :param kwargs: dict
421         """
422         cpuset_cpus = \
423             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
424                                                 cpu_node=kwargs['cpu_node'],
425                                                 skip_cnt=kwargs['cpu_skip'],
426                                                 cpu_cnt=kwargs['cpu_cnt'],
427                                                 smt_used=kwargs['smt_used'])
428
429         # Create config instance
430         vpp_config = VppConfigGenerator()
431         vpp_config.set_node(kwargs['node'])
432         vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
433         vpp_config.add_unix_nodaemon()
434         vpp_config.add_dpdk_socketmem('1024,1024')
435         vpp_config.add_heapsize('3G')
436         vpp_config.add_ip6_hash_buckets('2000000')
437         vpp_config.add_ip6_heap_size('3G')
438         if kwargs['framesize'] < 1522:
439             vpp_config.add_dpdk_no_multi_seg()
440         vpp_config.add_dpdk_dev_default_rxq(kwargs['rxq'])
441         vpp_config.add_dpdk_dev(kwargs['if1'], kwargs['if2'])
442         # We will pop first core from list to be main core
443         vpp_config.add_cpu_main_core(str(cpuset_cpus.pop(0)))
444         # if this is not only core in list, the rest will be used as workers.
445         if cpuset_cpus:
446             corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
447             vpp_config.add_cpu_corelist_workers(corelist_workers)
448         vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)
449
450     @staticmethod
451     def create_kubernetes_vnf_startup_config(**kwargs):
452         """Create Kubernetes VNF startup configuration.
453
454         :param kwargs: Key-value pairs used to create configuration.
455         :param kwargs: dict
456         """
457         skip_cnt = kwargs['cpu_skip'] + (kwargs['i'] - 1) * \
458             (kwargs['cpu_cnt'] - 1)
459         cpuset_cpus = \
460             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
461                                                 cpu_node=kwargs['cpu_node'],
462                                                 skip_cnt=skip_cnt,
463                                                 cpu_cnt=kwargs['cpu_cnt']-1,
464                                                 smt_used=kwargs['smt_used'])
465         cpuset_main = \
466             CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
467                                                 cpu_node=kwargs['cpu_node'],
468                                                 skip_cnt=1,
469                                                 cpu_cnt=1,
470                                                 smt_used=kwargs['smt_used'])
471         # Create config instance
472         vpp_config = VppConfigGenerator()
473         vpp_config.set_node(kwargs['node'])
474         vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
475         vpp_config.add_unix_nodaemon()
476         # We will pop first core from list to be main core
477         vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
478         # if this is not only core in list, the rest will be used as workers.
479         if cpuset_cpus:
480             corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
481             vpp_config.add_cpu_corelist_workers(corelist_workers)
482         vpp_config.add_plugin_disable('dpdk_plugin.so')
483         vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)