CSIT-866: wrk onboarding in CSIT
[csit.git] / resources / tools / wrk / wrk.py
diff --git a/resources/tools/wrk/wrk.py b/resources/tools/wrk/wrk.py
new file mode 100644 (file)
index 0000000..33cfd08
--- /dev/null
@@ -0,0 +1,291 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""wrk implementation into CSIT framework.
+"""
+
+import re
+
+from robot.api import logger
+
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.topology import NodeType
+from resources.libraries.python.CpuUtils import CpuUtils
+from resources.libraries.python.constants import Constants
+
+from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
+from resources.tools.wrk.wrk_errors import WrkError
+
+
+REGEX_LATENCY_STATS = \
+    r"Latency\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\%)"
+REGEX_RPS_STATS = \
+    r"Req/Sec\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\S*)\s*" \
+    r"(\d*\.*\d*\%)"
+REGEX_RPS = r"Requests/sec:\s*" \
+            r"(\d*\.*\S*)"
+REGEX_BW = r"Transfer/sec:\s*" \
+           r"(\d*\.*\S*)"
+REGEX_LATENCY_DIST = \
+    r"Latency Distribution\n" \
+    r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
+    r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
+    r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
+    r"\s*99\%\s*(\d*\.*\d*\D*)\n"
+
+# Split number and multiplicand, e.g. 14.25k --> 14.25 and k
+REGEX_NUM = r"(\d*\.*\d*)(\D*)"
+
+
+def install_wrk(tg_node):
+    """Install wrk on the TG node.
+
+    :param tg_node: Traffic generator node.
+    :type tg_node: dict
+    :raises: RuntimeError if the given node is not a TG node or if the
+    installation fails.
+    """
+
+    if tg_node['type'] != NodeType.TG:
+        raise RuntimeError('Node type is not a TG.')
+
+    ssh = SSH()
+    ssh.connect(tg_node)
+
+    ret, _, _ = ssh.exec_command(
+        "sudo -E "
+        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
+        format(Constants.REMOTE_FW_DIR), timeout=1800)
+    if int(ret) != 0:
+        raise RuntimeError('Installation of wrk on TG node failed.')
+
+
+def destroy_wrk(tg_node):
+    """Destroy wrk on the TG node.
+
+    :param tg_node: Traffic generator node.
+    :type tg_node: dict
+    :raises: RuntimeError if the given node is not a TG node or the removal of
+    wrk failed.
+    """
+
+    if tg_node['type'] != NodeType.TG:
+        raise RuntimeError('Node type is not a TG.')
+
+    ssh = SSH()
+    ssh.connect(tg_node)
+
+    ret, _, _ = ssh.exec_command(
+        "sudo -E "
+        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
+        format(Constants.REMOTE_FW_DIR), timeout=1800)
+    if int(ret) != 0:
+        raise RuntimeError('Removal of wrk from the TG node failed.')
+
+
+def run_wrk(tg_node, profile_name, tg_numa, test_type):
+    """Send the traffic as defined in the profile.
+
+    :param tg_node: Traffic generator node.
+    :param profile_name: The name of wrk traffic profile.
+    :param tg_numa: Numa node on which wrk will run.
+    :param test_type: The type of the tests: cps, rps, bw
+    :type profile_name: str
+    :type tg_node: dict
+    :type tg_numa: int
+    :type test_type: str
+    :returns: Message with measured data.
+    :rtype: str
+    :raises: RuntimeError if node type is not a TG.
+    """
+
+    if tg_node['type'] != NodeType.TG:
+        raise RuntimeError('Node type is not a TG.')
+
+    # Parse and validate the profile
+    profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
+                    format(profile_name))
+    profile = WrkTrafficProfile(profile_path).traffic_profile
+
+    cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
+    first_cpu = cores[profile["first-cpu"]]
+
+    if len(profile["urls"]) == 1 and profile["cpus"] == 1:
+        params = [
+            "traffic_1_url_1_core",
+            str(first_cpu),
+            str(profile["nr-of-threads"]),
+            str(profile["nr-of-connections"]),
+            "{0}s".format(profile["duration"]),
+            "'{0}'".format(profile["header"]),
+            str(profile["timeout"]),
+            str(profile["script"]),
+            str(profile["latency"]),
+            "'{0}'".format(" ".join(profile["urls"]))
+        ]
+    elif len(profile["urls"]) == profile["cpus"]:
+        params = [
+            "traffic_n_urls_n_cores",
+            str(first_cpu),
+            str(profile["nr-of-threads"]),
+            str(profile["nr-of-connections"]),
+            "{0}s".format(profile["duration"]),
+            "'{0}'".format(profile["header"]),
+            str(profile["timeout"]),
+            str(profile["script"]),
+            str(profile["latency"]),
+            "'{0}'".format(" ".join(profile["urls"]))
+        ]
+    else:
+        params = [
+            "traffic_n_urls_m_cores",
+            str(first_cpu),
+            str(profile["cpus"] / len(profile["urls"])),
+            str(profile["nr-of-threads"]),
+            str(profile["nr-of-connections"]),
+            "{0}s".format(profile["duration"]),
+            "'{0}'".format(profile["header"]),
+            str(profile["timeout"]),
+            str(profile["script"]),
+            str(profile["latency"]),
+            "'{0}'".format(" ".join(profile["urls"]))
+        ]
+    args = " ".join(params)
+
+    ssh = SSH()
+    ssh.connect(tg_node)
+
+    ret, stdout, _ = ssh.exec_command(
+        "{0}/resources/tools/wrk/wrk_utils.sh {1}".
+        format(Constants.REMOTE_FW_DIR, args), timeout=1800)
+    if int(ret) != 0:
+        raise RuntimeError('wrk runtime error.')
+
+    stats = _parse_wrk_output(stdout)
+
+    log_msg = "\nMeasured values:\n"
+    if test_type == "cps":
+        log_msg += "Connections/sec: Avg / Stdev / Max  / +/- Stdev\n"
+        for item in stats["rps-stats-lst"]:
+            log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
+        log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
+    elif test_type == "rps":
+        log_msg += "Requests/sec: Avg / Stdev / Max  / +/- Stdev\n"
+        for item in stats["rps-stats-lst"]:
+            log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
+        log_msg += "Total rps: {0}cps\n".format(stats["rps-sum"])
+    elif test_type == "bw":
+        log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])
+
+    logger.info(log_msg)
+
+    return log_msg
+
+
+def _parse_wrk_output(msg):
+    """Parse the wrk stdout with the results.
+
+    :param msg: stdout of wrk.
+    :type msg: str
+    :returns: Parsed results.
+    :rtype: dict
+    :raises: WrkError if the message does not include the results.
+    """
+
+    if "Thread Stats" not in msg:
+        raise WrkError("The output of wrk does not include the results.")
+
+    msg_lst = msg.splitlines(False)
+
+    stats = {
+        "latency-dist-lst": list(),
+        "latency-stats-lst": list(),
+        "rps-stats-lst": list(),
+        "rps-lst": list(),
+        "bw-lst": list(),
+        "rps-sum": 0,
+        "bw-sum": None
+    }
+
+    for line in msg_lst:
+        if "Latency Distribution" in line:
+            # Latency distribution - 50%, 75%, 90%, 99%
+            pass
+        elif "Latency" in line:
+            # Latency statistics - Avg, Stdev, Max, +/- Stdev
+            pass
+        elif "Req/Sec" in line:
+            # rps statistics - Avg, Stdev, Max, +/- Stdev
+            stats["rps-stats-lst"].append((
+                _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
+                _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
+                _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
+                _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))))
+        elif "Requests/sec:" in line:
+            # rps (cps)
+            stats["rps-lst"].append(
+                _evaluate_number(re.search(REGEX_RPS, line).group(1)))
+        elif "Transfer/sec:" in line:
+            # BW
+            stats["bw-lst"].append(
+                _evaluate_number(re.search(REGEX_BW, line).group(1)))
+
+    for item in stats["rps-stats-lst"]:
+        stats["rps-sum"] += item[0]
+    stats["bw-sum"] = sum(stats["bw-lst"])
+
+    return stats
+
+
+def _evaluate_number(num):
+    """Evaluate the numeric value of the number with multiplicands, e.g.:
+    12.25k --> 12250
+
+    :param num: Number to evaluate.
+    :type num: str
+    :returns: Evaluated number.
+    :rtype: float
+    :raises: WrkError if it is not possible to evaluate the given number.
+    """
+
+    val = re.search(REGEX_NUM, num)
+    try:
+        val_num = float(val.group(1))
+    except ValueError:
+        raise WrkError("The output of wrk does not include the results "
+                       "or the format of results has changed.")
+    val_mul = val.group(2).lower()
+    if val_mul:
+        if "k" in val_mul:
+            val_num *= 1000
+        elif "m" in val_mul:
+            val_num *= 1000000
+        elif "g" in val_mul:
+            val_num *= 1000000000
+        elif "b" in val_mul:
+            pass
+        elif "%" in val_mul:
+            pass
+        elif "" in val_mul:
+            pass
+        else:
+            raise WrkError("The multiplicand {0} is not defined.".
+                           format(val_mul))
+    return val_num