import argparse
import json
import os
import re
from typing import Any, Dict, List, Optional

import pandas as pd
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

from azureml.automl.core.shared import constants
from azureml.automl.core.shared.types import GrainType
from azureml.automl.runtime.shared.score import scoring

# Column names used in the backtest output files.
GRAIN = "time_series_id"
BACKTEST_ITER = "backtest_iteration"
ACTUALS = "actual_level"
PREDICTIONS = "predicted_level"
ALL_GRAINS = "all_sets"

# Names of the artifacts written to the output directory.
FORECASTS_FILE = "forecast.csv"
SCORES_FILE = "scores.csv"
PLOTS_FILE = "plots_fcst_vs_actual.pdf"

# Symbols that are not allowed in names.
RE_INVALID_SYMBOLS = re.compile("[: ]")


def _compute_metrics(df: pd.DataFrame, metrics: List[str]) -> pd.DataFrame:
    """
    Compute metrics for one data frame.

    :param df: The data frame which contains the actual_level and predicted_level columns.
    :param metrics: The list of metric names to compute.
    :return: The data frame with two columns - metric_name and metric.
    """
    scores = scoring.score_regression(
        y_test=df[ACTUALS], y_pred=df[PREDICTIONS], metrics=metrics
    )
    metrics_df = pd.DataFrame(list(scores.items()), columns=["metric_name", "metric"])
    metrics_df.sort_values(["metric_name"], inplace=True)
    metrics_df.reset_index(drop=True, inplace=True)
    return metrics_df

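# Shape sketch (illustrative values; assumes "mean_absolute_error" is among
# the scalar regression metrics supported by scoring.score_regression):
#
#   _compute_metrics(df, ["mean_absolute_error"]) returns a frame like
#
#       metric_name          metric
#       mean_absolute_error  0.42
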
def _format_grain_name(grain: GrainType) -> str:
    """
    Convert a grain name to a string.

    :param grain: the grain name.
    :return: the string representation of the given grain.
    """
    if not isinstance(grain, (tuple, list)):
        return str(grain)
    return "|".join(map(str, grain))

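# Examples (illustrative): a composite grain joins its parts with "|", while a
# scalar grain is stringified as-is.
#   _format_grain_name(("store_1", 7))  -> "store_1|7"
#   _format_grain_name("store_1")       -> "store_1"
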
def compute_all_metrics(
    fcst_df: pd.DataFrame,
    ts_id_colnames: List[str],
    metric_names: Optional[List[str]] = None,
) -> pd.DataFrame:
    """
    Calculate metrics per grain.

    :param fcst_df: The forecast data frame. Must contain the actual_level and predicted_level columns.
    :param ts_id_colnames: The list of grain column names; if empty or None, only overall metrics are computed.
    :param metric_names: (optional) The list of metric names to return; defaults to all scalar regression metrics.
    :return: The data frame with metrics per grain plus the metrics computed over the whole data set.
    """
    if not metric_names:
        metric_names = list(constants.Metric.SCALAR_REGRESSION_SET)

    if ts_id_colnames is None:
        ts_id_colnames = []

    metrics_list = []
    if ts_id_colnames:
        for grain, df in fcst_df.groupby(ts_id_colnames):
            one_grain_metrics_df = _compute_metrics(df, metric_names)
            one_grain_metrics_df[GRAIN] = _format_grain_name(grain)
            metrics_list.append(one_grain_metrics_df)

    # Overall metrics.
    one_grain_metrics_df = _compute_metrics(fcst_df, metric_names)
    one_grain_metrics_df[GRAIN] = ALL_GRAINS
    metrics_list.append(one_grain_metrics_df)

    # Collect into a single data frame.
    return pd.concat(metrics_list)

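# Usage sketch (hypothetical data; assumes the azureml scoring package is
# installed and that "root_mean_squared_error" is a valid metric name):
#
#   toy = pd.DataFrame(
#       {
#           "ts_id": ["a", "a", "b", "b"],
#           ACTUALS: [1.0, 2.0, 4.0, 5.0],
#           PREDICTIONS: [1.1, 1.9, 4.2, 5.3],
#       }
#   )
#   scores = compute_all_metrics(toy, ["ts_id"], ["root_mean_squared_error"])
#   # One row per (series, metric), plus "all_sets" rows for the whole frame.
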
def _draw_one_plot(
    df: pd.DataFrame,
    time_column_name: str,
    grain_column_names: List[str],
    pdf: PdfPages,
) -> None:
    """
    Draw a single forecast-versus-actuals plot and append it to the PDF.

    :param df: The data frame with the data to build the plot.
    :param time_column_name: The name of the time column.
    :param grain_column_names: The names of the grain columns.
    :param pdf: The pdf backend used to render the plot.
    """
    fig, _ = plt.subplots(figsize=(20, 10))
    df = df.set_index(time_column_name)
    plt.plot(df[[ACTUALS, PREDICTIONS]])
    plt.xticks(rotation=45)
    iteration = df[BACKTEST_ITER].iloc[0]
    if grain_column_names:
        grain_name = [df[grain].iloc[0] for grain in grain_column_names]
        plt.title(f"Time series ID: {_format_grain_name(grain_name)} {iteration}")
    plt.legend(["actual", "forecast"])
    # Save the page before closing the figure, so the canvas is not torn down
    # before it is rendered into the PDF.
    pdf.savefig(fig)
    plt.close(fig)

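# Note: PdfPages accumulates one page per savefig call; callers loop over the
# series and close the backend when done, as calculate_scores_and_build_plots
# does below.
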
def calculate_scores_and_build_plots(
    input_dir: str, output_dir: str, automl_settings: Dict[str, Any]
) -> None:
    """
    Score the backtest forecasts and render per-series plots.

    :param input_dir: The directory with the backtest forecast CSV files.
    :param output_dir: The directory to write the forecasts, scores and plots to.
    :param automl_settings: The AutoML settings with the time and grain column names.
    """
    os.makedirs(output_dir, exist_ok=True)
    grains = automl_settings.get(constants.TimeSeries.GRAIN_COLUMN_NAMES)
    time_column_name = automl_settings.get(constants.TimeSeries.TIME_COLUMN_NAME)
    if grains is None:
        grains = []
    if isinstance(grains, str):
        grains = [grains]
    # The backtest iteration column is handled separately from the user grains.
    while BACKTEST_ITER in grains:
        grains.remove(BACKTEST_ITER)

    # Read every backtest CSV and split it into one frame per iteration.
    dfs = []
    for fle in os.listdir(input_dir):
        file_path = os.path.join(input_dir, fle)
        if os.path.isfile(file_path) and file_path.endswith(".csv"):
            df_iter = pd.read_csv(file_path, parse_dates=[time_column_name])
            for _, iteration in df_iter.groupby(BACKTEST_ITER):
                dfs.append(iteration)
    forecast_df = pd.concat(dfs, sort=False, ignore_index=True)

    # To make sure the plots are in order, sort the predictions by grain and iteration.
    ts_index = grains + [BACKTEST_ITER]
    forecast_df.sort_values(by=ts_index, inplace=True)
    pdf = PdfPages(os.path.join(output_dir, PLOTS_FILE))
    for _, one_forecast in forecast_df.groupby(ts_index):
        _draw_one_plot(one_forecast, time_column_name, grains, pdf)
    pdf.close()

    forecast_df.to_csv(os.path.join(output_dir, FORECASTS_FILE), index=False)
    metrics = compute_all_metrics(forecast_df, grains + [BACKTEST_ITER])
    metrics.to_csv(os.path.join(output_dir, SCORES_FILE), index=False)

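# The settings file read in __main__ is expected to carry at least the time
# column name and, optionally, the grain columns. A minimal
# automl_settings.json might look like (illustrative values):
#
#   {"time_column_name": "date", "grain_column_names": ["time_series_id"]}
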
if __name__ == "__main__":
    args = {"forecasts": "--forecasts", "scores_out": "--output-dir"}
    parser = argparse.ArgumentParser(description="Parsing input arguments.")
    for argname, arg in args.items():
        parser.add_argument(arg, dest=argname, required=True)
    parsed_args, _ = parser.parse_known_args()
    input_dir = parsed_args.forecasts
    output_dir = parsed_args.scores_out
    # The AutoML settings file is expected next to this script.
    with open(
        os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "automl_settings.json"
        )
    ) as json_file:
        automl_settings = json.load(json_file)
    calculate_scores_and_build_plots(input_dir, output_dir, automl_settings)
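
# Invocation sketch (placeholder paths; the script name is whatever this file
# is saved as):
#   python <this_script>.py --forecasts <input_dir> --output-dir <output_dir>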