add new topology parameter: arch
[csit.git] / resources / tools / wrk / wrk.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """wrk implementation into CSIT framework.
15 """
16
17 import re
18
19 from robot.api import logger
20
21 from resources.libraries.python.ssh import SSH
22 from resources.libraries.python.topology import NodeType
23 from resources.libraries.python.CpuUtils import CpuUtils
24 from resources.libraries.python.constants import Constants
25
26 from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
27 from resources.tools.wrk.wrk_errors import WrkError
28
29
30 REGEX_LATENCY_STATS = \
31     r"Latency\s*" \
32     r"(\d*\.*\d*\S*)\s*" \
33     r"(\d*\.*\d*\S*)\s*" \
34     r"(\d*\.*\d*\S*)\s*" \
35     r"(\d*\.*\d*\%)"
36 REGEX_RPS_STATS = \
37     r"Req/Sec\s*" \
38     r"(\d*\.*\d*\S*)\s*" \
39     r"(\d*\.*\d*\S*)\s*" \
40     r"(\d*\.*\d*\S*)\s*" \
41     r"(\d*\.*\d*\%)"
42 REGEX_RPS = r"Requests/sec:\s*" \
43             r"(\d*\.*\S*)"
44 REGEX_BW = r"Transfer/sec:\s*" \
45            r"(\d*\.*\S*)"
46 REGEX_LATENCY_DIST = \
47     r"Latency Distribution\n" \
48     r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
49     r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
50     r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
51     r"\s*99\%\s*(\d*\.*\d*\D*)\n"
52
53 # Split number and multiplicand, e.g. 14.25k --> 14.25 and k
54 REGEX_NUM = r"(\d*\.*\d*)(\D*)"
55
56
57 def install_wrk(tg_node):
58     """Install wrk on the TG node.
59
60     :param tg_node: Traffic generator node.
61     :type tg_node: dict
62     :raises: RuntimeError if the given node is not a TG node or if the
63     installation fails.
64     """
65
66     if tg_node['type'] != NodeType.TG:
67         raise RuntimeError('Node type is not a TG.')
68
69     ssh = SSH()
70     ssh.connect(tg_node)
71
72     ret, _, _ = ssh.exec_command(
73         "sudo -E "
74         "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
75         format(Constants.REMOTE_FW_DIR), timeout=1800)
76     if int(ret) != 0:
77         raise RuntimeError('Installation of wrk on TG node failed.')
78
79
80 def destroy_wrk(tg_node):
81     """Destroy wrk on the TG node.
82
83     :param tg_node: Traffic generator node.
84     :type tg_node: dict
85     :raises: RuntimeError if the given node is not a TG node or the removal of
86     wrk failed.
87     """
88
89     if tg_node['type'] != NodeType.TG:
90         raise RuntimeError('Node type is not a TG.')
91
92     ssh = SSH()
93     ssh.connect(tg_node)
94
95     ret, _, _ = ssh.exec_command(
96         "sudo -E "
97         "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
98         format(Constants.REMOTE_FW_DIR), timeout=1800)
99     if int(ret) != 0:
100         raise RuntimeError('Removal of wrk from the TG node failed.')
101
102
103 def run_wrk(tg_node, profile_name, tg_numa, test_type):
104     """Send the traffic as defined in the profile.
105
106     :param tg_node: Traffic generator node.
107     :param profile_name: The name of wrk traffic profile.
108     :param tg_numa: Numa node on which wrk will run.
109     :param test_type: The type of the tests: cps, rps, bw
110     :type profile_name: str
111     :type tg_node: dict
112     :type tg_numa: int
113     :type test_type: str
114     :returns: Message with measured data.
115     :rtype: str
116     :raises: RuntimeError if node type is not a TG.
117     """
118
119     if tg_node['type'] != NodeType.TG:
120         raise RuntimeError('Node type is not a TG.')
121
122     # Parse and validate the profile
123     profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
124                     format(profile_name))
125     profile = WrkTrafficProfile(profile_path).traffic_profile
126
127     cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
128     first_cpu = cores[profile["first-cpu"]]
129
130     if len(profile["urls"]) == 1 and profile["cpus"] == 1:
131         params = [
132             "traffic_1_url_1_core",
133             str(first_cpu),
134             str(profile["nr-of-threads"]),
135             str(profile["nr-of-connections"]),
136             "{0}s".format(profile["duration"]),
137             "'{0}'".format(profile["header"]),
138             str(profile["timeout"]),
139             str(profile["script"]),
140             str(profile["latency"]),
141             "'{0}'".format(" ".join(profile["urls"]))
142         ]
143     elif len(profile["urls"]) == profile["cpus"]:
144         params = [
145             "traffic_n_urls_n_cores",
146             str(first_cpu),
147             str(profile["nr-of-threads"]),
148             str(profile["nr-of-connections"]),
149             "{0}s".format(profile["duration"]),
150             "'{0}'".format(profile["header"]),
151             str(profile["timeout"]),
152             str(profile["script"]),
153             str(profile["latency"]),
154             "'{0}'".format(" ".join(profile["urls"]))
155         ]
156     else:
157         params = [
158             "traffic_n_urls_m_cores",
159             str(first_cpu),
160             str(profile["cpus"] / len(profile["urls"])),
161             str(profile["nr-of-threads"]),
162             str(profile["nr-of-connections"]),
163             "{0}s".format(profile["duration"]),
164             "'{0}'".format(profile["header"]),
165             str(profile["timeout"]),
166             str(profile["script"]),
167             str(profile["latency"]),
168             "'{0}'".format(" ".join(profile["urls"]))
169         ]
170     args = " ".join(params)
171
172     ssh = SSH()
173     ssh.connect(tg_node)
174
175     ret, stdout, _ = ssh.exec_command(
176         "{0}/resources/tools/wrk/wrk_utils.sh {1}".
177         format(Constants.REMOTE_FW_DIR, args), timeout=1800)
178     if int(ret) != 0:
179         raise RuntimeError('wrk runtime error.')
180
181     stats = _parse_wrk_output(stdout)
182
183     log_msg = "\nMeasured values:\n"
184     if test_type == "cps":
185         log_msg += "Connections/sec: Avg / Stdev / Max  / +/- Stdev\n"
186         for item in stats["rps-stats-lst"]:
187             log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
188         log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
189     elif test_type == "rps":
190         log_msg += "Requests/sec: Avg / Stdev / Max  / +/- Stdev\n"
191         for item in stats["rps-stats-lst"]:
192             log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
193         log_msg += "Total rps: {0}cps\n".format(stats["rps-sum"])
194     elif test_type == "bw":
195         log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])
196
197     logger.info(log_msg)
198
199     return log_msg
200
201
202 def _parse_wrk_output(msg):
203     """Parse the wrk stdout with the results.
204
205     :param msg: stdout of wrk.
206     :type msg: str
207     :returns: Parsed results.
208     :rtype: dict
209     :raises: WrkError if the message does not include the results.
210     """
211
212     if "Thread Stats" not in msg:
213         raise WrkError("The output of wrk does not include the results.")
214
215     msg_lst = msg.splitlines(False)
216
217     stats = {
218         "latency-dist-lst": list(),
219         "latency-stats-lst": list(),
220         "rps-stats-lst": list(),
221         "rps-lst": list(),
222         "bw-lst": list(),
223         "rps-sum": 0,
224         "bw-sum": None
225     }
226
227     for line in msg_lst:
228         if "Latency Distribution" in line:
229             # Latency distribution - 50%, 75%, 90%, 99%
230             pass
231         elif "Latency" in line:
232             # Latency statistics - Avg, Stdev, Max, +/- Stdev
233             pass
234         elif "Req/Sec" in line:
235             # rps statistics - Avg, Stdev, Max, +/- Stdev
236             stats["rps-stats-lst"].append((
237                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
238                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
239                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
240                 _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))))
241         elif "Requests/sec:" in line:
242             # rps (cps)
243             stats["rps-lst"].append(
244                 _evaluate_number(re.search(REGEX_RPS, line).group(1)))
245         elif "Transfer/sec:" in line:
246             # BW
247             stats["bw-lst"].append(
248                 _evaluate_number(re.search(REGEX_BW, line).group(1)))
249
250     for item in stats["rps-stats-lst"]:
251         stats["rps-sum"] += item[0]
252     stats["bw-sum"] = sum(stats["bw-lst"])
253
254     return stats
255
256
257 def _evaluate_number(num):
258     """Evaluate the numeric value of the number with multiplicands, e.g.:
259     12.25k --> 12250
260
261     :param num: Number to evaluate.
262     :type num: str
263     :returns: Evaluated number.
264     :rtype: float
265     :raises: WrkError if it is not possible to evaluate the given number.
266     """
267
268     val = re.search(REGEX_NUM, num)
269     try:
270         val_num = float(val.group(1))
271     except ValueError:
272         raise WrkError("The output of wrk does not include the results "
273                        "or the format of results has changed.")
274     val_mul = val.group(2).lower()
275     if val_mul:
276         if "k" in val_mul:
277             val_num *= 1000
278         elif "m" in val_mul:
279             val_num *= 1000000
280         elif "g" in val_mul:
281             val_num *= 1000000000
282         elif "b" in val_mul:
283             pass
284         elif "%" in val_mul:
285             pass
286         elif "" in val_mul:
287             pass
288         else:
289             raise WrkError("The multiplicand {0} is not defined.".
290                            format(val_mul))
291     return val_num