"""Generation of Continuous Performance Trending and Analysis.
"""
-import multiprocessing
-import os
import logging
import csv
import prettytable
from collections import OrderedDict
from datetime import datetime
+from copy import deepcopy
-from utils import archive_input_data, execute_command, \
- classify_anomalies, Worker
+from utils import archive_input_data, execute_command, classify_anomalies
# Command to build the html format of the report
css_file:
css_file.write(THEME_OVERRIDES)
- archive_input_data(spec)
+ if spec.configuration.get("archive-inputs", True):
+ archive_input_data(spec)
logging.info("Done.")
:type input_data: InputData
"""
- def _generate_chart(_, data_q, graph):
+ def _generate_chart(graph):
"""Generates the chart.
"""
logs = list()
- logging.info(" Generating the chart '{0}' ...".
- format(graph.get("title", "")))
logs.append(("INFO", " Generating the chart '{0}' ...".
format(graph.get("title", ""))))
return
chart_data = dict()
+ chart_tags = dict()
for job, job_data in data.iteritems():
if job != job_name:
continue
try:
chart_data[test_name][int(index)] = \
test["result"]["receive-rate"]
+ chart_tags[test_name] = test.get("tags", None)
except (KeyError, TypeError):
pass
itm = itm.avg
tst_lst.append(str(itm))
csv_tbl.append("{0},".format(tst_name) + ",".join(tst_lst) + '\n')
+
# Generate traces:
traces = list()
index = 0
- for test_name, test_data in chart_data.items():
- if not test_data:
- logs.append(("WARNING", "No data for the test '{0}'".
- format(test_name)))
- continue
- message = "index: {index}, test: {test}".format(
- index=index, test=test_name)
- test_name = test_name.split('.')[-1]
- try:
- trace, rslt = _generate_trending_traces(
- test_data,
- job_name=job_name,
- build_info=build_info,
- name='-'.join(test_name.split('-')[2:-1]),
- color=COLORS[index])
- except IndexError:
- message = "Out of colors: {}".format(message)
- logs.append(("ERROR", message))
- logging.error(message)
+ groups = graph.get("groups", None)
+ visibility = list()
+
+ if groups:
+ for group in groups:
+ visible = list()
+ for tag in group:
+ for test_name, test_data in chart_data.items():
+ if not test_data:
+ logs.append(("WARNING",
+ "No data for the test '{0}'".
+ format(test_name)))
+ continue
+ if tag in chart_tags[test_name]:
+ message = "index: {index}, test: {test}".format(
+ index=index, test=test_name)
+ test_name = test_name.split('.')[-1]
+ try:
+ trace, rslt = _generate_trending_traces(
+ test_data,
+ job_name=job_name,
+ build_info=build_info,
+ name='-'.join(test_name.split('-')[2:-1]),
+ color=COLORS[index])
+ except IndexError:
+ message = "Out of colors: {}".format(message)
+ logs.append(("ERROR", message))
+ logging.error(message)
+ index += 1
+ continue
+ traces.extend(trace)
+ visible.extend([True for _ in range(len(trace))])
+ res.append(rslt)
+ index += 1
+ break
+ visibility.append(visible)
+ else:
+ for test_name, test_data in chart_data.items():
+ if not test_data:
+ logs.append(("WARNING", "No data for the test '{0}'".
+ format(test_name)))
+ continue
+ message = "index: {index}, test: {test}".format(
+ index=index, test=test_name)
+ test_name = test_name.split('.')[-1]
+ try:
+ trace, rslt = _generate_trending_traces(
+ test_data,
+ job_name=job_name,
+ build_info=build_info,
+ name='-'.join(test_name.split('-')[2:-1]),
+ color=COLORS[index])
+ except IndexError:
+ message = "Out of colors: {}".format(message)
+ logs.append(("ERROR", message))
+ logging.error(message)
+ index += 1
+ continue
+ traces.extend(trace)
+ res.append(rslt)
index += 1
- continue
- traces.extend(trace)
- res.append(rslt)
- index += 1
if traces:
# Generate the chart:
- graph["layout"]["xaxis"]["title"] = \
- graph["layout"]["xaxis"]["title"].format(job=job_name)
+ try:
+ layout = deepcopy(graph["layout"])
+ except KeyError as err:
+ logging.error("Finished with error: No layout defined")
+ logging.error(repr(err))
+ return
+ if groups:
+ show = list()
+ for i in range(len(visibility)):
+ visible = list()
+ for r in range(len(visibility)):
+ for _ in range(len(visibility[r])):
+ visible.append(i == r)
+ show.append(visible)
+
+ buttons = list()
+ buttons.append(dict(
+ label="All",
+ method="update",
+ args=[{"visible": [True for _ in range(len(show[0]))]}, ]
+ ))
+ for i in range(len(groups)):
+ try:
+ label = graph["group-names"][i]
+ except (IndexError, KeyError):
+ label = "Group {num}".format(num=i + 1)
+ buttons.append(dict(
+ label=label,
+ method="update",
+ args=[{"visible": show[i]}, ]
+ ))
+
+ layout['updatemenus'] = list([
+ dict(
+ active=0,
+ type="dropdown",
+ direction="down",
+ xanchor="left",
+ yanchor="bottom",
+ x=-0.12,
+ y=1.0,
+ buttons=buttons
+ )
+ ])
+
name_file = "{0}-{1}{2}".format(spec.cpta["output-file"],
graph["output-file-name"],
spec.cpta["output-file-type"])
logs.append(("INFO", " Writing the file '{0}' ...".
format(name_file)))
- plpl = plgo.Figure(data=traces, layout=graph["layout"])
+ plpl = plgo.Figure(data=traces, layout=layout)
try:
ploff.plot(plpl, show_link=False, auto_open=False,
filename=name_file)
except plerr.PlotlyEmptyDataError:
logs.append(("WARNING", "No data for the plot. Skipped."))
- data_out = {
- "job_name": job_name,
- "csv_table": csv_tbl,
- "results": res,
- "logs": logs
- }
- data_q.put(data_out)
+ for level, line in logs:
+ if level == "INFO":
+ logging.info(line)
+ elif level == "ERROR":
+ logging.error(line)
+ elif level == "DEBUG":
+ logging.debug(line)
+ elif level == "CRITICAL":
+ logging.critical(line)
+ elif level == "WARNING":
+ logging.warning(line)
+
+ return {"job_name": job_name, "csv_table": csv_tbl, "results": res}
builds_dict = dict()
for job in spec.input["builds"].keys():
testbed
)
- work_queue = multiprocessing.JoinableQueue()
- manager = multiprocessing.Manager()
- data_queue = manager.Queue()
- cpus = multiprocessing.cpu_count()
-
- workers = list()
- for cpu in range(cpus):
- worker = Worker(work_queue,
- data_queue,
- _generate_chart)
- worker.daemon = True
- worker.start()
- workers.append(worker)
- os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
- format(cpu, worker.pid))
-
- for chart in spec.cpta["plots"]:
- work_queue.put((chart, ))
- work_queue.join()
-
anomaly_classifications = list()
# Create the header:
header = "Version:," + ",".join(versions) + '\n'
csv_tables[job_name].append(header)
- while not data_queue.empty():
- result = data_queue.get()
+ for chart in spec.cpta["plots"]:
+ result = _generate_chart(chart)
anomaly_classifications.extend(result["results"])
csv_tables[result["job_name"]].extend(result["csv_table"])
- for item in result["logs"]:
- if item[0] == "INFO":
- logging.info(item[1])
- elif item[0] == "ERROR":
- logging.error(item[1])
- elif item[0] == "DEBUG":
- logging.debug(item[1])
- elif item[0] == "CRITICAL":
- logging.critical(item[1])
- elif item[0] == "WARNING":
- logging.warning(item[1])
-
- del data_queue
-
- # Terminate all workers
- for worker in workers:
- worker.terminate()
- worker.join()
-
# Write the tables:
for job_name, csv_table in csv_tables.items():
file_name = spec.cpta["output-file"] + "-" + job_name + "-trending"