1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
"""wrk implementation into CSIT framework."""
import re

from robot.api import logger

from resources.libraries.python.ssh import SSH
from resources.libraries.python.topology import NodeType
from resources.libraries.python.CpuUtils import CpuUtils
from resources.libraries.python.constants import Constants

from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
from resources.tools.wrk.wrk_errors import WrkError
# Regular expressions used to pick the measured values out of the wrk stdout.
# NOTE(review): the leading "Latency" / "Req/Sec" anchors, the fourth
# (percentage) group of both *_STATS patterns and the value groups of
# REGEX_RPS / REGEX_BW were restored from how the parser uses them
# (groups 1-4 and group 1 respectively) -- confirm against upstream.

# Latency statistics - Avg, Stdev, Max, +/- Stdev
REGEX_LATENCY_STATS = \
    r"Latency\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\%)"
# rps statistics - Avg, Stdev, Max, +/- Stdev
REGEX_RPS_STATS = \
    r"Req/Sec\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\S*)\s*" \
    r"(\d*\.*\d*\%)"
# Value following "Requests/sec:"
REGEX_RPS = r"Requests/sec:\s*" \
            r"(\d*\.*\S*)"
# Value (number with unit) following "Transfer/sec:"
REGEX_BW = r"Transfer/sec:\s*" \
           r"(\d*\.*\S*)"
# Latency distribution - 50%, 75%, 90%, 99%; matches across several lines
REGEX_LATENCY_DIST = \
    r"Latency Distribution\n" \
    r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
    r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
    r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
    r"\s*99\%\s*(\d*\.*\d*\D*)\n"

# Split number and multiplicand, e.g. 14.25k --> 14.25 and k
REGEX_NUM = r"(\d*\.*\d*)(\D*)"
def install_wrk(tg_node):
    """Install wrk on the TG node.

    Runs ``wrk_utils.sh install false`` from the remote framework directory
    on the node and checks its return code.

    :param tg_node: Traffic generator node.
    :type tg_node: dict
    :raises: RuntimeError if the given node is not a TG node or if the
        installation fails.
    """
    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    # NOTE(review): session setup assumes the SSH helper exposes
    # connect(node) -- confirm against resources.libraries.python.ssh.
    ssh = SSH()
    ssh.connect(tg_node)

    # NOTE(review): the script is wrapped in "sh -c"; if it has to run with
    # elevated privileges a prefix (e.g. sudo) may be required -- confirm.
    ret, _, _ = ssh.exec_command(
        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
        format(Constants.REMOTE_FW_DIR), timeout=1800)
    # Only a non-zero return code of the script means the installation failed.
    if int(ret) != 0:
        raise RuntimeError('Installation of wrk on TG node failed.')
def destroy_wrk(tg_node):
    """Destroy wrk on the TG node.

    Runs ``wrk_utils.sh destroy`` from the remote framework directory on the
    node and checks its return code.

    :param tg_node: Traffic generator node.
    :type tg_node: dict
    :raises: RuntimeError if the given node is not a TG node or the removal of
        wrk failed.
    """
    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    # NOTE(review): session setup assumes the SSH helper exposes
    # connect(node) -- confirm against resources.libraries.python.ssh.
    ssh = SSH()
    ssh.connect(tg_node)

    # NOTE(review): the script is wrapped in "sh -c"; if it has to run with
    # elevated privileges a prefix (e.g. sudo) may be required -- confirm.
    ret, _, _ = ssh.exec_command(
        "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
        format(Constants.REMOTE_FW_DIR), timeout=1800)
    # Only a non-zero return code of the script means the removal failed.
    if int(ret) != 0:
        raise RuntimeError('Removal of wrk from the TG node failed.')
