# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""wrk implementation into CSIT framework.
"""

import re

from robot.api import logger

from resources.libraries.python.ssh import SSH
from resources.libraries.python.topology import NodeType
from resources.libraries.python.CpuUtils import CpuUtils
from resources.libraries.python.constants import Constants
from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
from resources.tools.wrk.wrk_errors import WrkError


# "Latency" row of the "Thread Stats" table: Avg, Stdev, Max, +/- Stdev.
REGEX_LATENCY_STATS = (
    r"Latency\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\%)"
)
# "Req/Sec" row of the "Thread Stats" table: Avg, Stdev, Max, +/- Stdev.
REGEX_RPS_STATS = (
    r"Req/Sec\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\S*)\s*"
    r"(\d*\.*\d*\%)"
)
# Summary line with the overall requests per second.
REGEX_RPS = (
    r"Requests/sec:\s*"
    r"(\d*\.*\S*)"
)
# Summary line with the overall transfer rate.
REGEX_BW = (
    r"Transfer/sec:\s*"
    r"(\d*\.*\S*)"
)
# Latency distribution block: 50%, 75%, 90% and 99% percentiles.
REGEX_LATENCY_DIST = (
    r"Latency Distribution\n"
    r"\s*50\%\s*(\d*\.*\d*\D*)\n"
    r"\s*75\%\s*(\d*\.*\d*\D*)\n"
    r"\s*90\%\s*(\d*\.*\d*\D*)\n"
    r"\s*99\%\s*(\d*\.*\d*\D*)\n"
)

# Split number and multiplicand, e.g. 14.25k --> 14.25 and k
REGEX_NUM = r"(\d*\.*\d*)(\D*)"


def install_wrk(tg_node):
    """Install wrk on the TG node.

    Runs ``wrk_utils.sh install false`` from the remote framework directory
    via sudo and checks its return code.

    :param tg_node: Traffic generator node.
    :type tg_node: dict
    :raises: RuntimeError if the given node is not a TG node or if the
        installation fails.
    """

    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    ssh = SSH()
    ssh.connect(tg_node)

    ret, _, _ = ssh.exec_command(
        "sudo -E "
        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
        format(Constants.REMOTE_FW_DIR), timeout=1800)
    if int(ret) != 0:
        raise RuntimeError('Installation of wrk on TG node failed.')


def destroy_wrk(tg_node):
    """Destroy wrk on the TG node.

    Runs ``wrk_utils.sh destroy`` from the remote framework directory via
    sudo and checks its return code.

    :param tg_node: Traffic generator node.
    :type tg_node: dict
    :raises: RuntimeError if the given node is not a TG node or the removal
        of wrk failed.
    """

    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    ssh = SSH()
    ssh.connect(tg_node)

    ret, _, _ = ssh.exec_command(
        "sudo -E "
        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
        format(Constants.REMOTE_FW_DIR), timeout=1800)
    if int(ret) != 0:
        raise RuntimeError('Removal of wrk from the TG node failed.')


def run_wrk(tg_node, profile_name, tg_numa, test_type):
    """Send the traffic as defined in the profile.

    The profile is loaded from ``resources/traffic_profiles/wrk/<name>.yaml``,
    translated into positional arguments of ``wrk_utils.sh`` and executed on
    the TG node. The wrk output is parsed and a human readable summary is
    logged and returned.

    :param tg_node: Traffic generator node.
    :param profile_name: The name of wrk traffic profile.
    :param tg_numa: Numa node on which wrk will run.
    :param test_type: The type of the tests: cps, rps, bw. Any other value
        yields only the "Measured values" header.
    :type profile_name: str
    :type tg_node: dict
    :type tg_numa: int
    :type test_type: str
    :returns: Message with measured data.
    :rtype: str
    :raises: RuntimeError if node type is not a TG or if wrk returns
        a non-zero code.
    """

    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    # Parse and validate the profile
    profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
                    format(profile_name))
    profile = WrkTrafficProfile(profile_path).traffic_profile

    # "first-cpu" is used as an index into the list returned by
    # CpuUtils.cpu_list_per_node for the requested numa node.
    cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
    first_cpu = cores[profile["first-cpu"]]

    nr_of_urls = len(profile["urls"])
    nr_of_cpus = profile["cpus"]

    # Select the wrk_utils.sh sub-command; only the "n urls / m cores" mode
    # takes an extra argument (cpus per url).
    if nr_of_urls == 1 and nr_of_cpus == 1:
        params = ["traffic_1_url_1_core", str(first_cpu)]
    elif nr_of_urls == nr_of_cpus:
        params = ["traffic_n_urls_n_cores", str(first_cpu)]
    else:
        params = [
            "traffic_n_urls_m_cores",
            str(first_cpu),
            # Floor division keeps the argument an integer string ("2", not
            # "2.0") regardless of the Python major version.
            str(nr_of_cpus // nr_of_urls)
        ]

    # Arguments shared by all three modes, in the order the script expects.
    params.extend([
        str(profile["nr-of-threads"]),
        str(profile["nr-of-connections"]),
        "{0}s".format(profile["duration"]),
        "'{0}'".format(profile["header"]),
        str(profile["timeout"]),
        str(profile["script"]),
        str(profile["latency"]),
        "'{0}'".format(" ".join(profile["urls"]))
    ])
    args = " ".join(params)

    ssh = SSH()
    ssh.connect(tg_node)

    ret, stdout, _ = ssh.exec_command(
        "{0}/resources/tools/wrk/wrk_utils.sh {1}".
        format(Constants.REMOTE_FW_DIR, args), timeout=1800)
    if int(ret) != 0:
        raise RuntimeError('wrk runtime error.')

    stats = _parse_wrk_output(stdout)

    log_msg = "\nMeasured values:\n"
    if test_type == "cps":
        log_msg += "Connections/sec: Avg / Stdev / Max / +/- Stdev\n"
        for item in stats["rps-stats-lst"]:
            log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
        log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
    elif test_type == "rps":
        log_msg += "Requests/sec: Avg / Stdev / Max / +/- Stdev\n"
        for item in stats["rps-stats-lst"]:
            log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
        # The unit of this total is requests per second, not cps.
        log_msg += "Total rps: {0}rps\n".format(stats["rps-sum"])
    elif test_type == "bw":
        log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])

    logger.info(log_msg)

    return log_msg


def _search_stats(regex, line):
    """Search the line for the given statistics pattern.

    :param regex: Regular expression describing the expected line.
    :param line: One line of the wrk output.
    :type regex: str
    :type line: str
    :returns: Match object with the captured values.
    :rtype: re.MatchObject
    :raises: WrkError if the line does not have the expected format.
    """
    match = re.search(regex, line)
    if match is None:
        raise WrkError("The output of wrk does not include the results "
                       "or the format of results has changed.")
    return match


def _parse_wrk_output(msg):
    """Parse the wrk stdout with the results.

    Only the "Req/Sec", "Requests/sec:" and "Transfer/sec:" lines are
    evaluated; latency lines are recognised but not parsed. "rps-sum" is the
    sum of the Avg column of all "Req/Sec" lines, "bw-sum" is the sum of all
    "Transfer/sec:" values.

    :param msg: stdout of wrk.
    :type msg: str
    :returns: Parsed results.
    :rtype: dict
    :raises: WrkError if the message does not include the results or a
        result line does not have the expected format.
    """

    if "Thread Stats" not in msg:
        raise WrkError("The output of wrk does not include the results.")

    stats = {
        "latency-dist-lst": list(),
        "latency-stats-lst": list(),
        "rps-stats-lst": list(),
        "rps-lst": list(),
        "bw-lst": list(),
        "rps-sum": 0,
        "bw-sum": None
    }

    for line in msg.splitlines(False):
        if "Latency Distribution" in line:
            # Latency distribution - 50%, 75%, 90%, 99%
            # Must be tested before the plain "Latency" check below.
            pass
        elif "Latency" in line:
            # Latency statistics - Avg, Stdev, Max, +/- Stdev
            pass
        elif "Req/Sec" in line:
            # rps statistics - Avg, Stdev, Max, +/- Stdev
            match = _search_stats(REGEX_RPS_STATS, line)
            stats["rps-stats-lst"].append(tuple(
                _evaluate_number(match.group(idx)) for idx in (1, 2, 3, 4)))
        elif "Requests/sec:" in line:
            # rps (cps)
            match = _search_stats(REGEX_RPS, line)
            stats["rps-lst"].append(_evaluate_number(match.group(1)))
        elif "Transfer/sec:" in line:
            # BW
            match = _search_stats(REGEX_BW, line)
            stats["bw-lst"].append(_evaluate_number(match.group(1)))

    stats["rps-sum"] = sum(item[0] for item in stats["rps-stats-lst"])
    stats["bw-sum"] = sum(stats["bw-lst"])

    return stats


def _evaluate_number(num):
    """Evaluate the numeric value of the number with multiplicands, e.g.:
    12.25k --> 12250

    The multiplicand is matched case-insensitively as a substring, in the
    order k, m, g; a suffix containing only "b" or "%" leaves the number
    unchanged (so e.g. "MB" is scaled by the "m" rule).

    :param num: Number to evaluate.
    :type num: str
    :returns: Evaluated number.
    :rtype: float
    :raises: WrkError if it is not possible to evaluate the given number or
        if the multiplicand is not known.
    """

    # REGEX_NUM can match an empty string, so the search itself always
    # succeeds; an unusable number shows up as a float() failure instead.
    val = re.search(REGEX_NUM, num)
    try:
        val_num = float(val.group(1))
    except ValueError:
        raise WrkError("The output of wrk does not include the results "
                       "or the format of results has changed.")

    val_mul = val.group(2).lower()
    if not val_mul:
        return val_num

    # Order matters: the scaling letters are tested before the neutral ones.
    for marker, factor in (("k", 1000), ("m", 1000000), ("g", 1000000000)):
        if marker in val_mul:
            return val_num * factor

    if "b" in val_mul or "%" in val_mul:
        return val_num

    raise WrkError("The multiplicand {0} is not defined.".
                   format(val_mul))