Merge pull request #1852 from Azure/release_update/Release-167

update samples from Release-167 as a part of  SDK release
This commit is contained in:
Harneet Virk
2022-11-08 11:01:10 -08:00
committed by GitHub
9 changed files with 170 additions and 335 deletions

View File

@@ -21,6 +21,7 @@ dependencies:
- markupsafe<2.1.0 - markupsafe<2.1.0
- tqdm==4.64.1 - tqdm==4.64.1
- jsonschema==4.16.0 - jsonschema==4.16.0
- websocket-client==1.4.1
- pip: - pip:
# Required packages for AzureML execution, history, and data preparation. # Required packages for AzureML execution, history, and data preparation.

View File

@@ -43,11 +43,20 @@ def init():
global output_dir global output_dir
global automl_settings global automl_settings
global model_uid global model_uid
global forecast_quantiles
logger.info("Initialization of the run.") logger.info("Initialization of the run.")
parser = argparse.ArgumentParser("Parsing input arguments.") parser = argparse.ArgumentParser("Parsing input arguments.")
parser.add_argument("--output-dir", dest="out", required=True) parser.add_argument("--output-dir", dest="out", required=True)
parser.add_argument("--model-name", dest="model", default=None) parser.add_argument("--model-name", dest="model", default=None)
parser.add_argument("--model-uid", dest="model_uid", default=None) parser.add_argument("--model-uid", dest="model_uid", default=None)
parser.add_argument(
"--forecast_quantiles",
nargs="*",
type=float,
help="forecast quantiles list",
default=None,
)
parsed_args, _ = parser.parse_known_args() parsed_args, _ = parser.parse_known_args()
model_name = parsed_args.model model_name = parsed_args.model
@@ -55,6 +64,7 @@ def init():
target_column_name = automl_settings.get("label_column_name") target_column_name = automl_settings.get("label_column_name")
output_dir = parsed_args.out output_dir = parsed_args.out
model_uid = parsed_args.model_uid model_uid = parsed_args.model_uid
forecast_quantiles = parsed_args.forecast_quantiles
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
os.environ["AUTOML_IGNORE_PACKAGE_VERSION_INCOMPATIBILITIES".lower()] = "True" os.environ["AUTOML_IGNORE_PACKAGE_VERSION_INCOMPATIBILITIES".lower()] = "True"
@@ -126,23 +136,18 @@ def run_backtest(data_input_name: str, file_name: str, experiment: Experiment):
) )
print(f"The model {best_run.properties['model_name']} was registered.") print(f"The model {best_run.properties['model_name']} was registered.")
_, x_pred = fitted_model.forecast(X_test) # By default we will have forecast quantiles of 0.5, which is our target
x_pred.reset_index(inplace=True, drop=False) if forecast_quantiles:
columns = [automl_settings[constants.TimeSeries.TIME_COLUMN_NAME]] if 0.5 not in forecast_quantiles:
if automl_settings.get(constants.TimeSeries.GRAIN_COLUMN_NAMES): forecast_quantiles.append(0.5)
# We know that fitted_model.grain_column_names is a list. fitted_model.quantiles = forecast_quantiles
columns.extend(fitted_model.grain_column_names)
columns.append(constants.TimeSeriesInternal.DUMMY_TARGET_COLUMN) x_pred = fitted_model.forecast_quantiles(X_test)
# Remove featurized columns.
x_pred = x_pred[columns]
x_pred.rename(
{constants.TimeSeriesInternal.DUMMY_TARGET_COLUMN: "predicted_level"},
axis=1,
inplace=True,
)
x_pred["actual_level"] = y_test x_pred["actual_level"] = y_test
x_pred["backtest_iteration"] = f"iteration_{last_training_date}" x_pred["backtest_iteration"] = f"iteration_{last_training_date}"
x_pred.rename({0.5: "predicted_level"}, axis=1, inplace=True)
date_safe = RE_INVALID_SYMBOLS.sub("_", last_training_date) date_safe = RE_INVALID_SYMBOLS.sub("_", last_training_date)
x_pred.to_csv(os.path.join(output_dir, f"iteration_{date_safe}.csv"), index=False) x_pred.to_csv(os.path.join(output_dir, f"iteration_{date_safe}.csv"), index=False)
return x_pred return x_pred

