{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/automated-machine-learning/forecasting-hierarchical-timeseries/auto-ml-forecasting-hierarchical-timeseries.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<font color=\"red\" size=\"5\"><strong>!Important!</strong> </br>This notebook is outdated and is not supported by the AutoML Team. Please use the supported version ([link](https://github.com/Azure/azureml-examples/tree/main/sdk/python/jobs/pipelines/1k_demand_forecasting_with_pipeline_components/automl-forecasting-demand-many-models-in-pipeline)).</font>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Many Models - Automated ML\n",
"**_Generate many models time series forecasts with Automated Machine Learning_**\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this notebook we are using a synthetic dataset portraying sales data to predict the the quantity of a vartiety of product skus across several states, stores, and product categories.\n",
"\n",
"**NOTE: There are limits on how many runs we can do in parallel per workspace, and we currently recommend to set the parallelism to maximum of 320 runs per experiment per workspace. If users want to have more parallelism and increase this limit they might encounter Too Many Requests errors (HTTP 429).**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prerequisites\n",
"You'll need to create a compute Instance by following [these](https://learn.microsoft.com/en-us/azure/machine-learning/v1/how-to-create-manage-compute-instance?tabs=python) instructions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1.0 Set up workspace, datastore, experiment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613003526897
}
},
"outputs": [],
"source": [
"import azureml.core\n",
"from azureml.core import Workspace, Datastore\n",
"import pandas as pd\n",
"\n",
"# Set up your workspace\n",
"ws = Workspace.from_config()\n",
"ws.get_details()\n",
"\n",
"# Set up your datastores\n",
"dstore = ws.get_default_datastore()\n",
"\n",
"output = {}\n",
"output[\"SDK version\"] = azureml.core.VERSION\n",
"output[\"Subscription ID\"] = ws.subscription_id\n",
"output[\"Workspace\"] = ws.name\n",
"output[\"Resource Group\"] = ws.resource_group\n",
"output[\"Location\"] = ws.location\n",
"output[\"Default datastore name\"] = dstore.name\n",
"output[\"SDK Version\"] = azureml.core.VERSION\n",
"pd.set_option(\"display.max_colwidth\", None)\n",
"outputDf = pd.DataFrame(data=output, index=[\"\"])\n",
"outputDf.T"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Choose an experiment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613003540729
}
},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"\n",
"experiment = Experiment(ws, \"automl-many-models\")\n",
"\n",
"print(\"Experiment name: \" + experiment.name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2.0 Data\n",
"\n",
"This notebook uses simulated orange juice sales data to walk you through the process of training many models on Azure Machine Learning using Automated ML. \n",
"\n",
"The time series data used in this example was simulated based on the University of Chicago's Dominick's Finer Foods dataset which featured two years of sales of 3 different orange juice brands for individual stores. The full simulated dataset includes 3,991 stores with 3 orange juice brands each thus allowing 11,973 models to be trained to showcase the power of the many models pattern.\n",
"\n",
" \n",
"In this notebook, two datasets will be created: one with all 11,973 files and one with only 10 files that can be used to quickly test and debug. For each dataset, you'll be walked through the process of:\n",
"\n",
"1. Registering the blob container as a Datastore to the Workspace\n",
"2. Registering a tabular dataset to the Workspace"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### 2.1 Data Preparation\n",
"The OJ data is available in the public blob container. The data is split to be used for training and for inferencing. For the current dataset, the data was split on time column ('WeekStarting') before and after '1992-5-28' .\n",
"\n",
"The container has\n",
"<ol>\n",
" <li><b>'oj-data-tabular'</b> and <b>'oj-inference-tabular'</b> folders that contains training and inference data respectively for the 11,973 models. </li>\n",
" <li>It also has <b>'oj-data-small-tabular'</b> and <b>'oj-inference-small-tabular'</b> folders that has training and inference data for 10 models.</li>\n",
"</ol>\n",
"\n",
"To create the [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabular_dataset.tabulardataset?view=azure-ml-py) needed for the ParallelRunStep, you first need to register the blob container to the workspace."
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"<b> To use your own data, put your own data in a blobstore folder. As shown it can be one file or multiple files. We can then register datastore using that blob as shown below.\n",
" \n",
"<h3> How sample data in blob store looks like</h3>\n",
"\n",
"['oj-data-tabular'](https://ms.portal.azure.com/#blade/Microsoft_Azure_Storage/ContainerMenuBlade/overview/storageAccountId/%2Fsubscriptions%2F102a16c3-37d3-48a8-9237-4c9b1e8e80e0%2FresourceGroups%2FAutoMLSampleNotebooksData%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Fautomlsamplenotebookdata/path/automl-sample-notebook-data/etag/%220x8D84EAA65DE50B7%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride//defaultId//publicAccessVal/Container)</b>\n",
"![image-4.png](mm-1.png)\n",
"\n",
"['oj-inference-tabular'](https://ms.portal.azure.com/#blade/Microsoft_Azure_Storage/ContainerMenuBlade/overview/storageAccountId/%2Fsubscriptions%2F102a16c3-37d3-48a8-9237-4c9b1e8e80e0%2FresourceGroups%2FAutoMLSampleNotebooksData%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Fautomlsamplenotebookdata/path/automl-sample-notebook-data/etag/%220x8D84EAA65DE50B7%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride//defaultId//publicAccessVal/Container)\n",
"![image-3.png](mm-2.png)\n",
"\n",
"['oj-data-small-tabular'](https://ms.portal.azure.com/#blade/Microsoft_Azure_Storage/ContainerMenuBlade/overview/storageAccountId/%2Fsubscriptions%2F102a16c3-37d3-48a8-9237-4c9b1e8e80e0%2FresourceGroups%2FAutoMLSampleNotebooksData%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Fautomlsamplenotebookdata/path/automl-sample-notebook-data/etag/%220x8D84EAA65DE50B7%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride//defaultId//publicAccessVal/Container)\n",
"\n",
"![image-5.png](mm-3.png)\n",
"\n",
"['oj-inference-small-tabular'](https://ms.portal.azure.com/#blade/Microsoft_Azure_Storage/ContainerMenuBlade/overview/storageAccountId/%2Fsubscriptions%2F102a16c3-37d3-48a8-9237-4c9b1e8e80e0%2FresourceGroups%2FAutoMLSampleNotebooksData%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Fautomlsamplenotebookdata/path/automl-sample-notebook-data/etag/%220x8D84EAA65DE50B7%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride//defaultId//publicAccessVal/Container)\n",
"![image-6.png](mm-4.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2.2 Register the blob container as DataStore\n",
"\n",
"A Datastore is a place where data can be stored that is then made accessible to a compute either by means of mounting or copying the data to the compute target.\n",
"\n",
"Please refer to [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore(class)?view=azure-ml-py) documentation on how to access data from Datastore.\n",
"\n",
"In this next step, we will be registering blob storage as datastore to the Workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Datastore\n",
"\n",
"# Please change the following to point to your own blob container and pass in account_key\n",
"blob_datastore_name = \"automl_many_models\"\n",
"container_name = \"automl-sample-notebook-data\"\n",
"account_name = \"automlsamplenotebookdata\"\n",
"\n",
"oj_datastore = Datastore.register_azure_blob_container(\n",
" workspace=ws,\n",
" datastore_name=blob_datastore_name,\n",
" container_name=container_name,\n",
" account_name=account_name,\n",
" create_if_not_exists=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2.3 Using tabular datasets \n",
"\n",
"Now that the datastore is available from the Workspace, [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabular_dataset.tabulardataset?view=azure-ml-py) can be created. Datasets in Azure Machine Learning are references to specific data in a Datastore. We are using TabularDataset, so that users who have their data which can be in one or many files (*.parquet or *.csv) and have not split up data according to group columns needed for training, can do so using out of box support for 'partiion_by' feature of TabularDataset shown in section 5.0 below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007017296
}
},
"outputs": [],
"source": [
"from azureml.core import Dataset\n",
"\n",
"ds_name_small = \"oj-data-small-tabular\"\n",
"input_ds_small = Dataset.Tabular.from_delimited_files(\n",
" path=oj_datastore.path(ds_name_small + \"/\"), validate=False\n",
")\n",
"\n",
"inference_name_small = \"oj-inference-small-tabular\"\n",
"inference_ds_small = Dataset.Tabular.from_delimited_files(\n",
" path=oj_datastore.path(inference_name_small + \"/\"), validate=False\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2.4 Configure data with ``OutputFileDatasetConfig`` objects\n",
"This step shows how to configure output data from a pipeline step. One of the use cases for this step is when you want to do some preprocessing before feeding the data to training step. Intermediate data (or output of a step) is represented by an ``OutputFileDatasetConfig`` object. ``output_data`` is produced as the output of a step. Optionally, this data can be registered as a dataset by calling the ``register_on_complete`` method. If you create an ``OutputFileDatasetConfig`` in one step and use it as an input to another step, that data dependency between steps creates an implicit execution order in the pipeline.\n",
"\n",
"``OutputFileDatasetConfig`` objects return a directory, and by default write output to the default datastore of the workspace.\n",
"\n",
"Since instance creation for class ``OutputTabularDatasetConfig`` is not allowed, we first create an instance of this class. Then we use the ``read_parquet_files`` method to read the parquet file into ``OutputTabularDatasetConfig``."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.data.output_dataset_config import OutputFileDatasetConfig\n",
"\n",
"output_data = OutputFileDatasetConfig(\n",
" name=\"processed_data\", destination=(dstore, \"outputdataset/{run-id}/{output-name}\")\n",
").as_upload()\n",
"# output_data_dataset = output_data.register_on_complete(\n",
"# name='processed_data', description = 'files from prev step')\n",
"output_data = output_data.read_parquet_files()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3.0 Build the training pipeline\n",
"Now that the dataset, WorkSpace, and datastore are set up, we can put together a pipeline for training.\n",
"\n",
"> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Choose a compute target\n",
"\n",
"You will need to create a [compute target](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-set-up-training-targets#amlcompute) for your AutoML run. In this tutorial, you create AmlCompute as your training compute resource.\n",
"\n",
"\\*\\*Creation of AmlCompute takes approximately 5 minutes.**\n",
"\n",
"If the AmlCompute with that name is already in your workspace this code will skip the creation process. As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read this [article](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-quotas) on the default limits and how to request more quota."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007037308
}
},
"outputs": [],
"source": [
"from azureml.core.compute import ComputeTarget, AmlCompute\n",
"\n",
"# Name your cluster\n",
"compute_name = \"mm-compute-v1\"\n",
"\n",
"\n",
"if compute_name in ws.compute_targets:\n",
" compute_target = ws.compute_targets[compute_name]\n",
" if compute_target and type(compute_target) is AmlCompute:\n",
" print(\"Found compute target: \" + compute_name)\n",
"else:\n",
" print(\"Creating a new compute target...\")\n",
" provisioning_config = AmlCompute.provisioning_configuration(\n",
" vm_size=\"STANDARD_D14_V2\", max_nodes=20\n",
" )\n",
" # Create the compute target\n",
" compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
"\n",
" # Can poll for a minimum number of nodes and for a specific timeout.\n",
" # If no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(\n",
" show_output=True, min_node_count=None, timeout_in_minutes=20\n",
" )\n",
"\n",
" # For a more detailed view of current cluster status, use the 'status' property\n",
" print(compute_target.status.serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Configure the training run's environment\n",
"The next step is making sure that the remote training run has all the dependencies needed by the training steps. Dependencies and the runtime context are set by creating and configuring a RunConfiguration object.\n",
"\n",
"The code below shows two options for handling dependencies. As presented, with ``USE_CURATED_ENV = True``, the configuration is based on a [curated environment](https://docs.microsoft.com/en-us/azure/machine-learning/resource-curated-environments). Curated environments have prebuilt Docker images in the [Microsoft Container Registry](https://hub.docker.com/publishers/microsoftowner). For more information, see [Azure Machine Learning curated environments](https://docs.microsoft.com/en-us/azure/machine-learning/resource-curated-environments).\n",
"\n",
"The path taken if you change ``USE_CURATED_ENV`` to False shows the pattern for explicitly setting your dependencies. In that scenario, a new custom Docker image will be created and registered in an Azure Container Registry within your resource group (see [Introduction to private Docker container registries in Azure](https://docs.microsoft.com/en-us/azure/container-registry/container-registry-intro)). Building and registering this image can take quite a few minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.runconfig import RunConfiguration\n",
"from azureml.core.conda_dependencies import CondaDependencies\n",
"from azureml.core import Environment\n",
"\n",
"aml_run_config = RunConfiguration()\n",
"aml_run_config.target = compute_target\n",
"\n",
"USE_CURATED_ENV = True\n",
"if USE_CURATED_ENV:\n",
" curated_environment = Environment.get(\n",
" workspace=ws, name=\"AzureML-sklearn-1.5\"\n",
" )\n",
" aml_run_config.environment = curated_environment\n",
"else:\n",
" aml_run_config.environment.python.user_managed_dependencies = False\n",
"\n",
" # Add some packages relied on by data prep step\n",
" aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(\n",
" conda_packages=[\"pandas\", \"scikit-learn\"],\n",
" pip_packages=[\"azureml-sdk\", \"azureml-dataset-runtime[fuse,pandas]\"],\n",
" pin_sdk_version=False,\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up training parameters\n",
"\n",
"We need to provide ``ForecastingParameters``, ``AutoMLConfig`` and ``ManyModelsTrainParameters`` objects. For the forecasting task we also need to define several settings including the name of the time column, the maximum forecast horizon, and the partition column name(s) definition.\n",
"\n",
"#### ``ForecastingParameters`` arguments\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **forecast_horizon** | The forecast horizon is how many periods forward you would like to forecast. This integer horizon is in units of the timeseries frequency (e.g. daily, weekly). Periods are inferred from your data. |\n",
"| **time_column_name** | The name of your time column. |\n",
"| **time_series_id_column_names** | The column names used to uniquely identify timeseries in data that has multiple rows with the same timestamp. |\n",
"| **cv_step_size** | Number of periods between two consecutive cross-validation folds. The default value is \\\"auto\\\", in which case AutoMl determines the cross-validation step size automatically, if a validation set is not provided. Or users could specify an integer value. |\n",
"\n",
"#### ``AutoMLConfig`` arguments\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **task** | forecasting |\n",
"| **primary_metric** | This is the metric that you want to optimize.<br> Forecasting supports the following primary metrics <br><i>spearman_correlation</i><br><i>normalized_root_mean_squared_error</i><br><i>r2_score</i><br><i>normalized_mean_absolute_error</i> |\n",
"| **blocked_models** | Blocked models won't be used by AutoML. |\n",
"| **iteration_timeout_minutes** | Maximum amount of time in minutes that the model can train. This is optional but provides customers with greater control on exit criteria. |\n",
"| **iterations** | Number of models to train. This is optional but provides customers with greater control on exit criteria. |\n",
"| **experiment_timeout_hours** | Maximum amount of time in hours that each experiment can take before it terminates. This is optional but provides customers with greater control on exit criteria. **It does not control the overall timeout for the pipeline run, instead controls the timeout for each training run per partitioned time series.** |\n",
"| **label_column_name** | The name of the label column. |\n",
"| **n_cross_validations** | Number of cross validation splits. The default value is \\\"auto\\\", in which case AutoMl determines the number of cross-validations automatically, if a validation set is not provided. Or users could specify an integer value. Rolling Origin Validation is used to split time-series in a temporally consistent way. |\n",
"| **enable_early_stopping** | Flag to enable early termination if the primary metric is no longer improving. |\n",
"| **enable_engineered_explanations** | Engineered feature explanations will be downloaded if enable_engineered_explanations flag is set to True. By default it is set to False to save storage space. |\n",
"| **track_child_runs** | Flag to disable tracking of child runs. Only best run is tracked if the flag is set to False (this includes the model and metrics of the run). |\n",
"| **pipeline_fetch_max_batch_size** | Determines how many pipelines (training algorithms) to fetch at a time for training, this helps reduce throttling when training at large scale. |\n",
"\n",
"\n",
"#### ``ManyModelsTrainParameters`` arguments\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **automl_settings** | The ``AutoMLConfig`` object defined above. |\n",
"| **partition_column_names** | The names of columns used to group your models. For timeseries, the groups must not split up individual time-series. That is, each group must contain one or more whole time-series. |"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1613007061544
}
},
"outputs": [],
"source": [
"from azureml.train.automl.runtime._many_models.many_models_parameters import (\n",
" ManyModelsTrainParameters,\n",
")\n",
"from azureml.automl.core.forecasting_parameters import ForecastingParameters\n",
"from azureml.train.automl.automlconfig import AutoMLConfig\n",
"\n",
"partition_column_names = [\"Store\", \"Brand\"]\n",
"\n",
"forecasting_parameters = ForecastingParameters(\n",
" time_column_name=\"WeekStarting\",\n",
" forecast_horizon=6,\n",
" time_series_id_column_names=partition_column_names,\n",
" cv_step_size=\"auto\",\n",
")\n",
"\n",
"automl_settings = AutoMLConfig(\n",
" task=\"forecasting\",\n",
" primary_metric=\"normalized_root_mean_squared_error\",\n",
" iteration_timeout_minutes=10,\n",
" iterations=15,\n",
" experiment_timeout_hours=0.25,\n",
" label_column_name=\"Quantity\",\n",
" n_cross_validations=\"auto\", # Feel free to set to a small integer (>=2) if runtime is an issue.\n",
" track_child_runs=False,\n",
" forecasting_parameters=forecasting_parameters,\n",
")\n",
"\n",
"mm_paramters = ManyModelsTrainParameters(\n",
" automl_settings=automl_settings, partition_column_names=partition_column_names\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Construct your pipeline steps\n",
"Once you have the compute resource and environment created, you're ready to define your pipeline's steps. There are many built-in steps available via the Azure Machine Learning SDK, as you can see on the [reference documentation for the azureml.pipeline.steps package](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps?view=azure-ml-py). The most flexible class is [PythonScriptStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py), which runs a Python script.\n",
"\n",
"Your data preparation code is in a subdirectory (in this example, \"data_preprocessing_tabular.py\" in the directory \"./scripts\"). As part of the pipeline creation process, this directory is zipped and uploaded to the compute_target and the step runs the script specified as the value for ``script_name``.\n",
"\n",
"The ``arguments`` values specify the inputs and outputs of the step. In the example below, the baseline data is the ``input_ds_small`` dataset. The script data_preprocessing_tabular.py does whatever data-transformation tasks are appropriate to the task at hand and outputs the data to ``output_data``, of type ``OutputFileDatasetConfig``. For more information, see [Moving data into and between ML pipeline steps (Python)](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-move-data-in-out-of-pipelines). The step will run on the machine defined by ``compute_target``, using the configuration ``aml_run_config``.\n",
"\n",
"Reuse of previous results (``allow_reuse``) is key when using pipelines in a collaborative environment since eliminating unnecessary reruns offers agility. Reuse is the default behavior when the ``script_name``, ``inputs``, and the parameters of a step remain the same. When reuse is allowed, results from the previous run are immediately sent to the next step. If ``allow_reuse`` is set to False, a new run will always be generated for this step during pipeline execution.\n",
"\n",
"> Note that we only support partitioned FileDataset and TabularDataset without partition when using such output as input.\n",
"\n",
 Note that we **drop column**">
"> Note that we **drop the column** \"Revenue\" from the dataset in this step to avoid information leakage, because \"Quantity\" = \"Revenue\" / \"Price\". **Please modify the logic based on your data**."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.steps import PythonScriptStep\n",
"\n",
"dataprep_source_dir = \"./scripts\"\n",
"entry_point = \"data_preprocessing_tabular.py\"\n",
"ds_input = input_ds_small.as_named_input(\"train_10_models\")\n",
"\n",
"data_prep_step = PythonScriptStep(\n",
" script_name=entry_point,\n",
" source_directory=dataprep_source_dir,\n",
" arguments=[\"--input\", ds_input, \"--output\", output_data],\n",
" compute_target=compute_target,\n",
" runconfig=aml_run_config,\n",
" allow_reuse=False,\n",
")\n",
"\n",
"input_ds_small = output_data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up many models pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Parallel run step is leveraged to train multiple models at once. To configure the ParallelRunConfig you will need to determine the appropriate number of workers and nodes for your use case. The ``process_count_per_node`` is based off the number of cores of the compute VM. The node_count will determine the number of master nodes to use, increasing the node count will speed up the training process.\n",
"\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **experiment** | The experiment used for training. |\n",
"| **train_data** | The file dataset to be used as input to the training run. |\n",
"| **node_count** | The number of compute nodes to be used for running the user script. We recommend to start with 3 and increase the node_count if the training time is taking too long. |\n",
"| **process_count_per_node** | Process count per node, we recommend 2:1 ratio for number of cores: number of processes per node. eg. If node has 16 cores then configure 8 or less process count per node for optimal performance. |\n",
"| **train_pipeline_parameters** | The set of configuration parameters defined in the previous section. |\n",
"| **run_invocation_timeout** | Maximum amount of time in seconds that the ``ParallelRunStep`` class is allowed. This is optional but provides customers with greater control on exit criteria. This must be greater than ``experiment_timeout_hours`` by at least 300 seconds. |\n",
"\n",
"Calling this method will create a new aggregated dataset which is generated dynamically on pipeline execution.\n",
"\n",
"**Note**: Total time taken for the **training step** in the pipeline to complete = $ \\frac{t}{ p \\times n } \\times ts $\n",
"where,\n",
"- $ t $ is time taken for training one partition (can be viewed in the training logs)\n",
"- $ p $ is ``process_count_per_node``\n",
"- $ n $ is ``node_count``\n",
"- $ ts $ is total number of partitions in time series based on ``partition_column_names``"
]
},
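{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough worked example of the estimate above, take the settings used in the next cell ($ p = 8 $, $ n = 2 $) and the full dataset ($ ts = 11973 $). The value $ t = 15 $ minutes is only an assumption for illustration: it is the upper bound implied by ``experiment_timeout_hours=0.25``, and the actual per-partition time should be read from the training logs.\n",
"\n",
"$$ \\frac{t}{p \\times n} \\times ts = \\frac{15}{8 \\times 2} \\times 11973 \\approx 11225 \\text{ minutes} \\approx 187 \\text{ hours} $$\n",
"\n",
"Under the same assumption, using all 20 nodes allowed by the cluster configuration ($ n = 20 $) gives\n",
"\n",
"$$ \\frac{15}{8 \\times 20} \\times 11973 \\approx 1122 \\text{ minutes} \\approx 18.7 \\text{ hours} $$\n",
"\n",
"The estimate is most meaningful when $ ts $ is much larger than $ p \\times n $; with only a few partitions, the step cannot finish faster than a single partition's training time $ t $.\n",
"\n",
"The ``run_invocation_timeout`` requirement can be checked the same way: ``experiment_timeout_hours=0.25`` corresponds to $ 0.25 \\times 3600 = 900 $ seconds, so the timeout must be at least $ 900 + 300 = 1200 $ seconds, which matches the value passed below."
]
},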
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.automl.pipeline.steps import AutoMLPipelineBuilder\n",
"\n",
"\n",
"training_pipeline_steps = AutoMLPipelineBuilder.get_many_models_train_steps(\n",
" experiment=experiment,\n",
" train_data=input_ds_small,\n",
" compute_target=compute_target,\n",
" node_count=2,\n",
" process_count_per_node=8,\n",
" run_invocation_timeout=1200,\n",
" train_pipeline_parameters=mm_paramters,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline\n",
"\n",
"training_pipeline = Pipeline(ws, steps=training_pipeline_steps)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline to run\n",
"Next we submit our pipeline to run. The whole training pipeline takes about 40m using a STANDARD_D16S_V3 VM with our current ParallelRunConfig setting."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"training_run = experiment.submit(training_pipeline)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"training_run.wait_for_completion(show_output=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check the run status, if training_run is in completed state, continue to forecasting. If training_run is in another state, check the portal for failures."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5.0 Publish and schedule the train pipeline (Optional)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5.1 Publish the pipeline\n",
"\n",
"Once you have a pipeline you're happy with, you can publish a pipeline so you can call it programmatically later on. See this [tutorial](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-your-first-pipeline#publish-a-pipeline) for additional information on publishing and calling pipelines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# published_pipeline = training_pipeline.publish(name = 'automl_train_many_models',\n",
"# description = 'train many models',\n",
"# version = '1',\n",
"# continue_on_step_failure = False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5.2 Schedule the pipeline\n",
"You can also [schedule the pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipelines) to run on a time-based or change-based schedule. This could be used to automatically retrain models every month or based on another trigger such as data drift."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from azureml.pipeline.core import Schedule, ScheduleRecurrence\n",
"\n",
"# training_pipeline_id = published_pipeline.id\n",
"\n",
"# recurrence = ScheduleRecurrence(frequency=\"Month\", interval=1, start_time=\"2020-01-01T09:00:00\")\n",
"# recurring_schedule = Schedule.create(ws, name=\"automl_training_recurring_schedule\",\n",
"# description=\"Schedule Training Pipeline to run on the first day of every month\",\n",
"# pipeline_id=training_pipeline_id,\n",
"# experiment_name=experiment.name,\n",
"# recurrence=recurrence)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6.0 Forecasting"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up output dataset for inference data\n",
"Output of inference can be represented as [OutputFileDatasetConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.output_dataset_config.outputdatasetconfig?view=azure-ml-py) object and OutputFileDatasetConfig can be registered as a dataset. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.data import OutputFileDatasetConfig\n",
"\n",
"output_inference_data_ds = OutputFileDatasetConfig(\n",
" name=\"many_models_inference_output\", destination=(dstore, \"oj/inference_data/\")\n",
").register_on_complete(name=\"oj_inference_data_ds\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For many models we need to provide the ``ManyModelsInferenceParameters`` object.\n",
"\n",
"#### ``ManyModelsInferenceParameters`` arguments\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **partition_column_names** | List of column names that identify the groups. |\n",
"| **target_column_name** | \[Optional] Name of the target column; needed only if the inference dataset contains the target. |\n",
"| **time_column_name** | \[Optional] Name of the time column; needed only for time series data. |\n",
"| **inference_type** | \[Optional] Which inference method to use on the model. Possible values are 'forecast', 'predict_proba', and 'predict'. |\n",
"| **forecast_mode** | \[Optional] The type of forecast to be used, either 'rolling' or 'recursive'; defaults to 'recursive'. |\n",
"| **step** | \[Optional] Number of periods to advance the forecasting window in each iteration **(for rolling forecast only)**; defaults to 1. |\n",
"\n",
"#### ``get_many_models_batch_inference_steps`` arguments\n",
"| Property | Description|\n",
"| :--------------- | :------------------- |\n",
"| **experiment** | The experiment used for the inference run. |\n",
"| **inference_data** | The data to use for inferencing. It should have the same schema as the training data. |\n",
"| **compute_target** | The compute target that runs the inference pipeline. |\n",
"| **node_count** | The number of compute nodes to be used for running the user script. We recommend starting with the number of cores per node (varies by compute SKU). |\n",
"| **process_count_per_node** | \[Optional] The number of processes per node. Defaults to 2; it should be at most half of the number of cores in a single node of the compute cluster used for the experiment. |\n",
"| **inference_pipeline_parameters** | \[Optional] The ``ManyModelsInferenceParameters`` object defined above. |\n",
"| **append_row_file_name** | \[Optional] The name of the output file; defaults to 'parallel_run_step.txt'. Supported file extensions are 'txt' and 'csv'. A 'txt' extension produces space-separated output without column names. A 'csv' extension produces comma-separated output with column names. |\n",
"| **train_run_id** | \[Optional] The run ID of the **training pipeline**. Defaults to the latest successful training pipeline run in the experiment. |\n",
"| **train_experiment_name** | \[Optional] The name of the experiment that contains the training pipeline. Needed only when the training pipeline is not in the same experiment as the inference pipeline. |\n",
"| **run_invocation_timeout** | \[Optional] Maximum amount of time, in seconds, that the ``ParallelRunStep`` class is allowed to run. This gives you greater control over exit criteria. |\n",
"| **output_datastore** | \[Optional] The ``Datastore`` or ``OutputDatasetConfig`` to be used for output. If specified, all pipeline output is written to that location. If unspecified, the default datastore is used. |\n",
"| **arguments** | \[Optional] Arguments to be passed to the inference script. One supported argument is '--forecast_quantiles' followed by the quantile values. |"
]
},
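{
"cell_type": "markdown",
"metadata": {},
"source": [
"The sizing guidance for ``process_count_per_node`` in the table above can be sketched as a small helper. This is only an illustration: ``max_process_count_per_node`` is a hypothetical function, not part of the Azure ML SDK, and the core count has to come from the specification of your own compute SKU.\n",
"\n",
"```python\n",
"def max_process_count_per_node(cores_per_node):\n",
"    # Hypothetical helper: cap the process count at half of the cores\n",
"    # available on one node, and never go below one process.\n",
"    if cores_per_node < 1:\n",
"        raise ValueError('cores_per_node must be a positive integer')\n",
"    return max(1, cores_per_node // 2)\n",
"\n",
"\n",
"# Example: a node with 16 cores allows up to 8 processes.\n",
"print(max_process_count_per_node(16))\n",
"```\n",
"\n",
"Treat the result as an upper bound to start from, then adjust it based on the memory each process needs."
]
},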
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.automl.pipeline.steps import AutoMLPipelineBuilder\n",
"from azureml.train.automl.runtime._many_models.many_models_parameters import (\n",
"    ManyModelsInferenceParameters,\n",
")\n",
"\n",
"mm_parameters = ManyModelsInferenceParameters(\n",
"    partition_column_names=[\"Store\", \"Brand\"],\n",
"    time_column_name=\"WeekStarting\",\n",
"    target_column_name=\"Quantity\",\n",
")\n",
"\n",
"output_file_name = \"parallel_run_step.csv\"\n",
"\n",
"inference_steps = AutoMLPipelineBuilder.get_many_models_batch_inference_steps(\n",
"    experiment=experiment,\n",
"    inference_data=inference_ds_small,\n",
"    node_count=2,\n",
"    process_count_per_node=8,\n",
"    compute_target=compute_target,\n",
"    run_invocation_timeout=300,\n",
"    output_datastore=output_inference_data_ds,\n",
"    train_run_id=training_run.id,\n",
"    train_experiment_name=training_run.experiment.name,\n",
"    inference_pipeline_parameters=mm_parameters,\n",
"    append_row_file_name=output_file_name,\n",
"    arguments=[\"--forecast_quantiles\", 0.1, 0.9],\n",
")"
]
},
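{
"cell_type": "markdown",
"metadata": {},
"source": [
"The ``arguments`` list passed above consists of the ``--forecast_quantiles`` flag followed by the quantile values. A minimal sketch of building that list with basic validation follows. ``build_quantile_arguments`` is a hypothetical helper, and the check that each quantile lies strictly between 0 and 1 is an assumption, not documented SDK behavior.\n",
"\n",
"```python\n",
"def build_quantile_arguments(quantiles):\n",
"    # Hypothetical helper: validate the quantiles, then prepend the flag.\n",
"    for q in quantiles:\n",
"        if not 0 < q < 1:\n",
"            raise ValueError('quantile out of range: {}'.format(q))\n",
"    return ['--forecast_quantiles', *quantiles]\n",
"\n",
"\n",
"# Builds the same list that is passed as ``arguments`` in the cell above.\n",
"print(build_quantile_arguments([0.1, 0.9]))\n",
"```\n",
"\n",
"Validating before submission surfaces a bad quantile locally instead of after the pipeline has started."
]
},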
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline\n",
"\n",
"inference_pipeline = Pipeline(ws, steps=inference_steps)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inference_run = experiment.submit(inference_pipeline)\n",
"inference_run.wait_for_completion(show_output=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieve results\n",
"\n",
"The forecasting pipeline forecasts the orange juice quantity for each Store and Brand. The pipeline returns one file with the predictions for each store and writes the result to the forecasting_output Blob container. The details of the blob container are listed in 'forecasting_output.txt' under Outputs+logs.\n",
"\n",
"The following code snippet:\n",
"1. Downloads the contents of the output folder that is passed to the parallel run step.\n",
"2. Reads the output file containing the predictions into a pandas DataFrame.\n",
"3. Displays the first 10 rows of the predictions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"from azureml.contrib.automl.pipeline.steps.utilities import get_output_from_mm_pipeline\n",
"\n",
"forecasting_results_name = \"forecasting_results\"\n",
"forecasting_output_name = \"many_models_inference_output\"\n",
"\n",
"# Download the inference output and locate the predictions file.\n",
"forecast_file = get_output_from_mm_pipeline(\n",
"    inference_run, forecasting_results_name, forecasting_output_name, output_file_name\n",
")\n",
"df = pd.read_csv(forecast_file)\n",
"print(\"Prediction has\", df.shape[0], \"rows. The first 10 rows are displayed below.\")\n",
"df.head(10)"
]
},
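{
"cell_type": "markdown",
"metadata": {},
"source": [
"If ``append_row_file_name`` ends in 'txt' instead of 'csv', the output is space-separated and has no header row, so column names must be supplied when reading it. The sketch below illustrates this with pandas on a small stand-in file. The column names and the two sample rows are hypothetical and are not taken from the actual pipeline output.\n",
"\n",
"```python\n",
"import os\n",
"import tempfile\n",
"\n",
"import pandas as pd\n",
"\n",
"# Hypothetical column names; replace them with the columns of your own output.\n",
"assumed_columns = ['WeekStarting', 'Store', 'Brand', 'Quantity', 'Predicted']\n",
"\n",
"# Write a small stand-in for a space-separated output file without a header.\n",
"sample_path = os.path.join(tempfile.mkdtemp(), 'parallel_run_step.txt')\n",
"with open(sample_path, 'w') as f:\n",
"    print('2020-01-02 1 dominicks 11000 10850.5', file=f)\n",
"    print('2020-01-09 1 dominicks 11500 11210.0', file=f)\n",
"\n",
"# header=None stops pandas from treating the first data row as column names.\n",
"df_txt = pd.read_csv(sample_path, sep=' ', header=None, names=assumed_columns)\n",
"print(df_txt.shape)\n",
"```\n",
"\n",
"Using the 'csv' extension, as this notebook does, avoids the need to maintain the column list by hand."
]
},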
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7.0 Publish and schedule the inference pipeline (Optional)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 7.1 Publish the pipeline\n",
"\n",
"Once you have a pipeline you're happy with, you can publish it so that you can call it programmatically later on. See this [tutorial](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-your-first-pipeline#publish-a-pipeline) for additional information on publishing and calling pipelines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# published_pipeline_inf = inference_pipeline.publish(name = 'automl_forecast_many_models',\n",
"# description = 'forecast many models',\n",
"# version = '1',\n",
"# continue_on_step_failure = False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 7.2 Schedule the pipeline\n",
"You can also [schedule the pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipelines) to run on a time-based or change-based schedule. This could be used to automatically retrain models or generate forecasts every month, or in response to another trigger such as data drift."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from azureml.pipeline.core import Schedule, ScheduleRecurrence\n",
"\n",
"# forecasting_pipeline_id = published_pipeline_inf.id\n",
"\n",
"# recurrence = ScheduleRecurrence(frequency=\"Month\", interval=1, start_time=\"2020-01-01T09:00:00\")\n",
"# recurring_schedule = Schedule.create(ws, name=\"automl_forecasting_recurring_schedule\",\n",
"# description=\"Schedule Forecasting Pipeline to run on the first day of every month\",\n",
"# pipeline_id=forecasting_pipeline_id,\n",
"# experiment_name=experiment.name,\n",
"# recurrence=recurrence)"
]
}
],
"metadata": {
"authors": [
{
"name": "jialiu"
}
],
"categories": [
"how-to-use-azureml",
"automated-machine-learning"
],
"kernelspec": {
"display_name": "Python 3.8 - AzureML",
"language": "python",
"name": "python38-azureml"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
},
"vscode": {
"interpreter": {
"hash": "6bd77c88278e012ef31757c15997a7bea8c943977c43d6909403c00ae11d43ca"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}