1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Integration of the wrk traffic generator into the CSIT framework.
19 from copy import deepcopy
20 from time import sleep
22 from robot.api import logger
24 from resources.libraries.python.ssh import SSH
25 from resources.libraries.python.topology import NodeType
26 from resources.libraries.python.CpuUtils import CpuUtils
27 from resources.libraries.python.Constants import Constants
29 from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
30 from resources.tools.wrk.wrk_errors import WrkError
33 REGEX_LATENCY_STATS = \
35 r"(\d*\.*\d*\S*)\s*" \
36 r"(\d*\.*\d*\S*)\s*" \
37 r"(\d*\.*\d*\S*)\s*" \
41 r"(\d*\.*\d*\S*)\s*" \
42 r"(\d*\.*\d*\S*)\s*" \
43 r"(\d*\.*\d*\S*)\s*" \
45 REGEX_RPS = r"Requests/sec:\s*" \
47 REGEX_BW = r"Transfer/sec:\s*" \
49 REGEX_LATENCY_DIST = \
50 r"Latency Distribution\n" \
51 r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
52 r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
53 r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
54 r"\s*99\%\s*(\d*\.*\d*\D*)\n"
56 # Split number and multiplicand, e.g. 14.25k --> 14.25 and k
57 REGEX_NUM = r"(\d*\.*\d*)(\D*)"
60 def check_wrk(tg_node):
61 """Check if wrk is installed on the TG node.
63 :param tg_node: Traffic generator node.
65 :raises: RuntimeError if the given node is not a TG node or if the
66 command is not available.
69 if tg_node[u"type"] != NodeType.TG:
70 raise RuntimeError(u"Node type is not a TG.")
75 ret, _, _ = ssh.exec_command(
76 f"sudo -E sh -c '{Constants.REMOTE_FW_DIR}/resources/tools/"
77 f"wrk/wrk_utils.sh installed'"
80 raise RuntimeError(u"WRK is not installed on TG node.")
83 def run_wrk(tg_node, profile_name, tg_numa, test_type, warm_up=False):
84 """Send the traffic as defined in the profile.
86 :param tg_node: Traffic generator node.
87 :param profile_name: The name of wrk traffic profile.
88 :param tg_numa: Numa node on which wrk will run.
89 :param test_type: The type of the tests: cps, rps, bw
90 :param warm_up: If True, warm-up traffic is generated before test traffic.
91 :type profile_name: str
96 :returns: Message with measured data.
98 :raises: RuntimeError if node type is not a TG.
101 if tg_node[u"type"] != NodeType.TG:
102 raise RuntimeError(u"Node type is not a TG.")
104 # Parse and validate the profile
105 profile_path = f"resources/traffic_profiles/wrk/{profile_name}.yaml"
106 profile = WrkTrafficProfile(profile_path).traffic_profile
108 cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
109 first_cpu = cores[profile[u"first-cpu"]]
111 if len(profile[u"urls"]) == 1 and profile[u"cpus"] == 1:
113 u"traffic_1_url_1_core",
115 str(profile[u"nr-of-threads"]),
116 str(profile[u"nr-of-connections"]),
117 f"{profile[u'duration']}s",
118 f"'{profile[u'header']}'",
119 str(profile[u"timeout"]),
120 str(profile[u"script"]),
121 str(profile[u"latency"]),
122 f"'{u' '.join(profile[u'urls'])}'"
125 warm_up_params = deepcopy(params)
126 warm_up_params[4] = u"10s"
127 elif len(profile[u"urls"]) == profile[u"cpus"]:
129 u"traffic_n_urls_n_cores",
131 str(profile[u"nr-of-threads"]),
132 str(profile[u"nr-of-connections"]),
133 f"{profile[u'duration']}s",
134 f"'{profile[u'header']}'",
135 str(profile[u"timeout"]),
136 str(profile[u"script"]),
137 str(profile[u"latency"]),
138 f"'{u' '.join(profile[u'urls'])}'"
141 warm_up_params = deepcopy(params)
142 warm_up_params[4] = u"10s"
145 u"traffic_n_urls_m_cores",
147 str(profile[u"cpus"] // len(profile[u"urls"])),
148 str(profile[u"nr-of-threads"]),
149 str(profile[u"nr-of-connections"]),
150 f"{profile[u'duration']}s",
151 f"'{profile[u'header']}'",
152 str(profile[u"timeout"]),
153 str(profile[u"script"]),
154 str(profile[u"latency"]),
155 f"'{u' '.join(profile[u'urls'])}'"
158 warm_up_params = deepcopy(params)
159 warm_up_params[5] = u"10s"
161 args = u" ".join(params)
167 warm_up_args = u" ".join(warm_up_params)
168 ret, _, _ = ssh.exec_command(
169 f"{Constants.REMOTE_FW_DIR}/resources/tools/wrk/wrk_utils.sh "
170 f"{warm_up_args}", timeout=1800
173 raise RuntimeError(u"wrk runtime error.")
176 ret, stdout, _ = ssh.exec_command(
177 f"{Constants.REMOTE_FW_DIR}/resources/tools/wrk/wrk_utils.sh {args}",
181 raise RuntimeError('wrk runtime error.')
183 stats = _parse_wrk_output(stdout)
185 log_msg = u"\nMeasured values:\n"
186 if test_type == u"cps":
187 log_msg += u"Connections/sec: Avg / Stdev / Max / +/- Stdev\n"
188 for item in stats[u"rps-stats-lst"]:
189 log_msg += f"{0} / {1} / {2} / {3}\n".format(*item)
190 log_msg += f"Total cps: {stats[u'rps-sum']}cps\n"
191 elif test_type == u"rps":
192 log_msg += u"Requests/sec: Avg / Stdev / Max / +/- Stdev\n"
193 for item in stats[u"rps-stats-lst"]:
194 log_msg += f"{0} / {1} / {2} / {3}\n".format(*item)
195 log_msg += f"Total rps: {stats[u'rps-sum']}rps\n"
196 elif test_type == u"bw":
197 log_msg += f"Transfer/sec: {stats[u'bw-sum']}Bps"
204 def _parse_wrk_output(msg):
205 """Parse the wrk stdout with the results.
207 :param msg: stdout of wrk.
209 :returns: Parsed results.
211 :raises: WrkError if the message does not include the results.
214 if u"Thread Stats" not in msg:
215 raise WrkError(u"The output of wrk does not include the results.")
217 msg_lst = msg.splitlines(False)
220 u"latency-dist-lst": list(),
221 u"latency-stats-lst": list(),
222 u"rps-stats-lst": list(),
230 if u"Latency Distribution" in line:
231 # Latency distribution - 50%, 75%, 90%, 99%
233 elif u"Latency" in line:
234 # Latency statistics - Avg, Stdev, Max, +/- Stdev
236 elif u"Req/Sec" in line:
237 # rps statistics - Avg, Stdev, Max, +/- Stdev
238 stats[u"rps-stats-lst"].append(
240 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
241 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
242 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
243 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))
246 elif u"Requests/sec:" in line:
248 stats[u"rps-lst"].append(
249 _evaluate_number(re.search(REGEX_RPS, line).group(1))
251 elif u"Transfer/sec:" in line:
253 stats[u"bw-lst"].append(
254 _evaluate_number(re.search(REGEX_BW, line).group(1))
257 for item in stats[u"rps-stats-lst"]:
258 stats[u"rps-sum"] += item[0]
259 stats[u"bw-sum"] = sum(stats[u"bw-lst"])
264 def _evaluate_number(num):
265 """Evaluate the numeric value of the number with multiplicands, e.g.:
268 :param num: Number to evaluate.
270 :returns: Evaluated number.
272 :raises: WrkError if it is not possible to evaluate the given number.
275 val = re.search(REGEX_NUM, num)
277 val_num = float(val.group(1))
280 u"The output of wrk does not include the results or the format "
281 u"of results has changed."
283 val_mul = val.group(2).lower()
287 elif u"m" in val_mul:
289 elif u"g" in val_mul:
290 val_num *= 1000000000
291 elif u"b" in val_mul:
293 elif u"%" in val_mul:
298 raise WrkError(f"The multiplicand {val_mul} is not defined.")