View File

@@ -365,6 +365,7 @@
" step_size=BACKTESTING_PERIOD,\n", " step_size=BACKTESTING_PERIOD,\n",
" step_number=NUMBER_OF_BACKTESTS,\n", " step_number=NUMBER_OF_BACKTESTS,\n",
" model_uid=model_uid,\n", " model_uid=model_uid,\n",
" forecast_quantiles=[0.025, 0.975], # Optional\n",
")" ")"
] ]
}, },
@@ -590,6 +591,7 @@
" step_size=BACKTESTING_PERIOD,\n", " step_size=BACKTESTING_PERIOD,\n",
" step_number=NUMBER_OF_BACKTESTS,\n", " step_number=NUMBER_OF_BACKTESTS,\n",
" model_name=model_name,\n", " model_name=model_name,\n",
" forecast_quantiles=[0.025, 0.975],\n",
")" ")"
] ]
}, },

View File

@@ -31,6 +31,7 @@ def get_backtest_pipeline(
step_number: int, step_number: int,
model_name: Optional[str] = None, model_name: Optional[str] = None,
model_uid: Optional[str] = None, model_uid: Optional[str] = None,
forecast_quantiles: Optional[list] = None,
) -> Pipeline: ) -> Pipeline:
""" """
:param experiment: The experiment used to run the pipeline. :param experiment: The experiment used to run the pipeline.
@@ -44,6 +45,7 @@ def get_backtest_pipeline(
:param step_size: The number of periods to step back in backtesting. :param step_size: The number of periods to step back in backtesting.
:param step_number: The number of backtesting iterations. :param step_number: The number of backtesting iterations.
:param model_uid: The uid to mark models from this run of the experiment. :param model_uid: The uid to mark models from this run of the experiment.
:param forecast_quantiles: The forecast quantiles that are required in the inference.
:return: The pipeline to be used for model retraining. :return: The pipeline to be used for model retraining.
**Note:** The output will be uploaded in the pipeline output **Note:** The output will be uploaded in the pipeline output
called 'score'. called 'score'.
@@ -135,6 +137,9 @@ def get_backtest_pipeline(
if model_uid is not None: if model_uid is not None:
prs_args.append("--model-uid") prs_args.append("--model-uid")
prs_args.append(model_uid) prs_args.append(model_uid)
if forecast_quantiles:
prs_args.append("--forecast_quantiles")
prs_args.extend(forecast_quantiles)
backtest_prs = ParallelRunStep( backtest_prs = ParallelRunStep(
name=parallel_step_name, name=parallel_step_name,
parallel_run_config=back_test_config, parallel_run_config=back_test_config,

View File

@@ -575,7 +575,32 @@
"outputs": [], "outputs": [],
"source": [ "source": [
"remote_run.download_file(\"outputs/predictions.csv\", \"predictions.csv\")\n", "remote_run.download_file(\"outputs/predictions.csv\", \"predictions.csv\")\n",
"df_all = pd.read_csv(\"predictions.csv\")" "fcst_df = pd.read_csv(\"predictions.csv\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the rolling forecast can contain multiple predictions for each date, each from a different forecast origin. For example, consider 2012-09-05:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fcst_df[fcst_df.date == \"2012-09-05\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here, the forecast origin refers to the latest date of actuals available for a given forecast. The earliest origin in the rolling forecast, 2012-08-31, is the last day in the training data. For origin date 2012-09-01, the forecasts use actual recorded counts from the training data *and* the actual count recorded on 2012-09-01. Note that the model is not retrained for origin dates later than 2012-08-31, but the values for model features, such as lagged values of daily count, are updated.\n",
"\n",
"Let's calculate the metrics over all rolling forecasts:"
] ]
}, },
{ {
@@ -587,29 +612,17 @@
"from azureml.automl.core.shared import constants\n", "from azureml.automl.core.shared import constants\n",
"from azureml.automl.runtime.shared.score import scoring\n", "from azureml.automl.runtime.shared.score import scoring\n",
"from sklearn.metrics import mean_absolute_error, mean_squared_error\n", "from sklearn.metrics import mean_absolute_error, mean_squared_error\n",
"from matplotlib import pyplot as plt\n",
"\n", "\n",
"# use automl metrics module\n", "# use automl metrics module\n",
"scores = scoring.score_regression(\n", "scores = scoring.score_regression(\n",
" y_test=df_all[target_column_name],\n", " y_test=fcst_df[target_column_name],\n",
" y_pred=df_all[\"predicted\"],\n", " y_pred=fcst_df[\"predicted\"],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET),\n", " metrics=list(constants.Metric.SCALAR_REGRESSION_SET),\n",
")\n", ")\n",
"\n", "\n",
"print(\"[Test data scores]\\n\")\n", "print(\"[Test data scores]\\n\")\n",
"for key, value in scores.items():\n", "for key, value in scores.items():\n",
" print(\"{}: {:.3f}\".format(key, value))\n", " print(\"{}: {:.3f}\".format(key, value))"
"\n",
"# Plot outputs\n",
"%matplotlib inline\n",
"test_pred = plt.scatter(df_all[target_column_name], df_all[\"predicted\"], color=\"b\")\n",
"test_test = plt.scatter(\n",
" df_all[target_column_name], df_all[target_column_name], color=\"g\"\n",
")\n",
"plt.legend(\n",
" (test_pred, test_test), (\"prediction\", \"truth\"), loc=\"upper left\", fontsize=8\n",
")\n",
"plt.show()"
] ]
}, },
{ {
@@ -618,36 +631,15 @@
"source": [ "source": [
"For more details on what metrics are included and how they are calculated, please refer to [supported metrics](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-understand-automated-ml#regressionforecasting-metrics). You could also calculate residuals, like described [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-understand-automated-ml#residuals).\n", "For more details on what metrics are included and how they are calculated, please refer to [supported metrics](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-understand-automated-ml#regressionforecasting-metrics). You could also calculate residuals, like described [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-understand-automated-ml#residuals).\n",
"\n", "\n",
"\n", "The rolling forecast metric values are very high in comparison to the validation metrics reported by the AutoML job. What's going on here? We will investigate in the following cells!"
"Since we did a rolling evaluation on the test set, we can analyze the predictions by their forecast horizon relative to the rolling origin. The model was initially trained at a forecast horizon of 14, so each prediction from the model is associated with a horizon value from 1 to 14. The horizon values are in a column named, \"horizon_origin,\" in the prediction set. For example, we can calculate some of the error metrics grouped by the horizon:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from metrics_helper import MAPE, APE\n",
"\n",
"df_all.groupby(\"horizon_origin\").apply(\n",
" lambda df: pd.Series(\n",
" {\n",
" \"MAPE\": MAPE(df[target_column_name], df[\"predicted\"]),\n",
" \"RMSE\": np.sqrt(\n",
" mean_squared_error(df[target_column_name], df[\"predicted\"])\n",
" ),\n",
" \"MAE\": mean_absolute_error(df[target_column_name], df[\"predicted\"]),\n",
" }\n",
" )\n",
")"
] ]
}, },
{ {
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
"source": [ "source": [
"To drill down more, we can look at the distributions of APE (absolute percentage error) by horizon. From the chart, it is clear that the overall MAPE is being skewed by one particular point where the actual value is of small absolute value." "### Forecast versus actuals plot\n",
"We will plot predictions and actuals on a time series plot. Since there are many forecasts for each date, we select the 14-day-ahead forecast from each forecast origin for our comparison."
] ]
}, },
{ {
@@ -656,21 +648,55 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"df_all_APE = df_all.assign(APE=APE(df_all[target_column_name], df_all[\"predicted\"]))\n", "from matplotlib import pyplot as plt\n",
"APEs = [\n",
" df_all_APE[df_all[\"horizon_origin\"] == h].APE.values\n",
" for h in range(1, forecast_horizon + 1)\n",
"]\n",
"\n", "\n",
"%matplotlib inline\n", "%matplotlib inline\n",
"plt.boxplot(APEs)\n",
"plt.yscale(\"log\")\n",
"plt.xlabel(\"horizon\")\n",
"plt.ylabel(\"APE (%)\")\n",
"plt.title(\"Absolute Percentage Errors by Forecast Horizon\")\n",
"\n", "\n",
"fcst_df_h14 = (\n",
" fcst_df.groupby(\"forecast_origin\", as_index=False)\n",
" .last()\n",
" .drop(columns=[\"forecast_origin\"])\n",
")\n",
"fcst_df_h14.set_index(time_column_name, inplace=True)\n",
"plt.plot(fcst_df_h14[[target_column_name, \"predicted\"]])\n",
"plt.xticks(rotation=45)\n",
"plt.title(f\"Predicted vs. Actuals\")\n",
"plt.legend([\"actual\", \"14-day-ahead forecast\"])\n",
"plt.show()" "plt.show()"
] ]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Looking at the plot, there are two clear issues:\n",
"1. An anomalously low count value on October 29th, 2012.\n",
"2. End-of-year holidays (Thanksgiving and Christmas) in late November and late December.\n",
"\n",
"What happened on Oct. 29th, 2012? That day, Hurricane Sandy brought severe storm surge flooding to the east coast of the United States, particularly around New York City. This is certainly an anomalous event that the model did not account for!\n",
"\n",
"As for the late year holidays, the model apparently did not learn to account for the full reduction of bike share rentals on these major holidays. The training data covers 2011 and early 2012, so the model fit only had access to a single occurrence of these holidays. This makes it challenging to resolve holiday effects; however, a larger AutoML model search may result in a better model that is more holiday-aware.\n",
"\n",
"If we filter the predictions prior to the Thanksgiving holiday and remove the anomalous day of 2012-10-29, the metrics are closer to validation levels:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"date_filter = (fcst_df.date != \"2012-10-29\") & (fcst_df.date < \"2012-11-22\")\n",
"scores = scoring.score_regression(\n",
" y_test=fcst_df[date_filter][target_column_name],\n",
" y_pred=fcst_df[date_filter][\"predicted\"],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET),\n",
")\n",
"\n",
"print(\"[Test data scores (filtered)]\\n\")\n",
"for key, value in scores.items():\n",
" print(\"{}: {:.3f}\".format(key, value))"
]
} }
], ],
"metadata": { "metadata": {
@@ -711,7 +737,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.8.5" "version": "3.7.13"
}, },
"mimetype": "text/x-python", "mimetype": "text/x-python",
"name": "python", "name": "python",

