Files
MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.ipynb

421 lines
15 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved. \n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using Azure Machine Learning Pipelines for Batch Inference for tabular input partitioned by column value\n",
"\n",
"In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using the ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
"\n",
"> **Tip**\n",
"If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n",
"\n",
"This example will create a partitioned tabular dataset by splitting the rows in a large csv file by its value on specified column. Each partition will form up a mini-batch in the parallel processing procedure.\n",
"\n",
"The outline of this notebook is as follows:\n",
"\n",
"- Create a tabular dataset partitioned by value on specified column.\n",
"- Do batch inference on the dataset with each mini-batch corresponds to one partition.\n",
"\n",
"## Prerequisites\n",
"If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to workspace"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')\n",
"\n",
"datastore = ws.get_default_datastore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"print(azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download OJ sales data from opendataset url"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"oj_sales_path = \"./oj.csv\"\n",
"r = requests.get(\"https://raw.githubusercontent.com/Azure/azureml-examples/main/sdk/python/jobs/automl-standalone-jobs/automl-forecasting-orange-juice-sales/data/dominicks_OJ.csv\")\n",
"open(oj_sales_path, \"wb\").write(r.content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Upload OJ sales data to datastore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"datastore.upload_files([oj_sales_path], \".\", \"oj_sales_data\", overwrite=True, show_progress=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create tabular dataset\n",
"Create normal tabular dataset"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Dataset\n",
"\n",
"dataset = Dataset.Tabular.from_delimited_files(path=(datastore, 'oj_sales_data/*.csv'))\n",
"print(dataset.to_pandas_dataframe())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Partition the tabular dataset\n",
"Partition the dataset by column 'store' and 'brand'. You can get a partition of data by specifying the value of one or more partition keys. E.g., by specifying `store=1000 and brand='tropicana'`, you can get all the rows that matches this condition in the dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"partitioned_dataset = dataset.partition_by(partition_keys=['Store', 'Brand'], target=(datastore, \"partition_by_key_res\"), name=\"partitioned_oj_data\")\n",
"partitioned_dataset.partition_keys"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create or Attach existing compute resource"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from azureml.core.compute import AmlCompute, ComputeTarget\n",
"\n",
"# choose a name for your cluster\n",
"compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
"compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n",
"compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 2)\n",
"\n",
"# This example uses CPU VM. For using GPU VM, set SKU to Standard_NC6s_v3\n",
"vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n",
"\n",
"\n",
"if compute_name in ws.compute_targets:\n",
" compute_target = ws.compute_targets[compute_name]\n",
" if compute_target and type(compute_target) is AmlCompute:\n",
" print('found compute target. just use it. ' + compute_name)\n",
"else:\n",
" print('creating a new compute target...')\n",
" provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n",
" min_nodes = compute_min_nodes, \n",
" max_nodes = compute_max_nodes)\n",
"\n",
" # create the cluster\n",
" compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
" \n",
" # can poll for a minimum number of nodes and for a specific timeout. \n",
" # if no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
" \n",
" # For a more detailed view of current AmlCompute status, use get_status()\n",
" print(compute_target.get_status().serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Intermediate/Output Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline, PipelineData\n",
"\n",
"output_dir = PipelineData(name=\"inferences\", datastore=datastore)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate total revenue of each mini-batch partitioned by dataset partition key(s)\n",
"The script sum up the total revenue of a mini-batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scripts_folder = \"Code\"\n",
"script_file = \"total_income.py\"\n",
"\n",
"# peek at contents\n",
"with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
" print(inference_file.read())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build and run the batch inference pipeline\n",
"### Specify the environment to run the script\n",
"You would need to specify the required private azureml packages in dependencies. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Environment\n",
"from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
"\n",
"batch_conda_deps = CondaDependencies.create(pip_packages=[\"azureml-core\", \"azureml-dataset-runtime[fuse,pandas]\"])\n",
"batch_env = Environment(name=\"batch_environment\")\n",
"batch_env.python.conda_dependencies = batch_conda_deps\n",
"batch_env.docker.base_image = DEFAULT_CPU_IMAGE"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the configuration to wrap the inference script\n",
"The parameter `partition_keys` is a list containing a subset of the dataset partition keys, specifying how is the input dataset partitioned. Each and every possible combination of values of partition_keys will form up a mini-batch. E.g., by specifying `partition_keys=['store', 'brand']` will result in mini-batches like `store=1000 && brand=tropicana`, `store=1000 && brand=dominicks`, `store=1001 && brand=dominicks`, ..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
"\n",
"# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.\n",
"parallel_run_config = ParallelRunConfig(\n",
" source_directory=scripts_folder,\n",
" entry_script=script_file, # the user script to run against each input\n",
" partition_keys=['Store', 'Brand'],\n",
" error_threshold=5,\n",
" output_action='append_row',\n",
" append_row_file_name=\"revenue_outputs.txt\",\n",
" environment=batch_env,\n",
" compute_target=compute_target, \n",
" node_count=2,\n",
" run_invocation_timeout=600\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the pipeline step"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parallel_run_step = ParallelRunStep(\n",
" name='summarize-revenue',\n",
" inputs=[partitioned_dataset.as_named_input(\"partitioned_tabular_input\")],\n",
" output=output_dir,\n",
" parallel_run_config=parallel_run_config,\n",
" allow_reuse=False\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"from azureml.pipeline.core import Pipeline\n",
"\n",
"pipeline = Pipeline(workspace=ws, steps=[parallel_run_step])\n",
"\n",
"pipeline_run = Experiment(ws, 'tabular-dataset-partition').submit(pipeline)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run.wait_for_completion(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## View the prediction results\n",
"In the total_income.py file above you can see that the ResultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called inferences. This containers the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results ... below just filters to the first 10 rows"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import tempfile\n",
"\n",
"batch_run = pipeline_run.find_step_run(parallel_run_step.name)[0]\n",
"batch_output = batch_run.get_output_data(output_dir.name)\n",
"\n",
"target_dir = tempfile.mkdtemp()\n",
"batch_output.download(local_path=target_dir)\n",
"result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n",
"\n",
"df = pd.read_csv(result_file, delimiter=\" \", header=None)\n",
"df.columns=[\"WeekStarting\", \"Quantity\", \"logQuantity\", \"Advert\", \"Price\", \"Age60\", \"COLLEGE\", \"INCOME\", \"Hincome150\", \"Large HH\", \"Minorities\", \"WorkingWoman\", \"SSTRDIST\", \"SSTRVOL\", \"CPDIST5\", \"CPWVOL5\", \"Store\", \"Brand\", \"total_income\"]\n",
"\n",
"print(\"Prediction has \", df.shape[0], \" rows\")\n",
"df.head(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"authors": [
{
"name": "prsbjdev"
}
],
"category": "Other notebooks",
"compute": [
"AML Compute"
],
"datasets": [
"OJ Sales Data"
],
"deployment": [
"None"
],
"exclude_from_index": false,
"framework": [
"None"
],
"friendly_name": "Batch inferencing OJ Sales Data partitioned by column using ParallelRunStep",
"index_order": 1,
"kernelspec": {
"display_name": "Python 3.8 - AzureML",
"language": "python",
"name": "python38-azureml"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}