update samples from Release-167 as a part of 1.0.83 SDK release
@@ -1,41 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # construct the graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session(config=tf.ConfigProto(device_count={'GPU': 0}))
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))


def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList
@@ -1,31 +0,0 @@
import io
import pickle
import argparse
import numpy as np

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression


def init():
    global iris_model

    parser = argparse.ArgumentParser(description="Iris model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    args, unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(args.model_name)
    with open(model_path, 'rb') as model_file:
        iris_model = pickle.load(model_file)


def run(input_data):
    # make inference
    num_rows, num_cols = input_data.shape
    pred = iris_model.predict(input_data).reshape((num_rows, 1))

    # cleanup output
    result = input_data.drop(input_data.columns[4:], axis=1)
    result['variety'] = pred

    return result
@@ -1,127 +0,0 @@
# Azure Machine Learning Batch Inference

Azure Machine Learning Batch Inference targets large inference jobs that are not time-sensitive. Batch Inference provides cost-effective inference compute scaling, with unparalleled throughput for asynchronous applications. It is optimized for high-throughput, fire-and-forget inference over large collections of data.

# Getting Started with Batch Inference Public Preview

The Batch Inference public preview offers a platform for large inference jobs and generic parallel map-style operations. The sections below introduce the major steps for using this new functionality. For a quick try, follow the prerequisites and simply run the sample notebooks provided in this directory.

## Prerequisites

### Python package installation
Following the convention of most AzureML Public Preview features, the Batch Inference SDK is currently available as a contrib package.

If you're unfamiliar with creating a new Python environment, you may follow this example for [creating a conda environment](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment#local). The Batch Inference package can be installed with the following pip command.
```
pip install azureml-contrib-pipeline-steps
```
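
To sanity-check the installation (an optional step, assuming the package installed into your active environment), you can try importing the contrib module:

```
python -c "import azureml.contrib.pipeline.steps"
```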

### Creation of Azure Machine Learning Workspace
If you do not already have an Azure ML Workspace, please run the [configuration Notebook](../../configuration.ipynb).

## Configure a Batch Inference job

To run a Batch Inference job, you will need to gather some configuration data. A minimal configuration sketch is shown after the list below.

1. **ParallelRunConfig**
    - **entry_script**: the local file path to the scoring script. If source_directory is specified, use a relative path; otherwise use any path accessible on the machine.
    - **error_threshold**: the number of record failures for TabularDataset and file failures for FileDataset that should be ignored during processing. If the aggregated error count (across all workers) goes above this value, the job will be aborted. Set to -1 to ignore all failures during processing.
    - **output_action**: one of the following values
        - **"append_row"**: all values output by run() method invocations will be aggregated into one unique file named parallel_run_step.txt that is created in the output location.
        - **"summary_only"**: the scoring script will handle the output by itself. The script still needs to return one output row per successfully processed input item; this is used for the error threshold calculation (the actual value of the output row is ignored).
    - **source_directory**: supporting files for scoring (optional)
    - **compute_target**: only **AmlCompute** is supported currently
    - **node_count**: number of compute nodes to use.
    - **process_count_per_node**: number of processes per node (optional, default value is 1).
    - **mini_batch_size**: the approximate amount of input data passed to each run() invocation. For FileDataset input, this is the number of files the user script can process in one run() call. For TabularDataset input, it is the approximate size of data the user script can process in one run() call, e.g. 1024, 1024KB, 10MB, 1GB (optional, default value is 10 files for FileDataset and 1MB for TabularDataset).
    - **logging_level**: log verbosity. Values in increasing verbosity are: 'WARNING', 'INFO', 'DEBUG' (optional, default value is 'INFO').
    - **run_invocation_timeout**: run() method invocation timeout period in seconds (optional, default value is 60).
    - **environment**: the environment definition. This field configures the Python environment. It can be configured to use an existing Python environment or to set up a temporary environment for the experiment. The definition is also responsible for setting the required application dependencies.
    - **description**: name given to the batch service.
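
As a minimal sketch (assuming a scoring script digit_identification.py in a local Code folder, plus an `Environment` object `batch_env` and an AmlCompute target `compute_target` created elsewhere, as in the sample notebooks):

```python
from azureml.contrib.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    source_directory="Code",                 # folder with supporting files
    entry_script="digit_identification.py",  # relative to source_directory
    mini_batch_size="5",                     # ~5 files per run() call for a FileDataset
    error_threshold=10,                      # abort if more than 10 input items fail
    output_action="append_row",              # aggregate results into parallel_run_step.txt
    environment=batch_env,
    compute_target=compute_target,
    node_count=2)
```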

2. **Scoring (entry) script**: the entry point for execution. The scoring script should contain two functions (a skeleton sketch follows this list):
    - **init()**: this function should be used for any costly or common preparation for subsequent inferences, e.g., deserializing and loading the model into a global object.
    - **run(mini_batch)**: the method to be parallelized. Each invocation will handle one mini-batch.
        - **mini_batch**: Batch Inference will invoke the run() method and pass either a list or a Pandas DataFrame as an argument. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.
        - **return value**: the run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.
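
A minimal entry-script skeleton for a TabularDataset input, assuming a pickled scikit-learn model registered under the hypothetical name `my_model` (the actual scripts used by the sample notebooks live in the Code folder):

```python
import pickle

from azureml.core.model import Model


def init():
    # one-time, costly preparation: load the registered model into a global
    global my_model
    model_path = Model.get_model_path("my_model")  # hypothetical model name
    with open(model_path, 'rb') as f:
        my_model = pickle.load(f)


def run(mini_batch):
    # for a TabularDataset input, mini_batch is a Pandas DataFrame
    predictions = my_model.predict(mini_batch)
    # return one output element per successfully processed input row
    return [str(p) for p in predictions]
```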

3. **Base image** (optional)
    - if GPU is required, use DEFAULT_GPU_IMAGE as the base image in the environment. [Example GPU environment](./file-dataset-image-inference-mnist.ipynb#specify-the-environment-to-run-the-script)

Example image pull:
```python
from azureml.core.runconfig import ContainerRegistry

# use an image available in a public Container Registry without authentication
public_base_image = "mcr.microsoft.com/azureml/o16n-sample-user-base/ubuntu-miniconda"

# or use an image available in a private Container Registry
base_image = "myregistry.azurecr.io/mycustomimage:1.0"
base_image_registry = ContainerRegistry()
base_image_registry.address = "myregistry.azurecr.io"
base_image_registry.username = "username"
base_image_registry.password = "password"
```


## Create a batch inference job

**ParallelRunStep** is a newly added step in the azureml.contrib.pipeline.steps package. You will use it to add a step that creates a batch inference job in your Azure Machine Learning pipeline. (Using batch inference without an Azure Machine Learning pipeline is not supported yet.) ParallelRunStep has the following parameters (a construction sketch follows this list):
- **name**: the name used to register the batch inference service, with the following naming restrictions: unique, 3-32 chars, and regex ^\[a-z\]([-a-z0-9]*[a-z0-9])?$
- **models**: zero or more model names already registered in the Azure Machine Learning model registry.
- **parallel_run_config**: a ParallelRunConfig, as defined above.
- **inputs**: one or more Dataset objects.
- **output**: this should be a PipelineData object encapsulating an Azure Blob container path.
- **arguments**: a list of custom arguments passed to the scoring script (optional)
- **allow_reuse**: optional, default value is True. If the inputs remain the same as in a previous run, the previous run's results are made immediately available (the step's computation is skipped).
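
Continuing the sketch (assuming the `parallel_run_config` above, plus a registered model object `model`, a named input dataset `named_input_ds`, and a PipelineData object `output_dir`, all hypothetical names defined as in the sample notebooks):

```python
from azureml.contrib.pipeline.steps import ParallelRunStep

parallelrun_step = ParallelRunStep(
    name="example-batch-inference",  # must be unique, 3-32 chars, lowercase
    models=[model],
    parallel_run_config=parallel_run_config,
    inputs=[named_input_ds],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)
```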

## Passing arguments from pipeline submission to script

Many tasks require arguments to be passed from job submission to the distributed runs. Below is an example of passing such information.
```
# from the script which creates the pipeline job
parallelrun_step = ParallelRunStep(
    ...
    arguments=["--model_name", "mosaic"]  # name of the model we want to use, in case we have more than one option
)
```
```
# from driver.py/score.py/task.py
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--model_name', dest="model_name")

args, unknown_args = parser.parse_known_args()

# to access values
args.model_name  # "mosaic"
```

## Submit a batch inference job

You can submit a batch inference job by pipeline_run, or through REST calls with a published pipeline. To control the node count via the REST API/experiment, use the special pipeline parameter aml_node_count. A typical use case follows:

```python
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'name_of_pipeline_run').submit(pipeline)
```

## Monitor your batch inference job

A batch inference job can take a long time to finish. You can monitor your job's progress from the Azure portal, use Azure ML widgets, view console output through the SDK, or check the overview.txt file in the log/azureml directory.

```python
# view with widgets (will display GUI inside a browser)
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

# simple console output
pipeline_run.wait_for_completion(show_output=True)
```

# Sample notebooks

- [file-dataset-image-inference-mnist.ipynb](./file-dataset-image-inference-mnist.ipynb) demonstrates how to run batch inference on an MNIST dataset.
- [tabular-dataset-inference-iris.ipynb](./tabular-dataset-inference-iris.ipynb) demonstrates how to run batch inference on an IRIS dataset.
@@ -1,563 +0,0 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved. \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        ""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Using Azure Machine Learning Pipelines for Batch Inference\n",
        "\n",
        "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
        "\n",
        "> **Tip**\n",
        "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction.\n",
        "\n",
        "In this example we will take a digit identification model already trained on the MNIST dataset using the [AzureML training with deep learning example notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/training-with-deep-learning/train-hyperparameter-tune-deploy-with-keras/train-hyperparameter-tune-deploy-with-keras.ipynb), and run that trained model on some of the MNIST test images in batch.\n",
        "\n",
        "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate the use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset was taken from [this repository](https://github.com/myleott/mnist_png).\n",
        "\n",
        "The outline of this notebook is as follows:\n",
        "\n",
        "- Create a DataStore referencing MNIST images stored in a blob container.\n",
        "- Register the pretrained MNIST model into the model registry.\n",
        "- Use the registered model to do batch inference on the images in the data blob container.\n",
        "\n",
        "## Prerequisites\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Connect to workspace\n",
        "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print('Workspace name: ' + ws.name,\n",
        "      'Azure region: ' + ws.location,\n",
        "      'Subscription id: ' + ws.subscription_id,\n",
        "      'Resource group: ' + ws.resource_group, sep='\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create or Attach existing compute resource\n",
        "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n",
        "\n",
        "**Creation of compute takes approximately 5 minutes. If an AmlCompute with that name is already in your workspace the code will skip the creation process.**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "# choose a name for your cluster\n",
        "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
        "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n",
        "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n",
        "\n",
        "# This example uses a CPU VM. For using a GPU VM, set the SKU to STANDARD_NC6\n",
        "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n",
        "\n",
        "\n",
        "if compute_name in ws.compute_targets:\n",
        "    compute_target = ws.compute_targets[compute_name]\n",
        "    if compute_target and type(compute_target) is AmlCompute:\n",
        "        print('found compute target, will use it: ' + compute_name)\n",
        "else:\n",
        "    print('creating a new compute target...')\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,\n",
        "                                                                min_nodes=compute_min_nodes,\n",
        "                                                                max_nodes=compute_max_nodes)\n",
        "\n",
        "    # create the cluster\n",
        "    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
        "\n",
        "    # can poll for a minimum number of nodes and for a specific timeout.\n",
        "    # if no min node count is provided it will use the scale settings for the cluster\n",
        "    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "\n",
        "    # For a more detailed view of current AmlCompute status, use get_status()\n",
        "    print(compute_target.get_status().serialize())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a datastore containing sample images\n",
        "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate the use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset was taken from [this repository](https://github.com/myleott/mnist_png).\n",
        "\n",
        "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing these images from the MNIST dataset. In the next step, we create a datastore with the name `mnist_datastore`, which points to this blob container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name.\n",
        "\n",
        "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.datastore import Datastore\n",
        "\n",
        "account_name = \"pipelinedata\"\n",
        "datastore_name = \"mnist_datastore\"\n",
        "container_name = \"sampledata\"\n",
        "\n",
        "mnist_data = Datastore.register_azure_blob_container(ws,\n",
        "                                                     datastore_name=datastore_name,\n",
        "                                                     container_name=container_name,\n",
        "                                                     account_name=account_name,\n",
        "                                                     overwrite=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Next, let's specify the default datastore for the outputs."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def_data_store = ws.get_default_datastore()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a FileDataset\n",
        "A [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or public urls. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.dataset import Dataset\n",
        "\n",
        "mnist_ds_name = 'mnist_sample_data'\n",
        "\n",
        "path_on_datastore = mnist_data.path('mnist')\n",
        "input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)\n",
        "registered_mnist_ds = input_mnist_ds.register(ws, mnist_ds_name, create_new_version=True)\n",
        "named_mnist_ds = registered_mnist_ds.as_named_input(mnist_ds_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Intermediate/Output Data\n",
        "Intermediate data (or the output of a step) is represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and an input of one or more subsequent steps.\n",
        "\n",
        "**Constructing PipelineData**\n",
        "- name: [Required] Name of the data item within the pipeline graph\n",
        "- datastore_name: Name of the Datastore to write this output to\n",
        "- output_name: Name of the output\n",
        "- output_mode: Specifies \"upload\" or \"mount\" modes for producing output (default: mount)\n",
        "- output_path_on_compute: For \"upload\" mode, the path to which the module writes this output during execution\n",
        "- output_overwrite: Flag to overwrite pre-existing data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "\n",
        "output_dir = PipelineData(name=\"inferences\",\n",
        "                          datastore=def_data_store,\n",
        "                          output_path_on_compute=\"mnist/results\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Download the Model\n",
        "\n",
        "Download and extract the model from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz to the \"models\" directory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import tarfile\n",
        "import urllib.request\n",
        "\n",
        "# create directory for model\n",
        "model_dir = 'models'\n",
        "if not os.path.isdir(model_dir):\n",
        "    os.mkdir(model_dir)\n",
        "\n",
        "url = \"https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz\"\n",
        "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n",
        "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n",
        "tar.extractall(model_dir)\n",
        "\n",
        "os.listdir(model_dir)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Register the model with Workspace\n",
        "A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.\n",
        "\n",
        "Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. Learn more about registering models [here](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#registermodel)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.model import Model\n",
        "\n",
        "# register downloaded model\n",
        "model = Model.register(model_path=\"models/\",\n",
        "                       model_name=\"mnist\",  # this is the name the model is registered as\n",
        "                       tags={'pretrained': \"mnist\"},\n",
        "                       description=\"MNIST trained tensorflow model\",\n",
        "                       workspace=ws)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Using your model to make batch predictions\n",
        "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n",
        "\n",
        "#### An entry script\n",
        "This script accepts requests, scores the requests by using the model, and returns the results.\n",
        "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. The init method can make use of the following environment variables (ParallelRunStep input):\n",
        "    1.\tAZUREML_BI_OUTPUT_PATH \u2013 output folder path\n",
        "- __run(mini_batch)__ - The method to be parallelized. Each invocation will handle one mini-batch.<BR>\n",
        "__mini_batch__: Batch inference will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.<BR>\n",
        "__run__ method response: the run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.\n",
        "    Users should make sure that enough data is included in the inference result to map each input to its result. Inference output is written to the output file and is not guaranteed to be in order, so users should use some key in the output to map it back to the input.\n",
        "\n",
        "\n",
        "#### Dependencies\n",
        "Helper scripts or Python/Conda packages required to run the entry script or model.\n",
        "\n",
        "The deployment configuration for the compute target that hosts the deployed model. This configuration describes things like memory and CPU requirements needed to run the model.\n",
        "\n",
        "These items are encapsulated into an inference configuration and a deployment configuration. The inference configuration references the entry script and other dependencies. You define these configurations programmatically when you use the SDK to perform the deployment. You define them in JSON files when you use the CLI."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "scripts_folder = \"Code\"\n",
        "script_file = \"digit_identification.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
        "    print(inference_file.read())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and run the batch inference pipeline\n",
        "The data, models, and compute resource are now available. Let's put all these together in a pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Specify the environment to run the script\n",
        "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Environment\n",
        "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
        "\n",
        "batch_conda_deps = CondaDependencies.create(pip_packages=[\"tensorflow==1.13.1\", \"pillow\"])\n",
        "\n",
        "batch_env = Environment(name=\"batch_environment\")\n",
        "batch_env.python.conda_dependencies = batch_conda_deps\n",
        "batch_env.docker.enabled = True\n",
        "batch_env.docker.base_image = DEFAULT_CPU_IMAGE"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the configuration to wrap the inference script"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
        "\n",
        "parallel_run_config = ParallelRunConfig(\n",
        "    source_directory=scripts_folder,\n",
        "    entry_script=script_file,\n",
        "    mini_batch_size=\"5\",\n",
        "    error_threshold=10,\n",
        "    output_action=\"append_row\",\n",
        "    environment=batch_env,\n",
        "    compute_target=compute_target,\n",
        "    node_count=2)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the pipeline step\n",
        "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "parallelrun_step = ParallelRunStep(\n",
        "    name=\"predict-digits-mnist\",\n",
        "    parallel_run_config=parallel_run_config,\n",
        "    inputs=[named_mnist_ds],\n",
        "    output=output_dir,\n",
        "    models=[model],\n",
        "    arguments=[],\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Run the pipeline\n",
        "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])\n",
        "experiment = Experiment(ws, 'digit_identification')\n",
        "pipeline_run = experiment.submit(pipeline)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Monitor the run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Optional: View detailed logs (streaming)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### View the prediction results per input image\n",
        "In the digit_identification.py file above you can see that the resultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This contains the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results; the code below filters to the first 10 rows."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import pandas as pd\n",
        "import shutil\n",
        "\n",
        "# remove previous run results, if present\n",
        "shutil.rmtree(\"mnist_results\", ignore_errors=True)\n",
        "\n",
        "batch_run = next(pipeline_run.get_children())\n",
        "batch_output = batch_run.get_output_data(\"inferences\")\n",
        "batch_output.download(local_path=\"mnist_results\")\n",
        "\n",
        "for root, dirs, files in os.walk(\"mnist_results\"):\n",
        "    for file in files:\n",
        "        if file.endswith('parallel_run_step.txt'):\n",
        "            result_file = os.path.join(root, file)\n",
        "\n",
        "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n",
        "df.columns = [\"Filename\", \"Prediction\"]\n",
        "print(\"Prediction has \", df.shape[0], \" rows\")\n",
        "df.head(10)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Cleanup compute resources\n",
        "\n",
        "For recurring jobs, it may be wise to keep the compute resources and allow the compute nodes to scale down to 0. However, since this is just a single-run job, we are free to release the allocated compute resources."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# uncomment below and run if compute resources are no longer needed\n",
        "# compute_target.delete()"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "joringer"
      },
      {
        "name": "asraniwa"
      },
      {
        "name": "pansav"
      },
      {
        "name": "tracych"
      }
    ],
    "friendly_name": "MNIST data inferencing using ParallelRunStep",
    "exclude_from_index": false,
    "index_order": 1,
    "category": "Other notebooks",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "MNIST"
    ],
    "deployment": [
      "None"
    ],
    "framework": [
      "None"
    ],
    "tags": [
      "Batch Inferencing",
      "Pipeline"
    ],
    "task": "Digit identification",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
@@ -1,6 +0,0 @@
name: file-dataset-image-inference-mnist
dependencies:
- pip:
  - azureml-sdk
  - azureml-contrib-pipeline-steps
  - azureml-widgets
@@ -1,538 +0,0 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved. \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        ""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Using Azure Machine Learning Pipelines for Batch Inference on CSV Files\n",
        "\n",
        "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
        "\n",
        "> **Tip**\n",
        "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction.\n",
        "\n",
        "In this example we will use a machine learning model already trained to predict different types of iris flowers and run that trained model on some of the data in a CSV file which has characteristics of different iris flowers. However, the same example can be extended to any embarrassingly-parallel data processing through a Python script.\n",
        "\n",
        "The outline of this notebook is as follows:\n",
        "\n",
        "- Create a DataStore referencing the CSV files stored in a blob container.\n",
        "- Register the pretrained model into the model registry.\n",
        "- Use the registered model to do batch inference on the CSV files in the data blob container.\n",
        "\n",
        "## Prerequisites\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Connect to workspace\n",
        "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print('Workspace name: ' + ws.name,\n",
        "      'Azure region: ' + ws.location,\n",
        "      'Subscription id: ' + ws.subscription_id,\n",
        "      'Resource group: ' + ws.resource_group, sep='\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create or Attach existing compute resource\n",
        "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n",
        "\n",
        "**Creation of compute takes approximately 5 minutes. If an AmlCompute with that name is already in your workspace the code will skip the creation process.**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "# choose a name for your cluster\n",
        "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
        "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n",
        "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n",
        "\n",
        "# This example uses a CPU VM. For using a GPU VM, set the SKU to STANDARD_NC6\n",
        "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n",
        "\n",
        "\n",
        "if compute_name in ws.compute_targets:\n",
        "    compute_target = ws.compute_targets[compute_name]\n",
        "    if compute_target and type(compute_target) is AmlCompute:\n",
        "        print('found compute target, will use it: ' + compute_name)\n",
        "else:\n",
        "    print('creating a new compute target...')\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,\n",
        "                                                                min_nodes=compute_min_nodes,\n",
        "                                                                max_nodes=compute_max_nodes)\n",
        "\n",
        "    # create the cluster\n",
        "    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
        "\n",
        "    # can poll for a minimum number of nodes and for a specific timeout.\n",
        "    # if no min node count is provided it will use the scale settings for the cluster\n",
        "    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "\n",
        "    # For a more detailed view of current AmlCompute status, use get_status()\n",
        "    print(compute_target.get_status().serialize())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a datastore containing sample data\n",
        "The input dataset used for this notebook is CSV data with attributes of different iris flowers. We have created a public blob container `sampledata` on an account named `pipelinedata`, containing the iris data set. In the next step, we create a datastore with the name `iris_datastore_data`, which points to this container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name.\n",
        "\n",
        "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.datastore import Datastore\n",
        "\n",
        "account_name = \"pipelinedata\"\n",
        "datastore_name = \"iris_datastore_data\"\n",
        "container_name = \"sampledata\"\n",
        "\n",
        "iris_data = Datastore.register_azure_blob_container(ws,\n",
        "                                                    datastore_name=datastore_name,\n",
        "                                                    container_name=container_name,\n",
        "                                                    account_name=account_name,\n",
        "                                                    overwrite=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a TabularDataset\n",
        "A [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) references single or multiple files which contain data in a tabular structure (i.e., CSV files) in your datastores or public urls. TabularDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.dataset import Dataset\n",
        "\n",
        "iris_ds_name = 'iris_data'\n",
        "\n",
        "path_on_datastore = iris_data.path('iris/')\n",
        "input_iris_ds = Dataset.Tabular.from_delimited_files(path=path_on_datastore, validate=False)\n",
        "registered_iris_ds = input_iris_ds.register(ws, iris_ds_name, create_new_version=True)\n",
        "named_iris_ds = registered_iris_ds.as_named_input(iris_ds_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Intermediate/Output Data\n",
        "Intermediate data (or the output of a step) is represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and an input of one or more subsequent steps.\n",
        "\n",
        "**Constructing PipelineData**\n",
        "- name: [Required] Name of the data item within the pipeline graph\n",
        "- datastore_name: Name of the Datastore to write this output to\n",
        "- output_name: Name of the output\n",
        "- output_mode: Specifies \"upload\" or \"mount\" modes for producing output (default: mount)\n",
        "- output_path_on_compute: For \"upload\" mode, the path to which the module writes this output during execution\n",
        "- output_overwrite: Flag to overwrite pre-existing data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineData\n",
        "\n",
        "datastore = ws.get_default_datastore()\n",
        "output_folder = PipelineData(name='inferences', datastore=datastore)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Registering the Model with the Workspace\n",
        "Get the pretrained model from a publicly available Azure Blob container, then register it to use in your workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "model_container_name = \"iris-model\"\n",
        "model_datastore_name = \"iris_model_datastore\"\n",
        "\n",
        "model_datastore = Datastore.register_azure_blob_container(ws,\n",
        "                                                           datastore_name=model_datastore_name,\n",
        "                                                           container_name=model_container_name,\n",
        "                                                           account_name=account_name,\n",
        "                                                           overwrite=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.model import Model\n",
        "\n",
        "model_datastore.download('iris_model.pkl')\n",
        "\n",
        "# register downloaded model\n",
        "model = Model.register(model_path=\"iris_model.pkl/iris_model.pkl\",\n",
        "                       model_name=\"iris\",  # this is the name the model is registered as\n",
        "                       tags={'pretrained': \"iris\"},\n",
        "                       workspace=ws)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Using your model to make batch predictions\n",
        "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n",
        "\n",
        "#### An entry script\n",
        "This script accepts requests, scores the requests by using the model, and returns the results.\n",
        "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. The init method can make use of the following environment variables (ParallelRunStep input):\n",
        "    1.\tAZUREML_BI_OUTPUT_PATH \u2013 output folder path\n",
        "- __run(mini_batch)__ - The method to be parallelized. Each invocation will handle one mini-batch.<BR>\n",
        "__mini_batch__: Batch inference will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.<BR>\n",
        "__run__ method response: the run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.\n",
        "    Users should make sure that enough data is included in the inference result to map each input to its result. Inference output is written to the output file and is not guaranteed to be in order, so users should use some key in the output to map it back to the input.\n",
        "\n",
        "\n",
        "#### Dependencies\n",
        "Helper scripts or Python/Conda packages required to run the entry script or model.\n",
        "\n",
        "The deployment configuration for the compute target that hosts the deployed model. This configuration describes things like memory and CPU requirements needed to run the model.\n",
        "\n",
        "These items are encapsulated into an inference configuration and a deployment configuration. The inference configuration references the entry script and other dependencies. You define these configurations programmatically when you use the SDK to perform the deployment. You define them in JSON files when you use the CLI.\n",
        "\n",
        "## Print inferencing script"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "scripts_folder = \"Code\"\n",
        "script_file = \"iris_score.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
        "    print(inference_file.read())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and run the batch inference pipeline\n",
        "The data, models, and compute resource are now available. Let's put all these together in a pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Specify the environment to run the script\n",
        "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Environment\n",
        "from azureml.core.runconfig import CondaDependencies\n",
        "\n",
        "predict_conda_deps = CondaDependencies.create(pip_packages=[\"scikit-learn==0.20.3\"])\n",
        "\n",
        "predict_env = Environment(name=\"predict_environment\")\n",
        "predict_env.python.conda_dependencies = predict_conda_deps\n",
        "predict_env.docker.enabled = True\n",
        "predict_env.spark.precache_packages = False"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the configuration to wrap the inference script"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
        "\n",
        "# In a real-world scenario, you'll want to shape your process count per node and node count to fit your problem domain.\n",
        "parallel_run_config = ParallelRunConfig(\n",
        "    source_directory=scripts_folder,\n",
        "    entry_script=script_file,  # the user script to run against each input\n",
        "    mini_batch_size='5MB',\n",
        "    error_threshold=5,\n",
        "    output_action='append_row',\n",
        "    environment=predict_env,\n",
        "    compute_target=compute_target,\n",
        "    node_count=3,\n",
        "    run_invocation_timeout=600)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the pipeline step\n",
        "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "distributed_csv_iris_step = ParallelRunStep(\n",
        "    name='example-iris',\n",
        "    inputs=[named_iris_ds],\n",
        "    output=output_folder,\n",
        "    parallel_run_config=parallel_run_config,\n",
        "    models=[model],\n",
        "    arguments=['--model_name', 'iris'],\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Run the pipeline\n",
        "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "from azureml.pipeline.core import Pipeline\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[distributed_csv_iris_step])\n",
        "\n",
        "pipeline_run = Experiment(ws, 'iris').submit(pipeline)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# this will output a table with a link to the run details in the Azure portal\n",
        "pipeline_run"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## View progress of Pipeline run\n",
        "\n",
        "The progress of the pipeline can be viewed either through azureml.widgets or as a console feed from PipelineRun.wait_for_completion()."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# GUI\n",
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Console logs\n",
        "pipeline_run.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## View Results\n",
        "In the iris_score.py file above you can see that the result with the prediction of the iris variety gets returned, appended to the original input row from the CSV file. These results are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This contains the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results; the code below filters to a random 20 rows."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import pandas as pd\n",
        "import shutil\n",
        "\n",
        "shutil.rmtree(\"iris_results\", ignore_errors=True)\n",
        "\n",
        "prediction_run = next(pipeline_run.get_children())\n",
        "prediction_output = prediction_run.get_output_data(\"inferences\")\n",
        "prediction_output.download(local_path=\"iris_results\")\n",
        "\n",
        "\n",
        "for root, dirs, files in os.walk(\"iris_results\"):\n",
        "    for file in files:\n",
        "        if file.endswith('parallel_run_step.txt'):\n",
        "            result_file = os.path.join(root, file)\n",
        "\n",
        "# cleanup output format\n",
        "df = pd.read_csv(result_file, delimiter=\" \", header=None)\n",
        "df.columns = [\"sepal.length\", \"sepal.width\", \"petal.length\", \"petal.width\", \"variety\"]\n",
        "print(\"Prediction has \", df.shape[0], \" rows\")\n",
        "\n",
        "random_subset = df.sample(n=20)\n",
        "random_subset.head(20)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Cleanup compute resources\n",
        "For recurring jobs, it may be wise to keep the compute resources and allow the compute nodes to scale down to 0. However, since this is just a single-run job, we are free to release the allocated compute resources."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# uncomment below and run if compute resources are no longer needed\n",
        "# compute_target.delete()"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "joringer"
      },
      {
        "name": "asraniwa"
      },
      {
        "name": "pansav"
      },
      {
        "name": "tracych"
      }
    ],
    "friendly_name": "IRIS data inferencing using ParallelRunStep",
    "exclude_from_index": false,
    "index_order": 1,
    "category": "Other notebooks",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "IRIS"
    ],
    "deployment": [
      "None"
    ],
    "framework": [
      "None"
    ],
    "tags": [
      "Batch Inferencing",
      "Pipeline"
    ],
    "task": "Recognize flower type",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.2"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
@@ -1,6 +0,0 @@
name: tabular-dataset-inference-iris
dependencies:
- pip:
  - azureml-sdk
  - azureml-contrib-pipeline-steps
  - azureml-widgets