View File

@@ -36,18 +36,18 @@ y_test_df = (
fitted_model = joblib.load("model.pkl") fitted_model = joblib.load("model.pkl")
y_pred, X_trans = fitted_model.rolling_evaluation(X_test_df, y_test_df.values) X_rf = fitted_model.rolling_forecast(X_test_df, y_test_df.values, step=1)
# Add predictions, actuals, and horizon relative to rolling origin to the test feature data # Add predictions, actuals, and horizon relative to rolling origin to the test feature data
assign_dict = { assign_dict = {
"horizon_origin": X_trans["horizon_origin"].values, fitted_model.forecast_origin_column_name: "forecast_origin",
"predicted": y_pred, fitted_model.forecast_column_name: "predicted",
target_column_name: y_test_df[target_column_name].values, fitted_model.actual_column_name: target_column_name,
} }
df_all = X_test_df.assign(**assign_dict) X_rf.rename(columns=assign_dict, inplace=True)
file_name = "outputs/predictions.csv" file_name = "outputs/predictions.csv"
export_csv = df_all.to_csv(file_name, header=True) export_csv = X_rf.to_csv(file_name, header=True)
# Upload the predictions into artifacts # Upload the predictions into artifacts
run.upload_file(name=file_name, path_or_stream=file_name) run.upload_file(name=file_name, path_or_stream=file_name)

View File

@@ -758,7 +758,15 @@
"metadata": {}, "metadata": {},
"source": [ "source": [
"## Forecasting farther than the forecast horizon <a id=\"recursive forecasting\"></a>\n", "## Forecasting farther than the forecast horizon <a id=\"recursive forecasting\"></a>\n",
"When the forecast destination, or the latest date in the prediction data frame, is farther into the future than the specified forecast horizon, the `forecast()` function will still make point predictions out to the later date using a recursive operation mode. Internally, the method recursively applies the regular forecaster to generate context so that we can forecast further into the future. \n", "When the forecast destination, or the latest date in the prediction data frame, is farther into the future than the specified forecast horizon, the forecaster must be iteratively applied. Here, we advance the forecast origin on each iteration over the prediction window, predicting `max_horizon` periods ahead on each iteration. There are two choices for the context data to use as the forecaster advances into the prediction window:\n",
"\n",
"1. We can use forecasted values from previous iterations (recursive forecast),\n",
"2. We can use known, actual values of the target if they are available (rolling forecast).\n",
"\n",
"The first method is useful in a true forecasting scenario when we do not yet know the actual target values while the second is useful in an evaluation scenario where we want to compute accuracy metrics for the `max_horizon`-period-ahead forecaster over a long test set. We refer to the first as a **recursive forecast** since we apply the forecaster recursively over the prediction window and the second as a **rolling forecast** since we roll forward over known actuals.\n",
"\n",
"### Recursive forecasting\n",
"By default, the `forecast()` function will make point predictions out to the later date using a recursive operation mode. Internally, the method recursively applies the regular forecaster to generate context so that we can forecast further into the future. \n",
"\n", "\n",
"To illustrate the use-case and operation of recursive forecasting, we'll consider an example with a single time-series where the forecasting period directly follows the training period and is twice as long as the forecasting horizon given at training time.\n", "To illustrate the use-case and operation of recursive forecasting, we'll consider an example with a single time-series where the forecasting period directly follows the training period and is twice as long as the forecasting horizon given at training time.\n",
"\n", "\n",
@@ -818,6 +826,35 @@
"np.array_equal(y_pred_all, y_pred_long)" "np.array_equal(y_pred_all, y_pred_long)"
] ]
}, },
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Rolling forecasts\n",
"A rolling forecast is a similar concept to the recursive forecasts described above except that we use known actual values of the target for our context data. We have provided a different, public method for this called `rolling_forecast`. In addition to test data and actuals (`X_test` and `y_test`), `rolling_forecast` also accepts an optional `step` parameter that controls how far the origin advances on each iteration. The recursive forecast mode uses a fixed step of `max_horizon` while `rolling_forecast` defaults to a step size of 1, but can be set to any integer from 1 to `max_horizon`, inclusive.\n",
"\n",
"Let's see what the rolling forecast looks like on the long test set with the step set to 1:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"X_rf = fitted_model.rolling_forecast(X_test_long, y_test_long, step=1)\n",
"X_rf.head(n=12)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice that `rolling_forecast` has returned a single DataFrame containing all results and has generated some new columns: `_automl_forecast_origin`, `_automl_forecast_y`, and `_automl_actual_y`. These are the origin date for each forecast, the forecasted value and the actual value, respectively. Note that \"y\" in the forecast and actual column names will generally be replaced by the target column name supplied to AutoML.\n",
"\n",
"The output above shows forecasts for two prediction windows, the first with origin at the end of the training set and the second including the first observation in the test set (2000-01-01 06:00:00). Since the forecast windows overlap, there are multiple forecasts for most dates which are associated with different origin dates."
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
@@ -880,7 +917,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.8.5" "version": "3.7.13"
}, },
"tags": [ "tags": [
"Forecasting", "Forecasting",
@@ -894,5 +931,5 @@
} }
}, },
"nbformat": 4, "nbformat": 4,
"nbformat_minor": 2 "nbformat_minor": 4
} }

