CSIT-1106: Unify the anomaly detection (plots, dashboard)
[csit.git] / resources / tools / presentation / generator_CPTA.py
index e8091c0..2c62e11 100644
 """Generation of Continuous Performance Trending and Analysis.
 """
 
-import datetime
+import multiprocessing
+import os
 import logging
 import csv
 import prettytable
 import plotly.offline as ploff
 import plotly.graph_objs as plgo
 import plotly.exceptions as plerr
-import numpy as np
 import pandas as pd
 
 from collections import OrderedDict
-from utils import find_outliers, archive_input_data, execute_command
+from datetime import datetime
+
+from utils import split_outliers, archive_input_data, execute_command,\
+    classify_anomalies, Worker
 
 
 # Command to build the html format of the report
 HTML_BUILDER = 'sphinx-build -v -c conf_cpta -a ' \
                '-b html -E ' \
                '-t html ' \
-               '-D version="Generated on {date}" ' \
+               '-D version="{date}" ' \
                '{working_dir} ' \
                '{build_dir}/'
 
@@ -64,7 +67,7 @@ def generate_cpta(spec, data):
     ret_code = _generate_all_charts(spec, data)
 
     cmd = HTML_BUILDER.format(
-        date=datetime.date.today().strftime('%d-%b-%Y'),
+        date=datetime.utcnow().strftime('%m/%d/%Y %H:%M UTC'),
         working_dir=spec.environment["paths"]["DIR[WORKING,SRC]"],
         build_dir=spec.environment["paths"]["DIR[BUILD,HTML]"])
     execute_command(cmd)
@@ -84,204 +87,84 @@ def generate_cpta(spec, data):
     return ret_code
 
 
-def _select_data(in_data, period, fill_missing=False, use_first=False):
-    """Select the data from the full data set. The selection is done by picking
-    the samples depending on the period: period = 1: All, period = 2: every
-    second sample, period = 3: every third sample ...
-
-    :param in_data: Full set of data.
-    :param period: Sampling period.
-    :param fill_missing: If the chosen sample is missing in the full set, its
-    nearest neighbour is used.
-    :param use_first: Use the first sample even though it is not chosen.
-    :type in_data: OrderedDict
-    :type period: int
-    :type fill_missing: bool
-    :type use_first: bool
-    :returns: Reduced data.
-    :rtype: OrderedDict
-    """
-
-    first_idx = min(in_data.keys())
-    last_idx = max(in_data.keys())
-
-    idx = last_idx
-    data_dict = dict()
-    if use_first:
-        data_dict[first_idx] = in_data[first_idx]
-    while idx >= first_idx:
-        data = in_data.get(idx, None)
-        if data is None:
-            if fill_missing:
-                threshold = int(round(idx - period / 2)) + 1 - period % 2
-                idx_low = first_idx if threshold < first_idx else threshold
-                threshold = int(round(idx + period / 2))
-                idx_high = last_idx if threshold > last_idx else threshold
-
-                flag_l = True
-                flag_h = True
-                idx_lst = list()
-                inc = 1
-                while flag_l or flag_h:
-                    if idx + inc > idx_high:
-                        flag_h = False
-                    else:
-                        idx_lst.append(idx + inc)
-                    if idx - inc < idx_low:
-                        flag_l = False
-                    else:
-                        idx_lst.append(idx - inc)
-                    inc += 1
-
-                for i in idx_lst:
-                    if i in in_data.keys():
-                        data_dict[i] = in_data[i]
-                        break
-        else:
-            data_dict[idx] = data
-        idx -= period
-
-    return OrderedDict(sorted(data_dict.items(), key=lambda t: t[0]))
-
-
-def _evaluate_results(in_data, trimmed_data, window=10):
-    """Evaluates if the sample value is regress, normal or progress compared to
-    previous data within the window.
-    We use the intervals defined as:
-    - regress: less than median - 3 * stdev
-    - normal: between median - 3 * stdev and median + 3 * stdev
-    - progress: more than median + 3 * stdev
-
-    :param in_data: Full data set.
-    :param trimmed_data: Full data set without the outliers.
-    :param window: Window size used to calculate moving median and moving stdev.
-    :type in_data: pandas.Series
-    :type trimmed_data: pandas.Series
-    :type window: int
-    :returns: Evaluated results.
-    :rtype: list
-    """
-
-    if len(in_data) > 2:
-        win_size = in_data.size if in_data.size < window else window
-        results = [0.0, ] * win_size
-        median = in_data.rolling(window=win_size).median()
-        stdev_t = trimmed_data.rolling(window=win_size, min_periods=2).std()
-        m_vals = median.values
-        s_vals = stdev_t.values
-        d_vals = in_data.values
-        for day in range(win_size, in_data.size):
-            if np.isnan(m_vals[day - 1]) or np.isnan(s_vals[day - 1]):
-                results.append(0.0)
-            elif d_vals[day] < (m_vals[day - 1] - 3 * s_vals[day - 1]):
-                results.append(0.33)
-            elif (m_vals[day - 1] - 3 * s_vals[day - 1]) <= d_vals[day] <= \
-                    (m_vals[day - 1] + 3 * s_vals[day - 1]):
-                results.append(0.66)
-            else:
-                results.append(1.0)
-    else:
-        results = [0.0, ]
-        try:
-            median = np.median(in_data)
-            stdev = np.std(in_data)
-            if in_data.values[-1] < (median - 3 * stdev):
-                results.append(0.33)
-            elif (median - 3 * stdev) <= in_data.values[-1] <= (
-                    median + 3 * stdev):
-                results.append(0.66)
-            else:
-                results.append(1.0)
-        except TypeError:
-            results.append(None)
-    return results
-
-
-def _generate_trending_traces(in_data, build_info, period, moving_win_size=10,
-                              fill_missing=True, use_first=False,
-                              show_moving_median=True, name="", color=""):
+def _generate_trending_traces(in_data, build_info, moving_win_size=10,
+                              show_trend_line=True, name="", color=""):
     """Generate the trending traces:
      - samples,
-     - moving median (trending plot)
+     - trimmed moving median (trending line),
      - outliers, regress, progress
 
     :param in_data: Full data set.
     :param build_info: Information about the builds.
-    :param period: Sampling period.
     :param moving_win_size: Window size.
-    :param fill_missing: If the chosen sample is missing in the full set, its
-    nearest neighbour is used.
-    :param use_first: Use the first sample even though it is not chosen.
-    :param show_moving_median: Show moving median (trending plot).
+    :param show_trend_line: Show the trend line (moving median of the
+    trimmed data).
     :param name: Name of the plot
     :param color: Name of the color for the plot.
     :type in_data: OrderedDict
     :type build_info: dict
-    :type period: int
     :type moving_win_size: int
-    :type fill_missing: bool
-    :type use_first: bool
-    :type show_moving_median: bool
+    :type show_trend_line: bool
     :type name: str
     :type color: str
-    :returns: Generated traces (list) and the evaluated result (float).
+    :returns: Generated traces (list) and the anomaly classification of the
+    last sample.
     :rtype: tuple(traces, result)
     """
 
-    if period > 1:
-        in_data = _select_data(in_data, period,
-                               fill_missing=fill_missing,
-                               use_first=use_first)
-    try:
-        data_x = ["{0}/{1}".format(key, build_info[str(key)][1].split("~")[-1])
-                  for key in in_data.keys()]
-    except KeyError:
-        data_x = [key for key in in_data.keys()]
-    data_y = [val for val in in_data.values()]
-    data_pd = pd.Series(data_y, index=data_x)
+    data_x = list(in_data.keys())
+    data_y = list(in_data.values())
 
-    t_data, outliers = find_outliers(data_pd)
+    hover_text = list()
+    xaxis = list()
+    for idx in data_x:
+        hover_text.append("vpp-ref: {0}<br>csit-ref: mrr-daily-build-{1}".
+                          format(build_info[str(idx)][1].rsplit('~', 1)[0],
+                                 idx))
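+        # The build date is assumed to be stored as "YYYYMMDD HH:MM":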
+        date = build_info[str(idx)][0]
+        xaxis.append(datetime(int(date[0:4]), int(date[4:6]), int(date[6:8]),
+                              int(date[9:11]), int(date[12:])))
 
-    results = _evaluate_results(data_pd, t_data, window=moving_win_size)
+    data_pd = pd.Series(data_y, index=xaxis)
+
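+    # Split the samples into the trimmed data set and the outliers, then
+    # classify each sample as an outlier, regression, normal sample or
+    # progression: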
+    t_data, outliers = split_outliers(data_pd, outlier_const=1.5,
+                                      window=moving_win_size)
+    anomaly_classification = classify_anomalies(t_data, window=moving_win_size)
 
     anomalies = pd.Series()
-    anomalies_res = list()
-    for idx, item in enumerate(in_data.items()):
-        item_pd = pd.Series([item[1], ],
-                            index=["{0}/{1}".
-                            format(item[0],
-                                   build_info[str(item[0])][1].split("~")[-1]), ])
-        if item[0] in outliers.keys():
-            anomalies = anomalies.append(item_pd)
-            anomalies_res.append(0.0)
-        elif results[idx] in (0.33, 1.0):
-            anomalies = anomalies.append(item_pd)
-            anomalies_res.append(results[idx])
-    anomalies_res.extend([0.0, 0.33, 0.66, 1.0])
+    anomalies_colors = list()
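+    # Map each classification to a value in the <0; 1> range; these values
+    # select the bands of the discrete colorscale defined below (grey, red,
+    # white, green):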
+    anomaly_color = {
+        "outlier": 0.0,
+        "regression": 0.33,
+        "normal": 0.66,
+        "progression": 1.0
+    }
+    if anomaly_classification:
+        for idx, item in enumerate(data_pd.items()):
+            if anomaly_classification[idx] in \
+                    ("outlier", "regression", "progression"):
+                anomalies = anomalies.append(pd.Series([item[1], ],
+                                                       index=[item[0], ]))
+                anomalies_colors.append(
+                    anomaly_color[anomaly_classification[idx]])
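+        # Always add all four values so that the colorscale stays anchored
+        # to the full <0; 1> range even if only some classes are present: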
+        anomalies_colors.extend([0.0, 0.33, 0.66, 1.0])
 
     # Create traces
-    color_scale = [[0.00, "grey"],
-                   [0.25, "grey"],
-                   [0.25, "red"],
-                   [0.50, "red"],
-                   [0.50, "white"],
-                   [0.75, "white"],
-                   [0.75, "green"],
-                   [1.00, "green"]]
 
     trace_samples = plgo.Scatter(
-        x=data_x,
+        x=xaxis,
         y=data_y,
         mode='markers',
         line={
             "width": 1
         },
+        legendgroup=name,
         name="{name}-thput".format(name=name),
         marker={
             "size": 5,
             "color": color,
             "symbol": "circle",
         },
+        text=hover_text,
+        hoverinfo="x+y+text+name"
     )
     traces = [trace_samples, ]
 
@@ -290,14 +173,21 @@ def _generate_trending_traces(in_data, build_info, period, moving_win_size=10,
         y=anomalies.values,
         mode='markers',
         hoverinfo="none",
-        showlegend=False,
+        showlegend=True,
         legendgroup=name,
-        name="{name}: outliers".format(name=name),
+        name="{name}-anomalies".format(name=name),
         marker={
             "size": 15,
             "symbol": "circle-open",
-            "color": anomalies_res,
-            "colorscale": color_scale,
+            "color": anomalies_colors,
+            "colorscale": [[0.00, "grey"],
+                           [0.25, "grey"],
+                           [0.25, "red"],
+                           [0.50, "red"],
+                           [0.50, "white"],
+                           [0.75, "white"],
+                           [0.75, "green"],
+                           [1.00, "green"]],
             "showscale": True,
             "line": {
                 "width": 2
@@ -322,43 +212,24 @@ def _generate_trending_traces(in_data, build_info, period, moving_win_size=10,
     )
     traces.append(trace_anomalies)
 
-    if show_moving_median:
-        data_mean_y = pd.Series(data_y).rolling(
-            window=moving_win_size, min_periods=2).median()
-        trace_median = plgo.Scatter(
-            x=data_x,
-            y=data_mean_y,
+    if show_trend_line:
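+        # The trend line is the moving median of the trimmed data, i.e. the
+        # outliers do not affect it: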
+        data_trend = t_data.rolling(window=moving_win_size,
+                                    min_periods=2).median()
+        trace_trend = plgo.Scatter(
+            x=data_trend.keys(),
+            y=data_trend.tolist(),
             mode='lines',
             line={
                 "shape": "spline",
                 "width": 1,
                 "color": color,
             },
+            legendgroup=name,
             name='{name}-trend'.format(name=name)
         )
-        traces.append(trace_median)
+        traces.append(trace_trend)
 
-    return traces, results[-1]
-
-
-def _generate_chart(traces, layout, file_name):
-    """Generates the whole chart using pre-generated traces.
-
-    :param traces: Traces for the chart.
-    :param layout: Layout of the chart.
-    :param file_name: File name for the generated chart.
-    :type traces: list
-    :type layout: dict
-    :type file_name: str
-    """
-
-    # Create plot
-    logging.info("    Writing the file '{0}' ...".format(file_name))
-    plpl = plgo.Figure(data=traces, layout=layout)
-    try:
-        ploff.plot(plpl, show_link=False, auto_open=False, filename=file_name)
-    except plerr.PlotlyEmptyDataError:
-        logging.warning(" No data for the plot. Skipped.")
+    if anomaly_classification:
+        return traces, anomaly_classification[-1]
+    return traces, None
 
 
 def _generate_all_charts(spec, input_data):
@@ -370,6 +241,93 @@ def _generate_all_charts(spec, input_data):
     :type input_data: InputData
     """
 
+    def _generate_chart(_, data_q, graph):
+        """Generate one chart and send the resulting CSV rows, per-test
+        classifications and buffered log messages back to the main process
+        via the data queue.
+        """
+
+        logs = list()
+
+        logging.info("  Generating the chart '{0}' ...".
+                     format(graph.get("title", "")))
+        logs.append(("INFO", "  Generating the chart '{0}' ...".
+                     format(graph.get("title", ""))))
+
+        job_name = spec.cpta["data"].keys()[0]
+
+        csv_tbl = list()
+        res = list()
+
+        # Transform the data
+        logs.append(("INFO", "    Creating the data set for the {0} '{1}'.".
+                     format(graph.get("type", ""), graph.get("title", ""))))
+        data = input_data.filter_data(graph, continue_on_error=True)
+        if data is None:
+            logs.append(("ERROR", "No data."))
+            data_q.put({"csv_table": csv_tbl, "results": res, "logs": logs})
+            return
+
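+        # Re-arrange the filtered data as
+        # {test-name: {build-number: throughput}}: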
+        chart_data = dict()
+        for job in data:
+            for index, bld in job.items():
+                for test_name, test in bld.items():
+                    if chart_data.get(test_name, None) is None:
+                        chart_data[test_name] = OrderedDict()
+                    try:
+                        chart_data[test_name][int(index)] = \
+                            test["result"]["throughput"]
+                    except (KeyError, TypeError):
+                        pass
+
+        # Add items to the csv table:
+        for tst_name, tst_data in chart_data.items():
+            tst_lst = list()
+            for bld in builds_lst:
+                itm = tst_data.get(int(bld), '')
+                tst_lst.append(str(itm))
+            csv_tbl.append("{0},".format(tst_name) + ",".join(tst_lst) + '\n')
+        # Generate traces:
+        traces = list()
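+        # Size of the moving window used for the outlier split, the anomaly
+        # classification and the trend line: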
+        win_size = 14
+        index = 0
+        for test_name, test_data in chart_data.items():
+            if not test_data:
+                logs.append(("WARNING", "No data for the test '{0}'".
+                             format(test_name)))
+                continue
+            test_name = test_name.split('.')[-1]
+            trace, rslt = _generate_trending_traces(
+                test_data,
+                build_info=build_info,
+                moving_win_size=win_size,
+                name='-'.join(test_name.split('-')[3:-1]),
+                color=COLORS[index])
+            traces.extend(trace)
+            res.append(rslt)
+            index += 1
+
+        if traces:
+            # Generate the chart:
+            graph["layout"]["xaxis"]["title"] = \
+                graph["layout"]["xaxis"]["title"].format(job=job_name)
+            name_file = "{0}-{1}{2}".format(spec.cpta["output-file"],
+                                            graph["output-file-name"],
+                                            spec.cpta["output-file-type"])
+
+            logs.append(("INFO", "    Writing the file '{0}' ...".
+                         format(name_file)))
+            plpl = plgo.Figure(data=traces, layout=graph["layout"])
+            try:
+                ploff.plot(plpl, show_link=False, auto_open=False,
+                           filename=name_file)
+            except plerr.PlotlyEmptyDataError:
+                logs.append(("WARNING", "No data for the plot. Skipped."))
+
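+        # Send the results back to the main process; the log messages are
+        # buffered as (level, message) tuples and replayed there, since the
+        # workers run as separate processes: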
+        data_out = {
+            "csv_table": csv_tbl,
+            "results": res,
+            "logs": logs
+        }
+        data_q.put(data_out)
+
     job_name = spec.cpta["data"].keys()[0]
 
     builds_lst = list()
@@ -379,7 +337,7 @@ def _generate_all_charts(spec, input_data):
             builds_lst.append(str(build["build"]))
 
     # Get "build ID": "date" dict:
-    build_info = dict()
+    build_info = OrderedDict()
     for build in builds_lst:
         try:
             build_info[build] = (
@@ -387,7 +345,29 @@ def _generate_all_charts(spec, input_data):
                 input_data.metadata(job_name, build)["version"]
             )
         except KeyError:
-            pass
+            build_info[build] = ("", "")
+
+    work_queue = multiprocessing.JoinableQueue()
+    manager = multiprocessing.Manager()
+    data_queue = manager.Queue()
+    cpus = multiprocessing.cpu_count()
+
+    workers = list()
+    for cpu in range(cpus):
+        worker = Worker(work_queue,
+                        data_queue,
+                        _generate_chart)
+        worker.daemon = True
+        worker.start()
+        workers.append(worker)
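+        # Pin the worker to a dedicated CPU core (Linux-specific; the output
+        # of taskset is discarded):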
+        os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
+                  format(cpu, worker.pid))
+
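+    # Each chart specification is one work item; block until all of them
+    # have been processed: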
+    for chart in spec.cpta["plots"]:
+        work_queue.put((chart, ))
+    work_queue.join()
+
+    anomaly_classifications = list()
 
     # Create the header:
     csv_table = list()
@@ -400,73 +380,30 @@ def _generate_all_charts(spec, input_data):
     header = "VPP Version:," + ",".join(vpp_versions) + '\n'
     csv_table.append(header)
 
-    results = list()
-    for chart in spec.cpta["plots"]:
-        logging.info("  Generating the chart '{0}' ...".
-                     format(chart.get("title", "")))
+    while not data_queue.empty():
+        result = data_queue.get()
 
-        # Transform the data
-        data = input_data.filter_data(chart, continue_on_error=True)
-        if data is None:
-            logging.error("No data.")
-            return
+        anomaly_classifications.extend(result["results"])
+        csv_table.extend(result["csv_table"])
 
-        chart_data = dict()
-        for job in data:
-            for idx, build in job.items():
-                for test_name, test in build.items():
-                    if chart_data.get(test_name, None) is None:
-                        chart_data[test_name] = OrderedDict()
-                    try:
-                        chart_data[test_name][int(idx)] = \
-                            test["result"]["throughput"]
-                    except (KeyError, TypeError):
-                        pass
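+        # Replay the log messages buffered by the worker process: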
+        for item in result["logs"]:
+            if item[0] == "INFO":
+                logging.info(item[1])
+            elif item[0] == "ERROR":
+                logging.error(item[1])
+            elif item[0] == "DEBUG":
+                logging.debug(item[1])
+            elif item[0] == "CRITICAL":
+                logging.critical(item[1])
+            elif item[0] == "WARNING":
+                logging.warning(item[1])
 
-        # Add items to the csv table:
-        for tst_name, tst_data in chart_data.items():
-            tst_lst = list()
-            for build in builds_lst:
-                item = tst_data.get(int(build), '')
-                tst_lst.append(str(item) if item else '')
-            csv_table.append("{0},".format(tst_name) + ",".join(tst_lst) + '\n')
-
-        for period in chart["periods"]:
-            # Generate traces:
-            traces = list()
-            win_size = 10 if period == 1 else 5 if period < 20 else 3
-            idx = 0
-            for test_name, test_data in chart_data.items():
-                if not test_data:
-                    logging.warning("No data for the test '{0}'".
-                                    format(test_name))
-                    continue
-                test_name = test_name.split('.')[-1]
-                trace, result = _generate_trending_traces(
-                    test_data,
-                    build_info=build_info,
-                    period=period,
-                    moving_win_size=win_size,
-                    fill_missing=True,
-                    use_first=False,
-                    name='-'.join(test_name.split('-')[3:-1]),
-                    color=COLORS[idx])
-                traces.extend(trace)
-                results.append(result)
-                idx += 1
+    del data_queue
 
-            # Generate the chart:
-            chart["layout"]["xaxis"]["title"] = \
-                chart["layout"]["xaxis"]["title"].format(job=job_name)
-            _generate_chart(traces,
-                            chart["layout"],
-                            file_name="{0}-{1}-{2}{3}".format(
-                                spec.cpta["output-file"],
-                                chart["output-file-name"],
-                                period,
-                                spec.cpta["output-file-type"]))
-
-        logging.info("  Done.")
+    # Terminate all workers
+    for worker in workers:
+        worker.terminate()
+        worker.join()
 
     # Write the tables:
     file_name = spec.cpta["output-file"] + "-trending"
@@ -487,24 +424,27 @@ def _generate_all_charts(spec, input_data):
                             row[idx] = str(round(float(item) / 1000000, 2))
                         except ValueError:
                             pass
-                txt_table.add_row(row)
+                try:
+                    txt_table.add_row(row)
+                except Exception as err:
+                    logging.warning("Error occurred while generating TXT table:"
+                                    "\n{0}".format(err))
             line_nr += 1
         txt_table.align["Build Number:"] = "l"
     with open("{0}.txt".format(file_name), "w") as txt_file:
         txt_file.write(str(txt_table))
 
     # Evaluate result:
-    result = "PASS"
-    for item in results:
-        if item is None:
-            result = "FAIL"
-            break
-        if item == 0.66 and result == "PASS":
-            result = "PASS"
-        elif item == 0.33 or item == 0.0:
-            result = "FAIL"
-
-    logging.info("Partial results: {0}".format(results))
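+    # Any regression or outlier fails the job; producing no classifications
+    # at all is a failure as well: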
+    if anomaly_classifications:
+        result = "PASS"
+        for classification in anomaly_classifications:
+            if classification == "regression" or classification == "outlier":
+                result = "FAIL"
+                break
+    else:
+        result = "FAIL"
+
+    logging.info("Partial results: {0}".format(anomaly_classifications))
     logging.info("Result: {0}".format(result))
 
     return result