Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Using Azure Machine Learning Pipelines for batch prediction

In this notebook we demonstrate how to run a batch scoring job using Azure Machine Learning pipelines. The example job takes an already-trained image classification model and runs it on unlabeled images. The model is the __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__, and the unlabeled images come from the __[ImageNet](http://image-net.org/)__ dataset.

The outline of this notebook is as follows:

- Register the pretrained inception model into the model registry. 
- Store the dataset images in a blob container.
- Use the registered model to do batch scoring on the images in the data blob container.

## Prerequisites
If you haven't already, go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks first. It sets you up with a working config file that has information on your workspace, subscription ID, and so on.

In [None]:
from azureml.core import Datastore, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

In [None]:
import os
from azureml.core import Workspace, Run, Experiment

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')


## Set up machine learning resources

### Set up datastores
First, let’s access the datastore that has the model, labels, and images. 

### Create a datastore that points to a blob container containing sample images

We have created a public blob container `sampledata` on an account named `pipelinedata`, containing images from the ImageNet evaluation set. In the next step, we create a datastore with the name `images_datastore`, which points to this container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. 

This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`.

In [None]:
account_name = "pipelinedata"
datastore_name = "images_datastore"
container_name = "sampledata"

batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name=datastore_name,
                      container_name=container_name,
                      account_name=account_name,
                      overwrite=True)

Next, let’s specify the default datastore for the outputs.

In [None]:
def_data_store = ws.get_default_datastore()

### Configure data references
Next, add references to the data as inputs to the appropriate pipeline steps. A data source in a pipeline is represented by a `DataReference` object, which points to data that lives in, or is accessible from, a datastore. We need `DataReference` objects for the directory containing the input images, the directory in which the pretrained model is stored, and the directory containing the labels. The output directory is represented by a `PipelineData` object instead, because it holds data produced by a pipeline step.

In [None]:
input_images = DataReference(datastore=batchscore_blob, 
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download"
                            )
model_dir = DataReference(datastore=batchscore_blob, 
                          data_reference_name="input_model",
                          path_on_datastore="batchscoring/models",
                          mode="download"                          
                         )
label_dir = DataReference(datastore=batchscore_blob, 
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download"                          
                         )
output_dir = PipelineData(name="scores", 
                          datastore=def_data_store, 
                          output_path_on_compute="batchscoring/results")

### Create and attach Compute targets
Use the code below to reuse an existing compute target with the given name, or to create and attach a new one if none exists.

In [None]:
import os

# choose a name for your cluster
aml_compute_name = os.environ.get("AML_COMPUTE_NAME", "gpu-cluster")
# environment variables are strings, so convert the node counts to int
cluster_min_nodes = int(os.environ.get("AML_COMPUTE_MIN_NODES", 0))
cluster_max_nodes = int(os.environ.get("AML_COMPUTE_MAX_NODES", 1))
vm_size = os.environ.get("AML_COMPUTE_SKU", "STANDARD_NC6")


if aml_compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[aml_compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + aml_compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled
                                                                vm_priority = 'lowpriority', # optional
                                                                min_nodes = cluster_min_nodes, 
                                                                max_nodes = cluster_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, aml_compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current Azure Machine Learning Compute status, use the 'status' property
    print(compute_target.status.serialize())
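
Values read from environment variables are always strings, while the cluster node counts are meant to be integers. The helper below is a minimal sketch of one way to handle that conversion; `env_int` is a hypothetical name introduced here for illustration and is not part of the Azure ML SDK.

```python
import os


def env_int(name, default):
    """Read an integer setting from the environment, falling back to `default`.

    os.environ.get returns a string whenever the variable is set, so the
    value is converted explicitly before it is used as a node count.
    """
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
```

With this helper, a call such as `env_int("AML_COMPUTE_MAX_NODES", 1)` returns the integer `1` when the variable is unset, and raises a descriptive error if it is set to something that is not a whole number.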

## Prepare the Model

### Download the Model

Download the model archive from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz and extract it into a local `models` directory.

In [None]:
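# create a local directory for the model
# (named local_model_dir so it does not overwrite the model_dir DataReference defined earlier)
local_model_dir = 'models'
if not os.path.isdir(local_model_dir):
    os.mkdir(local_model_dir)

In [None]:
import tarfile
import urllib.request

url = "http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")
# extract the checkpoint into the local model directory
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(local_model_dir)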
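
When extracting an archive downloaded from the network, it can be worth checking that every entry stays inside the target directory. The following is a minimal sketch of such a check using only the standard library; `extract_archive` is a hypothetical helper, and the check covers entry names only (it does not inspect symbolic links inside the archive).

```python
import os
import tarfile


def extract_archive(archive_path, target_dir):
    """Extract a .tar.gz archive into target_dir and return the member names."""
    os.makedirs(target_dir, exist_ok=True)
    target_root = os.path.realpath(target_dir)
    with tarfile.open(archive_path, "r:gz") as tar:
        members = tar.getmembers()
        for member in members:
            # Refuse entries whose path would resolve outside target_dir.
            dest = os.path.realpath(os.path.join(target_root, member.name))
            if os.path.commonpath([target_root, dest]) != target_root:
                raise ValueError(f"unsafe path in archive: {member.name}")
        tar.extractall(target_dir, members=members)
    return [m.name for m in members]
```

Calling `extract_archive("model.tar.gz", "models")` would then extract the archive and return the list of extracted entry names, which makes it easy to confirm that the expected checkpoint file is present before registering it.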

### Register the model with Workspace

In [None]:
import shutil
from azureml.core.model import Model

# register downloaded model 
model = Model.register(model_path = "models/inception_v3.ckpt",
                       model_name = "inception", # this is the name the model is registered as
                       tags = {'pretrained': "inception"},
                       description = "Imagenet trained tensorflow inception",
                       workspace = ws)
# remove the downloaded dir after registration if you wish
shutil.rmtree("models")

## Write your scoring script

To do the scoring, we use a batch scoring script, `batch_scoring.py`, which is located in the same directory as this notebook. You can take a look at this script to see how you might modify it for your custom batch scoring task.

The Python script `batch_scoring.py` takes input images, applies the image classification model to these images, and writes the classification results to a results file.
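
Processing the images in batches means splitting the list of input files into fixed-size groups. The sketch below illustrates that idea in isolation; `iter_batches` is a hypothetical helper, and the actual `batch_scoring.py` may organize its batching differently.

```python
def iter_batches(items, batch_size):
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Hypothetical file names, used only to show the grouping.
filenames = ["a.jpg", "b.jpg", "c.jpg", "d.jpg", "e.jpg"]
for batch in iter_batches(filenames, batch_size=2):
    print(batch)
```

The last batch may be smaller than `batch_size` when the number of images is not an exact multiple of it.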

The script `batch_scoring.py` takes the following parameters:

- `--model_name`: the name of the model being used, which is expected to be in the `model_dir` directory
- `--label_dir`: the directory holding the `labels.txt` file
- `--dataset_path`: the directory containing the input images
- `--output_dir`: the directory to which the script writes `result-labels.txt` after running the model on the data
- `--batch_size`: the batch size used when running the model
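
The parameters listed above could be parsed with `argparse` roughly as follows. This is a sketch under assumptions, not the contents of the real `batch_scoring.py`: which options are required and the default batch size of 20 are choices made here for illustration.

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line options listed above."""
    parser = argparse.ArgumentParser(description="Batch scoring (sketch)")
    parser.add_argument("--model_name", required=True, help="name of the model to load")
    parser.add_argument("--label_dir", required=True, help="directory holding labels.txt")
    parser.add_argument("--dataset_path", required=True, help="directory of input images")
    parser.add_argument("--output_dir", required=True, help="directory for the results file")
    # The default of 20 is an assumption made for this sketch.
    parser.add_argument("--batch_size", type=int, default=20, help="images per batch")
    return parser.parse_args(argv)


# Passing an explicit list makes the parser easy to try out in a notebook.
args = parse_args([
    "--model_name", "inception",
    "--label_dir", "labels",
    "--dataset_path", "images",
    "--output_dir", "results",
    "--batch_size", "50",
])
print(args.model_name, args.batch_size)
```

Declaring `--batch_size` with `type=int` means the script receives a number even though command-line arguments arrive as strings.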


## Build and run the batch scoring pipeline
You now have everything you need to build the pipeline. Let’s put it all together.

### Specify the environment to run the script
Specify the conda dependencies for your script. You will need this object when you create the pipeline step later on.

In [None]:
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])

# Runconfig
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

### Specify the parameters for your pipeline
A subset of the script's parameters can be supplied as input when we rerun a `PublishedPipeline`. In this example, we expose the script's `batch_size` argument as such a parameter.

In [None]:
from azureml.pipeline.core.graph import PipelineParameter
batch_size_param = PipelineParameter(name="param_batch_size", default_value=20)

### Create the pipeline step
Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use PythonScriptStep to create the pipeline step.

In [None]:
inception_model_name = "inception_v3.ckpt"

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_scoring.py",
    arguments=["--dataset_path", input_images, 
               "--model_name", "inception",
               "--label_dir", label_dir, 
               "--output_dir", output_dir, 
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config
)

### Run the pipeline
At this point you can run the pipeline and examine the output it produced. 

In [None]:
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={"param_batch_size": 20})

### Monitor the run

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

In [None]:
pipeline_run.wait_for_completion(show_output=True)

### Download and review output

In [None]:
step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

In [None]:
import pandas as pd
df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head()
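
If pandas is not available, the same file can be read with the standard library. The sketch below assumes each line has the form `filename: prediction`, which is inferred from the `delimiter=":"` argument above; the sample lines are made up for illustration and are not real output.

```python
from collections import Counter


def parse_results(lines):
    """Split each 'filename: prediction' line into a (filename, prediction) pair."""
    pairs = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split on the first colon only, so a colon inside the label survives.
        filename, _, prediction = line.partition(":")
        pairs.append((filename.strip(), prediction.strip()))
    return pairs


# Hypothetical sample lines in the assumed format.
sample = ["img_001.JPEG: tabby", "img_002.JPEG: tabby", "img_003.JPEG: goldfish"]
counts = Counter(prediction for _, prediction in parse_results(sample))
print(counts.most_common(1))
```

To use it on the downloaded file, pass an open file handle, for example `parse_results(open("result-labels.txt"))`.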

## Publish a pipeline and rerun using a REST call

### Create a published pipeline
Once you are satisfied with the outcome of the run, you can publish the pipeline so that it can be run later with different input values. Publishing a pipeline gives you a REST endpoint through which the pipeline can be invoked with values for the parameters you defined above using `PipelineParameter`.

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")

published_id = published_pipeline.id

## Rerun the pipeline using the REST endpoint

### Get AAD token

In [None]:
from azureml.core.authentication import AzureCliAuthentication
import requests

cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

### Run published pipeline

In [None]:
from azureml.pipeline.core import PublishedPipeline

rest_endpoint = published_pipeline.endpoint
# specify batch size when running the pipeline
response = requests.post(rest_endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "batch_scoring",
                               "ParameterAssignments": {"param_batch_size": 50}})
# fail early with a clear error if the request was rejected
response.raise_for_status()
run_id = response.json()["Id"]
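
When the endpoint is called from several places, it can help to build the request body in one function. The following is a small sketch of that idea; `build_run_request` is a hypothetical helper, and the only field names it relies on are the `ExperimentName` and `ParameterAssignments` keys used in the request above.

```python
import json


def build_run_request(experiment_name, parameter_assignments):
    """Return the JSON body for a published-pipeline run as a dict."""
    if not experiment_name:
        raise ValueError("experiment_name must not be empty")
    return {
        "ExperimentName": experiment_name,
        # Copy the mapping so later changes by the caller do not affect the body.
        "ParameterAssignments": dict(parameter_assignments),
    }


body = build_run_request("batch_scoring", {"param_batch_size": 50})
print(json.dumps(body, indent=2))
```

The resulting dictionary can be passed as the `json` argument of `requests.post`.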

### Monitor the new run

In [None]:
from azureml.pipeline.core.run import PipelineRun
published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)

RunDetails(published_pipeline_run).show()