View File

@@ -325,7 +325,7 @@
"source": [ "source": [
"### Setting forecaster maximum horizon \n", "### Setting forecaster maximum horizon \n",
"\n", "\n",
"The forecast horizon is the number of periods into the future that the model should predict. Here, we set the horizon to 12 periods (i.e. 12 months). Notice that this is much shorter than the number of months in the test set; we will need to use a rolling test to evaluate the performance on the whole test set. For more discussion of forecast horizons and guiding principles for setting them, please see the [energy demand notebook](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/automated-machine-learning/forecasting-energy-demand). " "The forecast horizon is the number of periods into the future that the model should predict. Here, we set the horizon to 14 periods (i.e. 14 days). Notice that this is much shorter than the number of months in the test set; we will need to use a rolling test to evaluate the performance on the whole test set. For more discussion of forecast horizons and guiding principles for setting them, please see the [energy demand notebook](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/automated-machine-learning/forecasting-energy-demand). "
] ]
}, },
{ {
@@ -337,7 +337,7 @@
}, },
"outputs": [], "outputs": [],
"source": [ "source": [
"forecast_horizon = 12" "forecast_horizon = 14"
] ]
}, },
{ {
@@ -699,5 +699,5 @@
} }
}, },
"nbformat": 4, "nbformat": 4,
"nbformat_minor": 2 "nbformat_minor": 4
} }

