Files
MachineLearningNotebooks/how-to-use-azureml/automated-machine-learning/forecasting-hierarchical-timeseries/auto-ml-forecasting-hierarchical-timeseries.ipynb

648 lines
26 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/automated-machine-learning/forecasting-hierarchical-timeseries/auto-ml-forecasting-hierarchical-timeseries.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hierarchical Time Series - Automated ML\n",
"**_Generate hierarchical time series forecasts with Automated Machine Learning_**\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this notebook we are using a synthetic dataset portraying sales data to predict the the quantity of a vartiety of product skus across several states, stores, and product categories.\n",
"\n",
"**NOTE: There are limits on how many runs we can do in parallel per workspace, and we currently recommend to set the parallelism to maximum of 320 runs per experiment per workspace. If users want to have more parallelism and increase this limit they might encounter Too Many Requests errors (HTTP 429).**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prerequisites\n",
"You'll need to create a compute Instance by following the instructions in the [EnvironmentSetup.md](../Setup_Resources/EnvironmentSetup.md)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1.0 Set up workspace, datastore, experiment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613003526897
}
},
"outputs": [],
"source": [
"import azureml.core\n",
"from azureml.core import Workspace, Datastore\n",
"import pandas as pd\n",
"\n",
"# Set up your workspace\n",
"ws = Workspace.from_config()\n",
"ws.get_details()\n",
"\n",
"# Set up your datastores\n",
"dstore = ws.get_default_datastore()\n",
"\n",
"output = {}\n",
"output['SDK version'] = azureml.core.VERSION\n",
"output['Subscription ID'] = ws.subscription_id\n",
"output['Workspace'] = ws.name\n",
"output['Resource Group'] = ws.resource_group\n",
"output['Location'] = ws.location\n",
"output['Default datastore name'] = dstore.name\n",
"pd.set_option('display.max_colwidth', -1)\n",
"outputDf = pd.DataFrame(data = output, index = [''])\n",
"outputDf.T"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Choose an experiment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613003540729
}
},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"\n",
"experiment = Experiment(ws, 'automl-hts')\n",
"\n",
"print('Experiment name: ' + experiment.name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2.0 Data\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Upload local csv files to datastore\n",
"You can upload your train and inference csv files to the default datastore in your workspace. \n",
"\n",
"A Datastore is a place where data can be stored that is then made accessible to a compute either by means of mounting or copying the data to the compute target.\n",
"Please refer to [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore.datastore?view=azure-ml-py) documentation on how to access data from Datastore."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"datastore_path = \"hts-sample\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"datastore = ws.get_default_datastore()\n",
"datastore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613005886349
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"datastore.upload(src_dir='./Data/', target_path=datastore_path, overwrite=True, show_progress=True) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the TabularDatasets \n",
"\n",
"Datasets in Azure Machine Learning are references to specific data in a Datastore. The data can be retrieved as a [TabularDatasets](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007017296
}
},
"outputs": [],
"source": [
"from azureml.core.dataset import Dataset\n",
"train_ds = Dataset.Tabular.from_delimited_files(path=datastore.path(\"hts-sample/hts-sample-train.csv\"), validate=False) \n",
"inference_ds = Dataset.Tabular.from_delimited_files(path=datastore.path(\"hts-sample/hts-sample-test.csv\"), validate=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register the TabularDatasets to the Workspace \n",
"Finally, register the dataset to your Workspace so it can be called as an input into the training pipeline in the next notebook. We will use the inference dataset as part of the forecasting pipeline. The step need only be completed once."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"registered_train = train_ds.register(ws, \"hts-sales-train\")\n",
"registered_inference = inference_ds.register(ws, \"hts-sales-test\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3.0 Build the training pipeline\n",
"Now that the dataset, WorkSpace, and datastore are set up, we can put together a pipeline for training.\n",
"\n",
"> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Choose a compute target\n",
"\n",
"You will need to create a [compute target](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-set-up-training-targets#amlcompute) for your AutoML run. In this tutorial, you create AmlCompute as your training compute resource.\n",
"\n",
"\\*\\*Creation of AmlCompute takes approximately 5 minutes.**\n",
"\n",
"If the AmlCompute with that name is already in your workspace this code will skip the creation process. As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read this [article](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-quotas) on the default limits and how to request more quota."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007037308
}
},
"outputs": [],
"source": [
"from azureml.core.compute import ComputeTarget, AmlCompute\n",
"\n",
"# Name your cluster\n",
"compute_name = \"hts-compute\"\n",
"\n",
"\n",
"if compute_name in ws.compute_targets:\n",
" compute_target = ws.compute_targets[compute_name]\n",
" if compute_target and type(compute_target) is AmlCompute:\n",
" print('Found compute target: ' + compute_name)\n",
"else:\n",
" print('Creating a new compute target...')\n",
" provisioning_config = AmlCompute.provisioning_configuration(vm_size= \"STANDARD_D16S_V3\",\n",
" max_nodes=20)\n",
" # Create the compute target\n",
" compute_target = ComputeTarget.create(\n",
" ws, compute_name, provisioning_config)\n",
"\n",
" # Can poll for a minimum number of nodes and for a specific timeout.\n",
" # If no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(\n",
" show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" # For a more detailed view of current cluster status, use the 'status' property\n",
" print(compute_target.status.serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up training parameters\n",
"\n",
"This dictionary defines the AutoML and hierarchy settings. For this forecasting task we need to define several settings inncluding the name of the time column, the maximum forecast horizon, the hierarchy definition, and the level of the hierarchy at which to train.\n",
"\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **task** | forecasting |\n",
"| **primary_metric** | This is the metric that you want to optimize.<br> Forecasting supports the following primary metrics <br><i>spearman_correlation</i><br><i>normalized_root_mean_squared_error</i><br><i>r2_score</i><br><i>normalized_mean_absolute_error</i> |\n",
"| **blocked_models** | Blocked models won't be used by AutoML. |\n",
"| **iteration_timeout_minutes** | Maximum amount of time in minutes that the model can train. This is optional but provides customers with greater control on exit criteria. |\n",
"| **iterations** | Number of models to train. This is optional but provides customers with greater control on exit criteria. |\n",
"| **experiment_timeout_hours** | Maximum amount of time in hours that the experiment can take before it terminates. This is optional but provides customers with greater control on exit criteria. |\n",
"| **label_column_name** | The name of the label column. |\n",
"| **forecast_horizon** | The forecast horizon is how many periods forward you would like to forecast. This integer horizon is in units of the timeseries frequency (e.g. daily, weekly). Periods are inferred from your data. |\n",
"| **n_cross_validations** | Number of cross validation splits. Rolling Origin Validation is used to split time-series in a temporally consistent way. |\n",
"| **enable_early_stopping** | Flag to enable early termination if the score is not improving in the short term. |\n",
"| **time_column_name** | The name of your time column. |\n",
"| **hierarchy_column_names** | The names of columns that define the hierarchical structure of the data from highest level to most granular. |\n",
"| **training_level** | The level of the hierarchy to be used for training models. |\n",
"| **enable_engineered_explanations** | Engineered feature explanations will be downloaded if enable_engineered_explanations flag is set to True. By default it is set to False to save storage space. |\n",
"| **time_series_id_column_name** | The column names used to uniquely identify timeseries in data that has multiple rows with the same timestamp. |\n",
"| **track_child_runs** | Flag to disable tracking of child runs. Only best run is tracked if the flag is set to False (this includes the model and metrics of the run). |\n",
"| **pipeline_fetch_max_batch_size** | Determines how many pipelines (training algorithms) to fetch at a time for training, this helps reduce throttling when training at large scale. |\n",
"| **model_explainability** | Flag to disable explaining the best automated ML model at the end of all training iterations. The default is True and will block non-explainable models which may impact the forecast accuracy. For more information, see [Interpretability: model explanations in automated machine learning](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-machine-learning-interpretability-automl). |"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007061544
}
},
"outputs": [],
"source": [
"from azureml.train.automl.runtime._hts.hts_parameters import HTSTrainParameters\n",
"\n",
"model_explainability = True\n",
"\n",
"engineered_explanations = False\n",
"# Define your hierarchy. Adjust the settings below based on your dataset.\n",
"hierarchy = [\"state\", \"store_id\", \"product_category\", \"SKU\"]\n",
"training_level = \"SKU\"\n",
"\n",
"# Set your forecast parameters. Adjust the settings below based on your dataset.\n",
"time_column_name = \"date\"\n",
"label_column_name = \"quantity\"\n",
"forecast_horizon = 7\n",
"\n",
"\n",
"automl_settings = {\n",
" \"task\" : \"forecasting\",\n",
" \"primary_metric\" : \"normalized_root_mean_squared_error\",\n",
" \"label_column_name\": label_column_name,\n",
" \"time_column_name\": time_column_name,\n",
" \"forecast_horizon\": forecast_horizon,\n",
" \"hierarchy_column_names\": hierarchy,\n",
" \"hierarchy_training_level\": training_level,\n",
" \"track_child_runs\": False,\n",
" \"pipeline_fetch_max_batch_size\": 15,\n",
" \"model_explainability\": model_explainability,\n",
" # The following settings are specific to this sample and should be adjusted according to your own needs.\n",
" \"iteration_timeout_minutes\" : 10,\n",
" \"iterations\" : 10,\n",
" \"n_cross_validations\": 2\n",
"}\n",
"\n",
"hts_parameters = HTSTrainParameters(\n",
" automl_settings=automl_settings,\n",
" hierarchy_column_names=hierarchy,\n",
" training_level=training_level,\n",
" enable_engineered_explanations=engineered_explanations\n",
")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up hierarchy training pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Parallel run step is leveraged to train the hierarchy. To configure the ParallelRunConfig you will need to determine the appropriate number of workers and nodes for your use case. The `process_count_per_node` is based off the number of cores of the compute VM. The node_count will determine the number of master nodes to use, increasing the node count will speed up the training process.\n",
"\n",
"* **experiment:** The experiment used for training.\n",
"* **train_data:** The tabular dataset to be used as input to the training run.\n",
"* **node_count:** The number of compute nodes to be used for running the user script. We recommend to start with 3 and increase the node_count if the training time is taking too long.\n",
"* **process_count_per_node:** Process count per node, we recommend 2:1 ratio for number of cores: number of processes per node. eg. If node has 16 cores then configure 8 or less process count per node or optimal performance.\n",
"* **train_pipeline_parameters:** The set of configuration parameters defined in the previous section. \n",
"\n",
"Calling this method will create a new aggregated dataset which is generated dynamically on pipeline execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.automl.pipeline.steps import AutoMLPipelineBuilder\n",
"\n",
"\n",
"training_pipeline_steps = AutoMLPipelineBuilder.get_many_models_train_steps(\n",
" experiment=experiment,\n",
" train_data=registered_train,\n",
" compute_target=compute_target,\n",
" node_count=2,\n",
" process_count_per_node=8,\n",
" train_pipeline_parameters=hts_parameters,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline\n",
"\n",
"training_pipeline = Pipeline(ws, steps=training_pipeline_steps)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline to run\n",
"Next we submit our pipeline to run. The whole training pipeline takes about 1h 11m using a Standard_D12_V2 VM with our current ParallelRunConfig setting."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"training_run = experiment.submit(training_pipeline)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"training_run.wait_for_completion(show_output=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check the run status, if training_run is in completed state, continue to forecasting. If training_run is in another state, check the portal for failures."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### [Optional] Get the explanations\n",
"First we need to download the explanations to the local disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if model_explainability:\n",
" expl_output = training_run.get_pipeline_output(\"explanations\")\n",
" expl_output.download(\"training_explanations\")\n",
"else:\n",
" print(\"Model explanations are available only if model_explainability is set to True.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The explanations are downloaded to the \"training_explanations/azureml\" directory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"if model_explainability:\n",
" explanations_dirrectory = os.listdir(os.path.join('training_explanations', 'azureml'))\n",
" if len(explanations_dirrectory) > 1:\n",
" print(\"Warning! The directory contains multiple explanations, only the first one will be displayed.\")\n",
" print('The explanations are located at {}.'.format(explanations_dirrectory[0]))\n",
" # Now we will list all the explanations.\n",
" explanation_path = os.path.join('training_explanations', 'azureml', explanations_dirrectory[0], 'training_explanations')\n",
" print(\"Available explanations\")\n",
" print(\"==============================\")\n",
" print(\"\\n\".join(os.listdir(explanation_path)))\n",
"else:\n",
" print(\"Model explanations are available only if model_explainability is set to True.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"View the explanations on \"state\" level."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.display import display\n",
"\n",
"explanation_type = 'raw'\n",
"level = 'state'\n",
"\n",
"if model_explainability:\n",
" display(pd.read_csv(os.path.join(explanation_path, \"{}_explanations_{}.csv\").format(explanation_type, level)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5.0 Forecasting\n",
"For hierarchical forecasting we need to provide the HTSInferenceParameters object.\n",
"#### HTSInferenceParameters arguments\n",
"* **hierarchy_forecast_level:** The default level of the hierarchy to produce prediction/forecast on.\n",
"* **allocation_method:** \\[Optional] The disaggregation method to use if the hierarchy forecast level specified is below the define hierarchy training level. <br><i>(average historical proportions) 'average_historical_proportions'</i><br><i>(proportions of the historical averages) 'proportions_of_historical_average'</i>\n",
"\n",
"#### get_many_models_batch_inference_steps arguments\n",
"* **experiment:** The experiment used for inference run.\n",
"* **inference_data:** The data to use for inferencing. It should be the same schema as used for training.\n",
"* **compute_target:** The compute target that runs the inference pipeline.\n",
"* **node_count:** The number of compute nodes to be used for running the user script. We recommend to start with the number of cores per node (varies by compute sku).\n",
"* **process_count_per_node:** The number of processes per node.\n",
"* **train_run_id:** \\[Optional] The run id of the hierarchy training, by default it is the latest successful training hts run in the experiment.\n",
"* **train_experiment_name:** \\[Optional] The train experiment that contains the train pipeline. This one is only needed when the train pipeline is not in the same experiement as the inference pipeline.\n",
"* **process_count_per_node:** \\[Optional] The number of processes per node, by default it's 4."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.automl.runtime._hts.hts_parameters import HTSInferenceParameters\n",
"\n",
"inference_parameters = HTSInferenceParameters(\n",
" hierarchy_forecast_level=\"store_id\", # The setting is specific to this dataset and should be changed based on your dataset.\n",
" allocation_method=\"proportions_of_historical_average\"\n",
")\n",
"\n",
"steps = AutoMLPipelineBuilder.get_many_models_batch_inference_steps(\n",
" experiment=experiment,\n",
" inference_data=registered_inference,\n",
" compute_target=compute_target,\n",
" inference_pipeline_parameters=inference_parameters,\n",
" node_count=2,\n",
" process_count_per_node=8\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline\n",
"\n",
"inference_pipeline = Pipeline(ws, steps=steps)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inference_run = experiment.submit(inference_pipeline)\n",
"inference_run.wait_for_completion(show_output=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieve results\n",
"\n",
"Forecast results can be retrieved through the following code. The prediction results summary and the actual predictions are downloaded the \"forecast_results\" folder"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"forecasts = inference_run.get_pipeline_output(\"forecasts\")\n",
"forecasts.download(\"forecast_results\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Resbumit the Pipeline\n",
"\n",
"The inference pipeline can be submitted with different configurations."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inference_run = experiment.submit(inference_pipeline, pipeline_parameters={\"hierarchy_forecast_level\": \"state\"})\n",
"inference_run.wait_for_completion(show_output=False)"
]
}
],
"metadata": {
"authors": [
{
"name": "jialiu"
}
],
"categories": [
"how-to-use-azureml",
"automated-machine-learning"
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}