def run_wrk(tg_node, profile_name, tg_numa, test_type):
    """Send the traffic as defined in the profile.

    The profile is loaded from ``resources/traffic_profiles/wrk``, turned into
    the argument list of ``wrk_utils.sh``, executed on the TG node and the
    wrk output is parsed into a human readable message.

    :param tg_node: Traffic generator node.
    :param profile_name: The name of wrk traffic profile.
    :param tg_numa: Numa node on which wrk will run.
    :param test_type: The type of the tests: cps, rps, bw
    :type tg_node: dict
    :type profile_name: str
    :type test_type: str
    :returns: Message with measured data.
    :rtype: str
    :raises: RuntimeError if node type is not a TG or if wrk_utils.sh returns
        a non-zero code.
    """
    if tg_node['type'] != NodeType.TG:
        raise RuntimeError('Node type is not a TG.')

    # Parse and validate the profile
    profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
                    format(profile_name))
    profile = WrkTrafficProfile(profile_path).traffic_profile

    cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
    first_cpu = cores[profile["first-cpu"]]

    nr_of_urls = len(profile["urls"])
    nr_of_cpus = profile["cpus"]

    # Mode specific leading arguments of wrk_utils.sh.
    # NOTE(review): passing first_cpu as the second argument is inferred from
    # it being computed above and otherwise unused -- confirm against
    # wrk_utils.sh.
    if nr_of_urls == 1 and nr_of_cpus == 1:
        params = ["traffic_1_url_1_core", str(first_cpu)]
    elif nr_of_urls == nr_of_cpus:
        params = ["traffic_n_urls_n_cores", str(first_cpu)]
    else:
        params = [
            "traffic_n_urls_m_cores",
            str(first_cpu),
            # Floor division keeps the value an integer ("2", not "2.0")
            # regardless of the Python version.
            str(nr_of_cpus // nr_of_urls),
        ]

    # Arguments shared by all three modes.
    params.extend([
        str(profile["nr-of-threads"]),
        str(profile["nr-of-connections"]),
        "{0}s".format(profile["duration"]),
        "'{0}'".format(profile["header"]),
        str(profile["timeout"]),
        str(profile["script"]),
        str(profile["latency"]),
        "'{0}'".format(" ".join(profile["urls"])),
    ])
    args = " ".join(params)

    # NOTE(review): session setup assumes the SSH helper exposes
    # connect(node) -- confirm against resources.libraries.python.ssh.
    ssh = SSH()
    ssh.connect(tg_node)

    ret, stdout, _ = ssh.exec_command(
        "{0}/resources/tools/wrk/wrk_utils.sh {1}".
        format(Constants.REMOTE_FW_DIR, args), timeout=1800)
    if int(ret) != 0:
        raise RuntimeError('wrk runtime error.')

    stats = _parse_wrk_output(stdout)

    log_msg = "\nMeasured values:\n"
    if test_type in ("cps", "rps"):
        # Both test types are reported from the same per-thread statistics;
        # only the heading and the unit differ.
        heading = "Connections/sec" if test_type == "cps" else "Requests/sec"
        log_msg += "{0}: Avg / Stdev / Max / +/- Stdev\n".format(heading)
        for item in stats["rps-stats-lst"]:
            log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
        log_msg += "Total {0}: {1}{0}\n".format(test_type, stats["rps-sum"])
    elif test_type == "bw":
        log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])

    logger.info(log_msg)

    return log_msg
def _parse_wrk_output(msg):
    """Parse the wrk stdout with the results.

    The output is processed line by line. Per-thread "Req/Sec" statistics,
    "Requests/sec:" and "Transfer/sec:" values are converted to numbers and
    collected; the latency lines are recognised but not evaluated.

    :param msg: stdout of wrk.
    :type msg: str
    :returns: Parsed results. The keys are "latency-dist-lst",
        "latency-stats-lst", "rps-stats-lst", "rps-lst", "bw-lst", "rps-sum"
        (sum of the Avg values in "rps-stats-lst") and "bw-sum" (sum of
        "bw-lst").
    :rtype: dict
    :raises: WrkError if the message does not include the results or a
        result line does not have the expected format.
    """
    if "Thread Stats" not in msg:
        raise WrkError("The output of wrk does not include the results.")

    def _groups(regex, line, count):
        """Return the first ``count`` groups of ``regex`` found in ``line``
        as evaluated numbers; a line which does not match is reported as
        WrkError instead of failing on a None match object.
        """
        found = re.search(regex, line)
        if found is None:
            raise WrkError("The output of wrk does not include the results "
                           "or the format of results has changed.")
        return tuple(_evaluate_number(found.group(idx))
                     for idx in range(1, count + 1))

    stats = {
        "latency-dist-lst": list(),
        "latency-stats-lst": list(),
        "rps-stats-lst": list(),
        "rps-lst": list(),
        "bw-lst": list(),
        "rps-sum": 0,
        "bw-sum": None
    }

    for line in msg.splitlines(False):
        # The order matters: "Latency Distribution" must be tested before the
        # more general "Latency".
        if "Latency Distribution" in line:
            # Latency distribution - 50%, 75%, 90%, 99%
            pass
        elif "Latency" in line:
            # Latency statistics - Avg, Stdev, Max, +/- Stdev
            pass
        elif "Req/Sec" in line:
            # rps statistics - Avg, Stdev, Max, +/- Stdev
            stats["rps-stats-lst"].append(_groups(REGEX_RPS_STATS, line, 4))
        elif "Requests/sec:" in line:
            stats["rps-lst"].append(_groups(REGEX_RPS, line, 1)[0])
        elif "Transfer/sec:" in line:
            stats["bw-lst"].append(_groups(REGEX_BW, line, 1)[0])

    # The total rps is the sum of the per-thread averages (first item).
    stats["rps-sum"] = sum(item[0] for item in stats["rps-stats-lst"])
    stats["bw-sum"] = sum(stats["bw-lst"])

    return stats
257 def _evaluate_number(num):
258 """Evaluate the numeric value of the number with multiplicands, e.g.:
261 :param num: Number to evaluate.
263 :returns: Evaluated number.
265 :raises: WrkError if it is not possible to evaluate the given number.
268 val = re.search(REGEX_NUM, num)
270 val_num = float(val.group(1))
272 raise WrkError("The output of wrk does not include the results "
273 "or the format of results has changed.")
274 val_mul = val.group(2).lower()
281 val_num *= 1000000000
289 raise WrkError("The multiplicand {0} is not defined.".