-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Library to control Kubernetes kubectl."""
-import time
-import yaml
+from time import sleep
-from resources.libraries.python.constants import Constants
+from resources.libraries.python.Constants import Constants
from resources.libraries.python.topology import NodeType
-from resources.libraries.python.ssh import SSH
+from resources.libraries.python.ssh import SSH, exec_cmd_no_error
from resources.libraries.python.CpuUtils import CpuUtils
from resources.libraries.python.VppConfigGenerator import VppConfigGenerator
__all__ = ["KubernetesUtils"]
+# Maximum number of retries to check if PODs are running or deleted.
+MAX_RETRY = 48
class KubernetesUtils(object):
"""Kubernetes utilities class."""
"""Initialize KubernetesUtils class."""
pass
+ @staticmethod
+ def load_docker_image_on_node(node, image_path):
+ """Load Docker container image from file on node.
+
+ :param node: DUT node.
+ :param image_path: Container image path.
+ :type node: dict
+ :type image_path: str
+ :raises RuntimeError: If loading image failed on node.
+ """
+ command = 'docker load -i {image_path}'.\
+ format(image_path=image_path)
+ message = 'Failed to load Docker image on {node}.'.\
+ format(node=node['host'])
+ exec_cmd_no_error(node, command, timeout=240, sudo=True,
+ message=message)
+
+ command = "docker rmi $(sudo docker images -f 'dangling=true' -q)".\
+ format(image_path=image_path)
+ message = 'Failed to clean Docker images on {node}.'.\
+ format(node=node['host'])
+ try:
+ exec_cmd_no_error(node, command, timeout=240, sudo=True,
+ message=message)
+ except RuntimeError:
+ pass
+
+ @staticmethod
+ def load_docker_image_on_all_duts(nodes, image_path):
+ """Load Docker container image from file on all DUTs.
+
+ :param nodes: Topology nodes.
+ :param image_path: Container image path.
+ :type nodes: dict
+ :type image_path: str
+ """
+ for node in nodes.values():
+ if node['type'] == NodeType.DUT:
+ KubernetesUtils.load_docker_image_on_node(node, image_path)
+
@staticmethod
def setup_kubernetes_on_node(node):
"""Set up Kubernetes on node.
ssh = SSH()
ssh.connect(node)
- cmd = '{dir}/{lib}/k8s_setup.sh '.format(dir=Constants.REMOTE_FW_DIR,
- lib=Constants.RESOURCES_LIB_SH)
- (ret_code, _, _) = ssh.exec_command(cmd, timeout=120)
+ cmd = '{dir}/{lib}/k8s_setup.sh deploy_calico'\
+ .format(dir=Constants.REMOTE_FW_DIR,
+ lib=Constants.RESOURCES_LIB_SH)
+ (ret_code, _, _) = ssh.exec_command(cmd, timeout=240)
if int(ret_code) != 0:
raise RuntimeError('Failed to setup Kubernetes on {node}.'
.format(node=node['host']))
if node['type'] == NodeType.DUT:
KubernetesUtils.setup_kubernetes_on_node(node)
+ @staticmethod
+ def destroy_kubernetes_on_node(node):
+ """Destroy Kubernetes on node.
+
+ :param node: DUT node.
+ :type node: dict
+ :raises RuntimeError: If destroying Kubernetes failed.
+ """
+ ssh = SSH()
+ ssh.connect(node)
+
+ cmd = '{dir}/{lib}/k8s_setup.sh destroy'\
+ .format(dir=Constants.REMOTE_FW_DIR,
+ lib=Constants.RESOURCES_LIB_SH)
+ (ret_code, _, _) = ssh.exec_command(cmd, timeout=120)
+ if int(ret_code) != 0:
+ raise RuntimeError('Failed to destroy Kubernetes on {node}.'
+ .format(node=node['host']))
+
+ @staticmethod
+ def destroy_kubernetes_on_all_duts(nodes):
+ """Destroy Kubernetes on all DUTs.
+
+ :param nodes: Topology nodes.
+ :type nodes: dict
+ """
+ for node in nodes.values():
+ if node['type'] == NodeType.DUT:
+ KubernetesUtils.destroy_kubernetes_on_node(node)
+
@staticmethod
def apply_kubernetes_resource_on_node(node, yaml_file, **kwargs):
"""Apply Kubernetes resource on node.
ssh = SSH()
ssh.connect(node)
- stream = file('{tpl}/{yaml}'.format(tpl=Constants.RESOURCES_TPL_K8S,
- yaml=yaml_file), 'r')
-
- for data in yaml.load_all(stream):
+ fqn_file = '{tpl}/{yaml}'.format(tpl=Constants.RESOURCES_TPL_K8S,
+ yaml=yaml_file)
+ with open(fqn_file, 'r') as src_file:
+ stream = src_file.read()
data = reduce(lambda a, kv: a.replace(*kv), kwargs.iteritems(),
- yaml.dump(data, default_flow_style=False))
- # Workaround to avoid using RAW string anotated with | in YAML as
- # library + bash is misinterpreting spaces.
- data = data.replace('.conf:\n', '.conf: |\n')
+ stream)
cmd = 'cat <<EOF | kubectl apply -f - \n{data}\nEOF'.format(
data=data)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, timeout=120)
+ (ret_code, _, _) = ssh.exec_command_sudo(cmd)
if int(ret_code) != 0:
raise RuntimeError('Failed to apply Kubernetes template {yaml} '
'on {node}.'.format(yaml=yaml_file,
**kwargs)
@staticmethod
- def create_kubernetes_cm_from_file_on_node(node, name, key, src_file):
+ def create_kubernetes_cm_from_file_on_node(node, nspace, name, **kwargs):
"""Create Kubernetes ConfigMap from file on node.
:param node: DUT node.
+ :param nspace: Kubernetes namespace.
:param name: ConfigMap name.
- :param key: Key (destination file).
- :param src_file: Source file.
+ :param kwargs: Named parameters.
:type node: dict
+ :type nspace: str
:type name: str
- :type key: str
- :type src_file: str
+ :param kwargs: dict
:raises RuntimeError: If creating Kubernetes ConfigMap failed.
"""
ssh = SSH()
ssh.connect(node)
- cmd = 'kubectl create -n csit configmap {name} --from-file={key}='\
- '{src_file}'.format(name=name, key=key, src_file=src_file)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, timeout=120)
+ nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
+
+ from_file = '{0}'.format(' '.join('--from-file={0}={1} '\
+ .format(key, kwargs[key]) for key in kwargs))
+
+ cmd = 'kubectl create {nspace} configmap {name} {from_file}'\
+ .format(nspace=nspace, name=name, from_file=from_file)
+ (ret_code, _, _) = ssh.exec_command_sudo(cmd)
if int(ret_code) != 0:
- raise RuntimeError('Failed to create Kubernetes ConfigMap {name} '
- 'on {node}.'.format(name=name,
- node=node['host']))
+ raise RuntimeError('Failed to create Kubernetes ConfigMap '
+ 'on {node}.'.format(node=node['host']))
@staticmethod
- def create_kubernetes_cm_from_file_on_all_duts(nodes, name, key, src_file):
+ def create_kubernetes_cm_from_file_on_all_duts(nodes, nspace, name,
+ **kwargs):
"""Create Kubernetes ConfigMap from file on all DUTs.
:param nodes: Topology nodes.
+ :param nspace: Kubernetes namespace.
:param name: ConfigMap name.
- :param key: Key (destination file).
- :param src_file: Source file.
+ :param kwargs: Named parameters.
:type nodes: dict
+ :type nspace: str
:type name: str
- :type key: str
- :type src_file: str
+ :param kwargs: dict
"""
for node in nodes.values():
if node['type'] == NodeType.DUT:
KubernetesUtils.create_kubernetes_cm_from_file_on_node(node,
+ nspace,
name,
- key,
- src_file)
+ **kwargs)
@staticmethod
- def delete_kubernetes_resource_on_node(node, rtype='po,cm', name=None):
+ def delete_kubernetes_resource_on_node(node, nspace, name=None,
+ rtype='po,cm,deploy,rs,rc,svc'):
"""Delete Kubernetes resource on node.
:param node: DUT node.
+ :param nspace: Kubernetes namespace.
:param rtype: Kubernetes resource type.
- :param name: Name of resource.
+ :param name: Name of resource (Default: all).
:type node: dict
+ :type nspace: str
:type rtype: str
:type name: str
- :raises RuntimeError: If deleting Kubernetes resource failed.
+ :raises RuntimeError: If retrieving or deleting Kubernetes resource
+ failed.
"""
ssh = SSH()
ssh.connect(node)
name = '{name}'.format(name=name) if name else '--all'
+ nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
- cmd = 'kubectl delete -n csit {rtype} {name}'\
- .format(rtype=rtype, name=name)
+ cmd = 'kubectl delete {nspace} {rtype} {name}'\
+ .format(nspace=nspace, rtype=rtype, name=name)
(ret_code, _, _) = ssh.exec_command_sudo(cmd, timeout=120)
if int(ret_code) != 0:
- raise RuntimeError('Failed to delete Kubernetes resources in CSIT '
- 'namespace on {node}.'.format(node=node['host']))
+ raise RuntimeError('Failed to delete Kubernetes resources '
+ 'on {node}.'.format(node=node['host']))
- cmd = 'kubectl get -n csit pods --no-headers'
- for _ in range(24):
- (ret_code, stdout, _) = ssh.exec_command_sudo(cmd, timeout=120)
- if int(ret_code) == 0:
- ready = True
+ cmd = 'kubectl get {nspace} pods --no-headers'\
+ .format(nspace=nspace)
+ for _ in range(MAX_RETRY):
+ (ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd)
+ if int(ret_code) != 0:
+ raise RuntimeError('Failed to retrieve Kubernetes resources on '
+ '{node}.'.format(node=node['host']))
+ if name == '--all':
+ ready = False
+ for line in stderr.splitlines():
+ if 'No resources found.' in line:
+ ready = True
+ if ready:
+ break
+ else:
+ ready = False
for line in stdout.splitlines():
- if 'No resources found.' not in line:
+ try:
+ state = line.split()[1].split('/')
+ ready = True if 'Running' in line and\
+ state == state[::-1] else False
+ if not ready:
+ break
+ except (ValueError, IndexError):
ready = False
if ready:
break
- time.sleep(5)
+ sleep(5)
else:
- raise RuntimeError('Failed to delete Kubernetes resources in CSIT '
- 'namespace on {node}.'.format(node=node['host']))
+ raise RuntimeError('Failed to delete Kubernetes resources on '
+ '{node}.'.format(node=node['host']))
@staticmethod
- def delete_kubernetes_resource_on_all_duts(nodes, rtype='po,cm', name=None):
+ def delete_kubernetes_resource_on_all_duts(nodes, nspace, name=None,
+ rtype='po,cm,deploy,rs,rc,svc'):
"""Delete all Kubernetes resource on all DUTs.
:param nodes: Topology nodes.
+ :param nspace: Kubernetes namespace.
:param rtype: Kubernetes resource type.
:param name: Name of resource.
:type nodes: dict
+ :type nspace: str
:type rtype: str
:type name: str
"""
for node in nodes.values():
if node['type'] == NodeType.DUT:
- KubernetesUtils.delete_kubernetes_resource_on_node(node, rtype,
- name)
+ KubernetesUtils.delete_kubernetes_resource_on_node(node, nspace,
+ name, rtype)
@staticmethod
- def describe_kubernetes_resource_on_node(node, rtype='po,cm'):
- """Describe Kubernetes resource on node.
+ def describe_kubernetes_resource_on_node(node, nspace):
+ """Describe all Kubernetes PODs in namespace on node.
:param node: DUT node.
- :param rtype: Kubernetes resource type.
+ :param nspace: Kubernetes namespace.
:type node: dict
- :type rtype: str
- :raises RuntimeError: If describing Kubernetes resource failed.
+ :type nspace: str
"""
ssh = SSH()
ssh.connect(node)
- cmd = 'kubectl describe -n csit {rtype}'.format(rtype=rtype)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, timeout=120)
- if int(ret_code) != 0:
- raise RuntimeError('Failed to describe Kubernetes resource on '
- '{node}.'.format(node=node['host']))
+ nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
+
+ cmd = 'kubectl describe {nspace} all'.format(nspace=nspace)
+ ssh.exec_command_sudo(cmd)
@staticmethod
- def describe_kubernetes_resource_on_all_duts(nodes, rtype='po,cm'):
- """Describe Kubernetes resource on all DUTs.
+ def describe_kubernetes_resource_on_all_duts(nodes, nspace):
+ """Describe all Kubernetes PODs in namespace on all DUTs.
:param nodes: Topology nodes.
- :param rtype: Kubernetes resource type.
+ :param nspace: Kubernetes namespace.
:type nodes: dict
- :type rtype: str
+ :type nspace: str
"""
for node in nodes.values():
if node['type'] == NodeType.DUT:
KubernetesUtils.describe_kubernetes_resource_on_node(node,
- rtype)
+ nspace)
@staticmethod
- def get_kubernetes_logs_on_node(node, nspace='csit'):
- """Get Kubernetes logs on node.
+ def get_kubernetes_logs_on_node(node, nspace):
+ """Get Kubernetes logs from all PODs in namespace on node.
:param node: DUT node.
:param nspace: Kubernetes namespace.
ssh = SSH()
ssh.connect(node)
- cmd = "for p in $(kubectl get pods -n {namespace} --no-headers"\
- " | cut -f 1 -d ' '); do echo $p; kubectl logs -n {namespace} $p; "\
- "done".format(namespace=nspace)
- ssh.exec_command(cmd, timeout=120)
+ nspace = '-n {nspace}'.format(nspace=nspace) if nspace else ''
+
+ cmd = "for p in $(kubectl get pods {nspace} -o jsonpath="\
+ "'{{.items[*].metadata.name}}'); do echo $p; kubectl logs "\
+ "{nspace} $p; done".format(nspace=nspace)
+ ssh.exec_command(cmd)
+
+ cmd = "kubectl exec {nspace} etcdv3 -- etcdctl --endpoints "\
+ "\"localhost:22379\" get \"/\" --prefix=true".format(nspace=nspace)
+ ssh.exec_command(cmd)
@staticmethod
- def get_kubernetes_logs_on_all_duts(nodes, nspace='csit'):
- """Get Kubernetes logs on all DUTs.
+ def get_kubernetes_logs_on_all_duts(nodes, nspace):
+ """Get Kubernetes logs from all PODs in namespace on all DUTs.
:param nodes: Topology nodes.
:param nspace: Kubernetes namespace.
KubernetesUtils.get_kubernetes_logs_on_node(node, nspace)
@staticmethod
- def reset_kubernetes_on_node(node):
- """Reset Kubernetes on node.
-
- :param node: DUT node.
- :type node: dict
- :raises RuntimeError: If resetting Kubernetes failed.
- """
- ssh = SSH()
- ssh.connect(node)
-
- cmd = 'kubeadm reset && rm -rf $HOME/.kube'
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, timeout=120)
- if int(ret_code) != 0:
- raise RuntimeError('Failed to reset Kubernetes on {node}.'
- .format(node=node['host']))
-
- @staticmethod
- def reset_kubernetes_on_all_duts(nodes):
- """Reset Kubernetes on all DUTs.
-
- :param nodes: Topology nodes.
- :type nodes: dict
- """
- for node in nodes.values():
- if node['type'] == NodeType.DUT:
- KubernetesUtils.reset_kubernetes_on_node(node)
-
- @staticmethod
- def wait_for_kubernetes_pods_on_node(node, nspace='csit'):
- """Wait for Kubernetes PODs to become in 'Running' state on node.
+ def wait_for_kubernetes_pods_on_node(node, nspace):
+ """Wait for Kubernetes PODs to become ready on node.
:param node: DUT node.
:param nspace: Kubernetes namespace.
:type node: dict
:type nspace: str
- :raises RuntimeError: If Kubernetes PODs are not ready.
+ :raises RuntimeError: If Kubernetes PODs are not in Running state.
"""
ssh = SSH()
ssh.connect(node)
- cmd = 'kubectl get -n {namespace} pods --no-headers'\
- .format(namespace=nspace)
- for _ in range(48):
- (ret_code, stdout, _) = ssh.exec_command_sudo(cmd, timeout=120)
+ nspace = '-n {nspace}'.format(nspace=nspace) if nspace \
+ else '--all-namespaces'
+
+ cmd = 'kubectl get {nspace} pods --no-headers' \
+ .format(nspace=nspace)
+ for _ in range(MAX_RETRY):
+ (ret_code, stdout, _) = ssh.exec_command_sudo(cmd)
if int(ret_code) == 0:
ready = False
for line in stdout.splitlines():
try:
state = line.split()[1].split('/')
- ready = True if 'Running' in line and\
+ ready = True if 'Running' in line and \
state == state[::-1] else False
if not ready:
break
- except ValueError, IndexError:
+ except (ValueError, IndexError):
ready = False
if ready:
break
- time.sleep(5)
+ sleep(5)
else:
- raise RuntimeError('Kubernetes PODs are not ready on {node}.'
+ raise RuntimeError('Kubernetes PODs are not running on {node}.'
.format(node=node['host']))
@staticmethod
- def wait_for_kubernetes_pods_on_all_duts(nodes, nspace='csit'):
- """Wait for Kubernetes PODs to become in Running state on all DUTs.
+ def wait_for_kubernetes_pods_on_all_duts(nodes, nspace):
+ """Wait for Kubernetes to become ready on all DUTs.
:param nodes: Topology nodes.
:param nspace: Kubernetes namespace.
if node['type'] == NodeType.DUT:
KubernetesUtils.wait_for_kubernetes_pods_on_node(node, nspace)
+ @staticmethod
+ def set_kubernetes_pods_affinity_on_node(node):
+ """Set affinity for all Kubernetes PODs except VPP on node.
+
+ :param node: DUT node.
+ :type node: dict
+ """
+ ssh = SSH()
+ ssh.connect(node)
+
+ cmd = '{dir}/{lib}/k8s_setup.sh affinity_non_vpp'\
+ .format(dir=Constants.REMOTE_FW_DIR,
+ lib=Constants.RESOURCES_LIB_SH)
+ ssh.exec_command(cmd)
+
+ @staticmethod
+ def set_kubernetes_pods_affinity_on_all_duts(nodes):
+ """Set affinity for all Kubernetes PODs except VPP on all DUTs.
+
+ :param nodes: Topology nodes.
+ :type nodes: dict
+ """
+ for node in nodes.values():
+ if node['type'] == NodeType.DUT:
+ KubernetesUtils.set_kubernetes_pods_affinity_on_node(node)
+
@staticmethod
def create_kubernetes_vswitch_startup_config(**kwargs):
"""Create Kubernetes VSWITCH startup configuration.
:param kwargs: Key-value pairs used to create configuration.
:param kwargs: dict
"""
+ smt_used = CpuUtils.is_smt_enabled(kwargs['node']['cpuinfo'])
+
cpuset_cpus = \
CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
cpu_node=kwargs['cpu_node'],
- skip_cnt=kwargs['cpu_skip'],
- cpu_cnt=kwargs['cpu_cnt'],
- smt_used=kwargs['smt_used'])
+ skip_cnt=2,
+ cpu_cnt=kwargs['phy_cores'],
+ smt_used=smt_used)
+ cpuset_main = \
+ CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
+ cpu_node=kwargs['cpu_node'],
+ skip_cnt=1,
+ cpu_cnt=1,
+ smt_used=smt_used)
# Create config instance
vpp_config = VppConfigGenerator()
vpp_config.set_node(kwargs['node'])
vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
vpp_config.add_unix_nodaemon()
- vpp_config.add_dpdk_socketmem('1024,1024')
- vpp_config.add_heapsize('3G')
+ vpp_config.add_socksvr()
+ vpp_config.add_heapsize('4G')
+ vpp_config.add_ip_heap_size('4G')
+ vpp_config.add_ip6_heap_size('4G')
vpp_config.add_ip6_hash_buckets('2000000')
- vpp_config.add_ip6_heap_size('3G')
- if kwargs['framesize'] < 1522:
+ if not kwargs['jumbo']:
vpp_config.add_dpdk_no_multi_seg()
- vpp_config.add_dpdk_dev_default_rxq(kwargs['rxq'])
+ vpp_config.add_dpdk_no_tx_checksum_offload()
+ vpp_config.add_dpdk_dev_default_rxq(kwargs['rxq_count_int'])
vpp_config.add_dpdk_dev(kwargs['if1'], kwargs['if2'])
+ vpp_config.add_buffers_per_numa(kwargs['buffers_per_numa'])
# We will pop first core from list to be main core
- vpp_config.add_cpu_main_core(str(cpuset_cpus.pop(0)))
+ vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
# if this is not only core in list, the rest will be used as workers.
if cpuset_cpus:
corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
vpp_config.add_cpu_corelist_workers(corelist_workers)
- vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)
+ vpp_config.write_config(filename=kwargs['filename'])
@staticmethod
def create_kubernetes_vnf_startup_config(**kwargs):
:param kwargs: Key-value pairs used to create configuration.
:param kwargs: dict
"""
- skip_cnt = kwargs['cpu_skip'] + (kwargs['i'] - 1) * kwargs['cpu_cnt']
+ smt_used = CpuUtils.is_smt_enabled(kwargs['node']['cpuinfo'])
+ skip_cnt = kwargs['cpu_skip'] + (kwargs['i'] - 1) * \
+ (kwargs['phy_cores'] - 1)
cpuset_cpus = \
CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
cpu_node=kwargs['cpu_node'],
skip_cnt=skip_cnt,
- cpu_cnt=kwargs['cpu_cnt'],
- smt_used=kwargs['smt_used'])
-
+ cpu_cnt=kwargs['phy_cores']-1,
+ smt_used=smt_used)
+ cpuset_main = \
+ CpuUtils.cpu_slice_of_list_per_node(node=kwargs['node'],
+ cpu_node=kwargs['cpu_node'],
+ skip_cnt=1,
+ cpu_cnt=1,
+ smt_used=smt_used)
# Create config instance
vpp_config = VppConfigGenerator()
vpp_config.set_node(kwargs['node'])
vpp_config.add_unix_cli_listen(value='0.0.0.0:5002')
vpp_config.add_unix_nodaemon()
+ vpp_config.add_socksvr()
# We will pop first core from list to be main core
- vpp_config.add_cpu_main_core(str(cpuset_cpus.pop(0)))
+ vpp_config.add_cpu_main_core(str(cpuset_main.pop(0)))
# if this is not only core in list, the rest will be used as workers.
if cpuset_cpus:
corelist_workers = ','.join(str(cpu) for cpu in cpuset_cpus)
vpp_config.add_cpu_corelist_workers(corelist_workers)
- vpp_config.add_plugin_disable('dpdk_plugin.so')
- vpp_config.apply_config(filename=kwargs['filename'], restart_vpp=False)
+ vpp_config.add_plugin('disable', 'dpdk_plugin.so')
+ vpp_config.write_config(filename=kwargs['filename'])