a891fd6ae9763fdab8f08ad1c39cfaf3fea385f4
[csit.git] / resources / tools / wrk / wrk.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """wrk implementation into CSIT framework.
15 """
16
17 import re
18
19 from copy import deepcopy
20 from time import sleep
21
22 from robot.api import logger
23
24 from resources.libraries.python.ssh import SSH
25 from resources.libraries.python.topology import NodeType
26 from resources.libraries.python.CpuUtils import CpuUtils
27 from resources.libraries.python.constants import Constants
28
29 from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
30 from resources.tools.wrk.wrk_errors import WrkError
31
32
33 REGEX_LATENCY_STATS = \
34     r"Latency\s*" \
35     r"(\d*\.*\d*\S*)\s*" \
36     r"(\d*\.*\d*\S*)\s*" \
37     r"(\d*\.*\d*\S*)\s*" \
38     r"(\d*\.*\d*\%)"
39 REGEX_RPS_STATS = \
40     r"Req/Sec\s*" \
41     r"(\d*\.*\d*\S*)\s*" \
42     r"(\d*\.*\d*\S*)\s*" \
43     r"(\d*\.*\d*\S*)\s*" \
44     r"(\d*\.*\d*\%)"
45 REGEX_RPS = r"Requests/sec:\s*" \
46             r"(\d*\.*\S*)"
47 REGEX_BW = r"Transfer/sec:\s*" \
48            r"(\d*\.*\S*)"
49 REGEX_LATENCY_DIST = \
50     r"Latency Distribution\n" \
51     r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
52     r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
53     r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
54     r"\s*99\%\s*(\d*\.*\d*\D*)\n"
55
56 # Split number and multiplicand, e.g. 14.25k --> 14.25 and k
57 REGEX_NUM = r"(\d*\.*\d*)(\D*)"
58
59
60 def install_wrk(tg_node):
61     """Install wrk on the TG node.
62
63     :param tg_node: Traffic generator node.
64     :type tg_node: dict
65     :raises: RuntimeError if the given node is not a TG node or if the
66     installation fails.
67     """
68
69     if tg_node['type'] != NodeType.TG:
70         raise RuntimeError('Node type is not a TG.')
71
72     ssh = SSH()
73     ssh.connect(tg_node)
74
75     ret, _, _ = ssh.exec_command(
76         "sudo -E "
77         "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
78         format(Constants.REMOTE_FW_DIR), timeout=1800)
79     if int(ret) != 0:
80         raise RuntimeError('Installation of wrk on TG node failed.')
81
82
83 def destroy_wrk(tg_node):
84     """Destroy wrk on the TG node.
85
86     :param tg_node: Traffic generator node.
87     :type tg_node: dict
88     :raises: RuntimeError if the given node is not a TG node or the removal of
89     wrk failed.
90     """
91
92     if tg_node['type'] != NodeType.TG:
93         raise RuntimeError('Node type is not a TG.')
94
95     ssh = SSH()
96     ssh.connect(tg_node)
97
98     ret, _, _ = ssh.exec_command(
99         "sudo -E "
100         "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
101         format(Constants.REMOTE_FW_DIR), timeout=1800)
102     if int(ret) != 0:
103         raise RuntimeError('Removal of wrk from the TG node failed.')
104
105
106 def run_wrk(tg_node, profile_name, tg_numa, test_type, warm_up=False):
107     """Send the traffic as defined in the profile.
108
109     :param tg_node: Traffic generator node.
110     :param profile_name: The name of wrk traffic profile.
111     :param tg_numa: Numa node on which wrk will run.
112     :param test_type: The type of the tests: cps, rps, bw
113     :param warm_up: If True, warm-up traffic is generated before test traffic.
114     :type profile_name: str
115     :type tg_node: dict
116     :type tg_numa: int
117     :type test_type: str
118     :type warm_up: bool
119     :returns: Message with measured data.
120     :rtype: str
121     :raises: RuntimeError if node type is not a TG.
122     """
123
124     if tg_node['type'] != NodeType.TG:
125         raise RuntimeError('Node type is not a TG.')
126
127     # Parse and validate the profile
128     profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
129                     format(profile_name))
130     profile = WrkTrafficProfile(profile_path).traffic_profile
131
132     cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
133     first_cpu = cores[profile["first-cpu"]]
134
135     if len(profile["urls"]) == 1 and profile["cpus"] == 1:
136         params = [
137             "traffic_1_url_1_core",
138             str(first_cpu),
139             str(profile["nr-of-threads"]),
140             str(profile["nr-of-connections"]),
141             "{0}s".format(profile["duration"]),
142             "'{0}'".format(profile["header"]),
143             str(profile["timeout"]),
144             str(profile["script"]),
145             str(profile["latency"]),
146             "'{0}'".format(" ".join(profile["urls"]))
147         ]
148         if warm_up:
149             warm_up_params = deepcopy(params)
150             warm_up_params[4] = "10s"
151     elif len(profile["urls"]) == profile["cpus"]:
152         params = [
153             "traffic_n_urls_n_cores",
154             str(first_cpu),
155             str(profile["nr-of-threads"]),
156             str(profile["nr-of-connections"]),
157             "{0}s".format(profile["duration"]),
158             "'{0}'".format(profile["header"]),
159             str(profile["timeout"]),
160             str(profile["script"]),
161             str(profile["latency"]),
162             "'{0}'".format(" ".join(profile["urls"]))
163         ]
164         if warm_up:
165             warm_up_params = deepcopy(params)
166             warm_up_params[4] = "10s"
167     else:
168         params = [
169             "traffic_n_urls_m_cores",
170             str(first_cpu),
171             str(profile["cpus"] / len(profile["urls"])),
172             str(profile["nr-of-threads"]),
173             str(profile["nr-of-connections"]),
174             "{0}s".format(profile["duration"]),
175             "'{0}'".format(profile["header"]),
176             str(profile["timeout"]),
177             str(profile["script"]),
178             str(profile["latency"]),
179             "'{0}'".format(" ".join(profile["urls"]))
180         ]
181         if warm_up:
182             warm_up_params = deepcopy(params)
183             warm_up_params[5] = "10s"
184
185     args = " ".join(params)
186
187     ssh = SSH()
188     ssh.connect(tg_node)
189
190     if warm_up:
191         warm_up_args = " ".join(warm_up_params)
192         ret, _, _ = ssh.exec_command(
193             "{0}/resources/tools/wrk/wrk_utils.sh {1}".
194             format(Constants.REMOTE_FW_DIR, warm_up_args), timeout=1800)
195         if int(ret) != 0:
196             raise RuntimeError('wrk runtime error.')
197         sleep(60)
198
199     ret, stdout, _ = ssh.exec_command(
200         "{0}/resources/tools/wrk/wrk_utils.sh {1}".
201         format(Constants.REMOTE_FW_DIR, args), timeout=1800)
202     if int(ret) != 0:
203         raise RuntimeError('wrk runtime error.')
204
205     stats = _parse_wrk_output(stdout)
206
207     log_msg = "\nMeasured values:\n"
208     if test_type == "cps":
209         log_msg += "Connections/sec: Avg / Stdev / Max  / +/- Stdev\n"
210         for item in stats["rps-stats-lst"]:
211             log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
212         log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
213     elif test_type == "rps":
214         log_msg += "Requests/sec: Avg / Stdev / Max  / +/- Stdev\n"
215         for item in stats["rps-stats-lst"]:
216             log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
217         log_msg += "Total rps: {0}rps\n".format(stats["rps-sum"])
218     elif test_type == "bw":
219         log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])
220
221     logger.info(log_msg)
222
223     return log_msg
224
225
226 def _parse_wrk_output(msg):
227     """Parse the wrk stdout with the results.
228
229     :param msg: stdout of wrk.
230     :type msg: str
231     :returns: Parsed results.
232     :rtype: dict
233     :raises: WrkError if the message does not include the results.
234     """
235
236     if "Thread Stats" not in msg:
237         raise WrkError("The output of wrk does not include the results.")
238
239     msg_lst = msg.splitlines(False)
240
241     stats = {
242         "latency-dist-lst": list(),
243         "latency-stats-lst": list(),
244         "rps-stats-lst": list(),
245         "rps-lst": list(),
246         "bw-lst": list(),
247         "rps-sum": 0,
248         "bw-sum": None
249     }
250
251     for line in msg_lst:
252         if "Latency Distribution" in line:
253             # Latency distribution - 50%, 75%, 90%, 99%
254             pass
255         elif "Latency" in line:
256             # Latency statistics - Avg, Stdev, Max, +/- Stdev
257             pass
258         elif "Req/Sec" in line:
259             # rps statistics - Avg, Stdev, Max, +/- Stdev
260             stats["rps-stats-lst"].append((
261                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
262                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
263                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
264                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))))
265         elif "Requests/sec:" in line:
266             # rps (cps)
267             stats["rps-lst"].append(
268                 _evaluate_number(re.search(REGEX_RPS, line).group(1)))
269         elif "Transfer/sec:" in line:
270             # BW
271             stats["bw-lst"].append(
272                 _evaluate_number(re.search(REGEX_BW, line).group(1)))
273
274     for item in stats["rps-stats-lst"]:
275         stats["rps-sum"] += item[0]
276     stats["bw-sum"] = sum(stats["bw-lst"])
277
278     return stats
279
280
281 def _evaluate_number(num):
282     """Evaluate the numeric value of the number with multiplicands, e.g.:
283     12.25k --> 12250
284
285     :param num: Number to evaluate.
286     :type num: str
287     :returns: Evaluated number.
288     :rtype: float
289     :raises: WrkError if it is not possible to evaluate the given number.
290     """
291
292     val = re.search(REGEX_NUM, num)
293     try:
294         val_num = float(val.group(1))
295     except ValueError:
296         raise WrkError("The output of wrk does not include the results "
297                        "or the format of results has changed.")
298     val_mul = val.group(2).lower()
299     if val_mul:
300         if "k" in val_mul:
301             val_num *= 1000
302         elif "m" in val_mul:
303             val_num *= 1000000
304         elif "g" in val_mul:
305             val_num *= 1000000000
306         elif "b" in val_mul:
307             pass
308         elif "%" in val_mul:
309             pass
310         elif "" in val_mul:
311             pass
312         else:
313             raise WrkError("The multiplicand {0} is not defined.".
314                            format(val_mul))
315     return val_num