View File

@@ -4,7 +4,6 @@ import os
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pandas.tseries.frequencies import to_offset
from sklearn.externals import joblib from sklearn.externals import joblib
from sklearn.metrics import mean_absolute_error, mean_squared_error from sklearn.metrics import mean_absolute_error, mean_squared_error
@@ -19,219 +18,8 @@ except ImportError:
_torch_present = False _torch_present = False
def align_outputs( def map_location_cuda(storage, loc):
y_predicted, return storage.cuda()
X_trans,
X_test,
y_test,
predicted_column_name="predicted",
horizon_colname="horizon_origin",
):
"""
Demonstrates how to get the output aligned to the inputs
using pandas indexes. Helps understand what happened if
the output's shape differs from the input shape, or if
the data got re-sorted by time and grain during forecasting.
Typical causes of misalignment are:
* we predicted some periods that were missing in actuals -> drop from eval
* model was asked to predict past max_horizon -> increase max horizon
* data at start of X_test was needed for lags -> provide previous periods
"""
if horizon_colname in X_trans:
df_fcst = pd.DataFrame(
{
predicted_column_name: y_predicted,
horizon_colname: X_trans[horizon_colname],
}
)
else:
df_fcst = pd.DataFrame({predicted_column_name: y_predicted})
# y and X outputs are aligned by forecast() function contract
df_fcst.index = X_trans.index
# align original X_test to y_test
X_test_full = X_test.copy()
X_test_full[target_column_name] = y_test
# X_test_full's index does not include origin, so reset for merge
df_fcst.reset_index(inplace=True)
X_test_full = X_test_full.reset_index().drop(columns="index")
together = df_fcst.merge(X_test_full, how="right")
# drop rows where prediction or actuals are nan
# happens because of missing actuals
# or at edges of time due to lags/rolling windows
clean = together[
together[[target_column_name, predicted_column_name]].notnull().all(axis=1)
]
return clean
def do_rolling_forecast_with_lookback(
fitted_model, X_test, y_test, max_horizon, X_lookback, y_lookback, freq="D"
):
"""
Produce forecasts on a rolling origin over the given test set.
Each iteration makes a forecast for the next 'max_horizon' periods
with respect to the current origin, then advances the origin by the
horizon time duration. The prediction context for each forecast is set so
that the forecaster uses the actual target values prior to the current
origin time for constructing lag features.
This function returns a concatenated DataFrame of rolling forecasts.
"""
print("Using lookback of size: ", y_lookback.size)
df_list = []
origin_time = X_test[time_column_name].min()
X = X_lookback.append(X_test)
y = np.concatenate((y_lookback, y_test), axis=0)
while origin_time <= X_test[time_column_name].max():
# Set the horizon time - end date of the forecast
horizon_time = origin_time + max_horizon * to_offset(freq)
# Extract test data from an expanding window up-to the horizon
expand_wind = X[time_column_name] < horizon_time
X_test_expand = X[expand_wind]
y_query_expand = np.zeros(len(X_test_expand)).astype(float)
y_query_expand.fill(np.NaN)
if origin_time != X[time_column_name].min():
# Set the context by including actuals up-to the origin time
test_context_expand_wind = X[time_column_name] < origin_time
context_expand_wind = X_test_expand[time_column_name] < origin_time
y_query_expand[context_expand_wind] = y[test_context_expand_wind]
# Print some debug info
print(
"Horizon_time:",
horizon_time,
" origin_time: ",
origin_time,
" max_horizon: ",
max_horizon,
" freq: ",
freq,
)
print("expand_wind: ", expand_wind)
print("y_query_expand")
print(y_query_expand)
print("X_test")
print(X)
print("X_test_expand")
print(X_test_expand)
print("Type of X_test_expand: ", type(X_test_expand))
print("Type of y_query_expand: ", type(y_query_expand))
print("y_query_expand")
print(y_query_expand)
# Make a forecast out to the maximum horizon
# y_fcst, X_trans = y_query_expand, X_test_expand
y_fcst, X_trans = fitted_model.forecast(X_test_expand, y_query_expand)
print("y_fcst")
print(y_fcst)
# Align forecast with test set for dates within
# the current rolling window
trans_tindex = X_trans.index.get_level_values(time_column_name)
trans_roll_wind = (trans_tindex >= origin_time) & (trans_tindex < horizon_time)
test_roll_wind = expand_wind & (X[time_column_name] >= origin_time)
df_list.append(
align_outputs(
y_fcst[trans_roll_wind],
X_trans[trans_roll_wind],
X[test_roll_wind],
y[test_roll_wind],
)
)
# Advance the origin time
origin_time = horizon_time
return pd.concat(df_list, ignore_index=True)
def do_rolling_forecast(fitted_model, X_test, y_test, max_horizon, freq="D"):
"""
Produce forecasts on a rolling origin over the given test set.
Each iteration makes a forecast for the next 'max_horizon' periods
with respect to the current origin, then advances the origin by the
horizon time duration. The prediction context for each forecast is set so
that the forecaster uses the actual target values prior to the current
origin time for constructing lag features.
This function returns a concatenated DataFrame of rolling forecasts.
"""
df_list = []
origin_time = X_test[time_column_name].min()
while origin_time <= X_test[time_column_name].max():
# Set the horizon time - end date of the forecast
horizon_time = origin_time + max_horizon * to_offset(freq)
# Extract test data from an expanding window up-to the horizon
expand_wind = X_test[time_column_name] < horizon_time
X_test_expand = X_test[expand_wind]
y_query_expand = np.zeros(len(X_test_expand)).astype(float)
y_query_expand.fill(np.NaN)
if origin_time != X_test[time_column_name].min():
# Set the context by including actuals up-to the origin time
test_context_expand_wind = X_test[time_column_name] < origin_time
context_expand_wind = X_test_expand[time_column_name] < origin_time
y_query_expand[context_expand_wind] = y_test[test_context_expand_wind]
# Print some debug info
print(
"Horizon_time:",
horizon_time,
" origin_time: ",
origin_time,
" max_horizon: ",
max_horizon,
" freq: ",
freq,
)
print("expand_wind: ", expand_wind)
print("y_query_expand")
print(y_query_expand)
print("X_test")
print(X_test)
print("X_test_expand")
print(X_test_expand)
print("Type of X_test_expand: ", type(X_test_expand))
print("Type of y_query_expand: ", type(y_query_expand))
print("y_query_expand")
print(y_query_expand)
# Make a forecast out to the maximum horizon
y_fcst, X_trans = fitted_model.forecast(X_test_expand, y_query_expand)
print("y_fcst")
print(y_fcst)
# Align forecast with test set for dates within the
# current rolling window
trans_tindex = X_trans.index.get_level_values(time_column_name)
trans_roll_wind = (trans_tindex >= origin_time) & (trans_tindex < horizon_time)
test_roll_wind = expand_wind & (X_test[time_column_name] >= origin_time)
df_list.append(
align_outputs(
y_fcst[trans_roll_wind],
X_trans[trans_roll_wind],
X_test[test_roll_wind],
y_test[test_roll_wind],
)
)
# Advance the origin time
origin_time = horizon_time
return pd.concat(df_list, ignore_index=True)
def APE(actual, pred): def APE(actual, pred):
@@ -254,10 +42,6 @@ def MAPE(actual, pred):
return np.mean(APE(actual_safe, pred_safe)) return np.mean(APE(actual_safe, pred_safe))
def map_location_cuda(storage, loc):
return storage.cuda()
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
"--max_horizon", "--max_horizon",
@@ -303,7 +87,6 @@ print(model_path)
run = Run.get_context() run = Run.get_context()
# get input dataset by name # get input dataset by name
test_dataset = run.input_datasets["test_data"] test_dataset = run.input_datasets["test_data"]
lookback_dataset = run.input_datasets["lookback_data"]
grain_column_names = [] grain_column_names = []
@@ -312,15 +95,8 @@ df = test_dataset.to_pandas_dataframe()
print("Read df") print("Read df")
print(df) print(df)
X_test_df = test_dataset.drop_columns(columns=[target_column_name]) X_test_df = df
y_test_df = test_dataset.with_timestamp_columns(None).keep_columns( y_test = df.pop(target_column_name).to_numpy()
columns=[target_column_name]
)
X_lookback_df = lookback_dataset.drop_columns(columns=[target_column_name])
y_lookback_df = lookback_dataset.with_timestamp_columns(None).keep_columns(
columns=[target_column_name]
)
_, ext = os.path.splitext(model_path) _, ext = os.path.splitext(model_path)
if ext == ".pt": if ext == ".pt":
@@ -336,37 +112,20 @@ else:
# Load the sklearn pipeline. # Load the sklearn pipeline.
fitted_model = joblib.load(model_path) fitted_model = joblib.load(model_path)
if hasattr(fitted_model, "get_lookback"): X_rf = fitted_model.rolling_forecast(X_test_df, y_test, step=1)
lookback = fitted_model.get_lookback() assign_dict = {
df_all = do_rolling_forecast_with_lookback( fitted_model.forecast_origin_column_name: "forecast_origin",
fitted_model, fitted_model.forecast_column_name: "predicted",
X_test_df.to_pandas_dataframe(), fitted_model.actual_column_name: target_column_name,
y_test_df.to_pandas_dataframe().values.T[0], }
max_horizon, X_rf.rename(columns=assign_dict, inplace=True)
X_lookback_df.to_pandas_dataframe()[-lookback:],
y_lookback_df.to_pandas_dataframe().values.T[0][-lookback:],
freq,
)
else:
df_all = do_rolling_forecast(
fitted_model,
X_test_df.to_pandas_dataframe(),
y_test_df.to_pandas_dataframe().values.T[0],
max_horizon,
freq,
)
print(df_all) print(X_rf.head())
print("target values:::")
print(df_all[target_column_name])
print("predicted values:::")
print(df_all["predicted"])
# Use the AutoML scoring module # Use the AutoML scoring module
regression_metrics = list(constants.REGRESSION_SCALAR_SET) regression_metrics = list(constants.REGRESSION_SCALAR_SET)
y_test = np.array(df_all[target_column_name]) y_test = np.array(X_rf[target_column_name])
y_pred = np.array(df_all["predicted"]) y_pred = np.array(X_rf["predicted"])
scores = scoring.score_regression(y_test, y_pred, regression_metrics) scores = scoring.score_regression(y_test, y_pred, regression_metrics)
print("scores:") print("scores:")
@@ -376,11 +135,11 @@ for key, value in scores.items():
run.log(key, value) run.log(key, value)
print("Simple forecasting model") print("Simple forecasting model")
rmse = np.sqrt(mean_squared_error(df_all[target_column_name], df_all["predicted"])) rmse = np.sqrt(mean_squared_error(X_rf[target_column_name], X_rf["predicted"]))
print("[Test Data] \nRoot Mean squared error: %.2f" % rmse) print("[Test Data] \nRoot Mean squared error: %.2f" % rmse)
mae = mean_absolute_error(df_all[target_column_name], df_all["predicted"]) mae = mean_absolute_error(X_rf[target_column_name], X_rf["predicted"])
print("mean_absolute_error score: %.2f" % mae) print("mean_absolute_error score: %.2f" % mae)
print("MAPE: %.2f" % MAPE(df_all[target_column_name], df_all["predicted"])) print("MAPE: %.2f" % MAPE(X_rf[target_column_name], X_rf["predicted"]))
run.log("rmse", rmse) run.log("rmse", rmse)
run.log("mae", mae) run.log("mae", mae)