1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Integration of wrk into the CSIT framework.
19 from copy import deepcopy
20 from time import sleep
22 from robot.api import logger
24 from resources.libraries.python.ssh import SSH
25 from resources.libraries.python.topology import NodeType
26 from resources.libraries.python.CpuUtils import CpuUtils
27 from resources.libraries.python.Constants import Constants
29 from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
30 from resources.tools.wrk.wrk_errors import WrkError
# Regular expressions for reading values out of the wrk report text.
# REGEX_RPS and REGEX_BW anchor on the "Requests/sec:" and "Transfer/sec:"
# labels; REGEX_LATENCY_DIST anchors on the "Latency Distribution" heading and
# captures the value printed on each of the 50%, 75%, 90% and 99% rows.
33 REGEX_LATENCY_STATS = \
35 r"(\d*\.*\d*\S*)\s*" \
36 r"(\d*\.*\d*\S*)\s*" \
37 r"(\d*\.*\d*\S*)\s*" \
41 r"(\d*\.*\d*\S*)\s*" \
42 r"(\d*\.*\d*\S*)\s*" \
43 r"(\d*\.*\d*\S*)\s*" \
45 REGEX_RPS = r"Requests/sec:\s*" \
47 REGEX_BW = r"Transfer/sec:\s*" \
49 REGEX_LATENCY_DIST = \
50 r"Latency Distribution\n" \
51 r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
52 r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
53 r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
54 r"\s*99\%\s*(\d*\.*\d*\D*)\n"
56 # Split a value into its leading numeric part (group 1) and the trailing
# non-digit suffix, the "multiplicand" (group 2), e.g. 14.25k --> 14.25 and k
57 REGEX_NUM = r"(\d*\.*\d*)(\D*)"
# Install wrk on a traffic-generator node by running the wrk_utils.sh helper
# remotely; a RuntimeError is raised for a non-TG node or a failed install.
60 def install_wrk(tg_node):
61 """Install wrk on the TG node.
63 :param tg_node: Traffic generator node.
65 :raises: RuntimeError if the given node is not a TG node or if the
# Guard: only nodes whose 'type' entry equals NodeType.TG are accepted.
69 if tg_node['type'] != NodeType.TG:
70 raise RuntimeError('Node type is not a TG.')
# Run wrk_utils.sh from Constants.REMOTE_FW_DIR with the arguments
# "install false"; only the first element of the result (ret) is kept.
# timeout=1800 is passed to exec_command (presumably seconds - confirm).
75 ret, _, _ = ssh.exec_command(
77 "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
78 format(Constants.REMOTE_FW_DIR), timeout=1800)
# NOTE(review): ret is presumably the command's exit status and decides
# whether this error is raised - confirm the exact check.
80 raise RuntimeError('Installation of wrk on TG node failed.')
# Remove wrk from a traffic-generator node by running the wrk_utils.sh helper
# remotely; a RuntimeError is raised for a non-TG node or a failed removal.
83 def destroy_wrk(tg_node):
84 """Destroy wrk on the TG node.
86 :param tg_node: Traffic generator node.
88 :raises: RuntimeError if the given node is not a TG node or the removal of
# Guard: only nodes whose 'type' entry equals NodeType.TG are accepted.
92 if tg_node['type'] != NodeType.TG:
93 raise RuntimeError('Node type is not a TG.')
# Run wrk_utils.sh from Constants.REMOTE_FW_DIR with the argument "destroy";
# only the first element of the result (ret) is kept.
# timeout=1800 is passed to exec_command (presumably seconds - confirm).
98 ret, _, _ = ssh.exec_command(
100 "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
101 format(Constants.REMOTE_FW_DIR), timeout=1800)
# NOTE(review): ret is presumably the command's exit status and decides
# whether this error is raised - confirm the exact check.
103 raise RuntimeError('Removal of wrk from the TG node failed.')
# Run wrk on the TG node according to a YAML traffic profile, parse its
# output with _parse_wrk_output and build a text summary for the requested
# test type ("cps", "rps" or "bw").
106 def run_wrk(tg_node, profile_name, tg_numa, test_type, warm_up=False):
107 """Send the traffic as defined in the profile.
109 :param tg_node: Traffic generator node.
110 :param profile_name: The name of wrk traffic profile.
111 :param tg_numa: Numa node on which wrk will run.
112 :param test_type: The type of the tests: cps, rps, bw
113 :param warm_up: If True, warm-up traffic is generated before test traffic.
114 :type profile_name: str
119 :returns: Message with measured data.
121 :raises: RuntimeError if node type is not a TG.
# Guard: only nodes whose 'type' entry equals NodeType.TG are accepted.
124 if tg_node['type'] != NodeType.TG:
125 raise RuntimeError('Node type is not a TG.')
127 # Parse and validate the profile
# The profile name is mapped to resources/traffic_profiles/wrk/<name>.yaml.
128 profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
129 format(profile_name))
130 profile = WrkTrafficProfile(profile_path).traffic_profile
# CPU list of the requested NUMA node; the profile's "first-cpu" value is an
# index into that list, not a CPU id itself.
132 cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
133 first_cpu = cores[profile["first-cpu"]]
# Case 1: exactly one URL and one CPU -> "traffic_1_url_1_core" mode.
# Every parameter is rendered as a string; header and URL list are wrapped in
# single quotes so each stays one shell word.
135 if len(profile["urls"]) == 1 and profile["cpus"] == 1:
137 "traffic_1_url_1_core",
139 str(profile["nr-of-threads"]),
140 str(profile["nr-of-connections"]),
141 "{0}s".format(profile["duration"]),
142 "'{0}'".format(profile["header"]),
143 str(profile["timeout"]),
144 str(profile["script"]),
145 str(profile["latency"]),
146 "'{0}'".format(" ".join(profile["urls"]))
# Warm-up works on a copy so the measured run keeps its own parameters;
# element 4 is overridden with "10s" (presumably the duration slot - verify).
149 warm_up_params = deepcopy(params)
150 warm_up_params[4] = "10s"
# Case 2: as many URLs as CPUs -> "traffic_n_urls_n_cores" mode.
151 elif len(profile["urls"]) == profile["cpus"]:
153 "traffic_n_urls_n_cores",
155 str(profile["nr-of-threads"]),
156 str(profile["nr-of-connections"]),
157 "{0}s".format(profile["duration"]),
158 "'{0}'".format(profile["header"]),
159 str(profile["timeout"]),
160 str(profile["script"]),
161 str(profile["latency"]),
162 "'{0}'".format(" ".join(profile["urls"]))
# Same warm-up override as in the first case.
165 warm_up_params = deepcopy(params)
166 warm_up_params[4] = "10s"
# "traffic_n_urls_m_cores" mode: one extra argument, cpus divided by the
# number of URLs, is passed ahead of the thread count.
# NOTE(review): "/" is true division on Python 3, so this renders e.g. "2.0"
# rather than "2" - confirm what wrk_utils.sh expects.
169 "traffic_n_urls_m_cores",
171 str(profile["cpus"] / len(profile["urls"])),
172 str(profile["nr-of-threads"]),
173 str(profile["nr-of-connections"]),
174 "{0}s".format(profile["duration"]),
175 "'{0}'".format(profile["header"]),
176 str(profile["timeout"]),
177 str(profile["script"]),
178 str(profile["latency"]),
179 "'{0}'".format(" ".join(profile["urls"]))
# The extra argument shifts the overridden slot by one: element 5 here
# (presumably the duration again - verify).
182 warm_up_params = deepcopy(params)
183 warm_up_params[5] = "10s"
# wrk_utils.sh receives its parameters as one space-separated string.
185 args = " ".join(params)
# Warm-up run: same script with the warm-up parameters; stdout and stderr
# are discarded, only ret is kept.
191 warm_up_args = " ".join(warm_up_params)
192 ret, _, _ = ssh.exec_command(
193 "{0}/resources/tools/wrk/wrk_utils.sh {1}".
194 format(Constants.REMOTE_FW_DIR, warm_up_args), timeout=1800)
196 raise RuntimeError('wrk runtime error.')
# Measured run: stdout is kept because it carries the report parsed below.
199 ret, stdout, _ = ssh.exec_command(
200 "{0}/resources/tools/wrk/wrk_utils.sh {1}".
201 format(Constants.REMOTE_FW_DIR, args), timeout=1800)
203 raise RuntimeError('wrk runtime error.')
205 stats = _parse_wrk_output(stdout)
# Build the text summary. "cps" and "rps" print the same data
# (stats["rps-stats-lst"] and stats["rps-sum"]) under different labels;
# "bw" prints stats["bw-sum"] only.
207 log_msg = "\nMeasured values:\n"
208 if test_type == "cps":
209 log_msg += "Connections/sec: Avg / Stdev / Max / +/- Stdev\n"
210 for item in stats["rps-stats-lst"]:
211 log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
212 log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
213 elif test_type == "rps":
214 log_msg += "Requests/sec: Avg / Stdev / Max / +/- Stdev\n"
215 for item in stats["rps-stats-lst"]:
216 log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
217 log_msg += "Total rps: {0}rps\n".format(stats["rps-sum"])
218 elif test_type == "bw":
219 log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])
# Turn the raw wrk stdout into a dictionary of statistics, scanning the
# report line by line and dispatching on the label found in each line.
226 def _parse_wrk_output(msg):
227 """Parse the wrk stdout with the results.
229 :param msg: stdout of wrk.
231 :returns: Parsed results.
233 :raises: WrkError if the message does not include the results.
# A report without the "Thread Stats" heading is treated as carrying no
# results at all.
236 if "Thread Stats" not in msg:
237 raise WrkError("The output of wrk does not include the results.")
# splitlines(False): line terminators are not kept in the resulting items.
239 msg_lst = msg.splitlines(False)
242 "latency-dist-lst": list(),
243 "latency-stats-lst": list(),
244 "rps-stats-lst": list(),
# "Latency Distribution" is tested before "Latency" because the shorter
# label is a substring of the longer one.
252 if "Latency Distribution" in line:
253 # Latency distribution - 50%, 75%, 90%, 99%
255 elif "Latency" in line:
256 # Latency statistics - Avg, Stdev, Max, +/- Stdev
258 elif "Req/Sec" in line:
259 # rps statistics - Avg, Stdev, Max, +/- Stdev
# The four captured groups of REGEX_RPS_STATS are each converted with
# _evaluate_number and stored together as one tuple.
260 stats["rps-stats-lst"].append((
261 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
262 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
263 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
264 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))))
265 elif "Requests/sec:" in line:
267 stats["rps-lst"].append(
268 _evaluate_number(re.search(REGEX_RPS, line).group(1)))
269 elif "Transfer/sec:" in line:
271 stats["bw-lst"].append(
272 _evaluate_number(re.search(REGEX_BW, line).group(1)))
# Totals: "rps-sum" adds up the first element (Avg) of every Req/Sec tuple,
# "bw-sum" adds up every collected Transfer/sec value.
274 for item in stats["rps-stats-lst"]:
275 stats["rps-sum"] += item[0]
276 stats["bw-sum"] = sum(stats["bw-lst"])
281 def _evaluate_number(num):
282 """Evaluate the numeric value of the number with multiplicands, e.g.:
285 :param num: Number to evaluate.
287 :returns: Evaluated number.
289 :raises: WrkError if it is not possible to evaluate the given number.
292 val = re.search(REGEX_NUM, num)
294 val_num = float(val.group(1))
296 raise WrkError("The output of wrk does not include the results "
297 "or the format of results has changed.")
298 val_mul = val.group(2).lower()
305 val_num *= 1000000000
313 raise WrkError("The multiplicand {0} is not defined.".