Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Batch Predictions for an Image Classification model trained using AutoML
In this notebook, we go over how you can use [Azure Machine Learning pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-pipeline-batch-scoring-classification) to run a batch scoring image classification job.

**Please note:** For this notebook you can use an existing image classification model trained using AutoML for Images or use the simple model training we included below for convenience. For detailed instructions on how to train an image classification model with AutoML, please refer to the official [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-auto-train-image-models) and to the [image classification multiclass notebook](https://github.com/Azure/azureml-examples/blob/main/python-sdk/tutorials/automl-with-azureml/image-classification-multiclass/auto-ml-image-classification-multiclass.ipynb).

**Important:** This feature is currently in public preview. This preview version is provided without a service-level agreement. Certain features might not be supported or might have constrained capabilities. For more information, see [Supplemental Terms of Use for Microsoft Azure Previews](https://azure.microsoft.com/en-us/support/legal/preview-supplemental-terms/).

## Environment Setup
Please follow the ["Setup a new conda environment"](https://github.com/Azure/azureml-examples/tree/main/python-sdk/tutorials/automl-with-azureml#3-setup-a-new-conda-environment) instructions to get started.

In [None]:
import azureml.core
from packaging import version

print("This notebook was created using version 1.35.0 of the Azure ML SDK.")
print("You are currently using version", azureml.core.VERSION, "of the Azure ML SDK.")
# Compare parsed versions; a plain string comparison misorders e.g. "1.100" and "1.35"
assert version.parse(azureml.core.VERSION) >= version.parse(
    "1.35"
), "Please upgrade the Azure ML SDK by running '!pip install --upgrade azureml-sdk' then restart the kernel."

## You will perform the following tasks:

* Register a Model already trained using AutoML for Image Classification.
* Create an Inference Dataset.
* Provision compute targets and create a Batch Scoring script.
* Use ParallelRunStep to do batch scoring.
* Build, run, and publish a pipeline.
* Enable a REST endpoint for the pipeline.

## Workspace setup

An [Azure ML Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-azure-machine-learning-architecture#workspace) is an Azure resource that organizes and coordinates the actions of many other Azure resources to assist in executing and sharing machine learning workflows. In particular, an Azure ML Workspace coordinates storage, databases, and compute resources providing added functionality for machine learning experimentation, deployment, inference, and the monitoring of deployed models.

Create an Azure ML Workspace within your Azure subscription or load an existing workspace.

In [None]:
from azureml.core.workspace import Workspace

ws = Workspace.from_config()

## Workspace default datastore is used to store inference input images and outputs

In [None]:
def_data_store = ws.get_default_datastore()

## Compute target setup
You will need to provide a [Compute Target](https://docs.microsoft.com/en-us/azure/machine-learning/concept-azure-machine-learning-architecture#computes) that will be used for AutoML model training and, later in this notebook, for batch scoring. AutoML models for image tasks require [GPU SKUs](https://docs.microsoft.com/en-us/azure/virtual-machines/sizes-gpu) such as the ones from the NC, NCv2, NCv3, ND, NDv2 and NCasT4 series. We recommend using the NCsv3-series (with V100 GPUs) for faster training. A compute target with a multi-GPU VM SKU uses all of its GPUs to speed up training. Additionally, a compute target with multiple nodes speeds up hyperparameter tuning by training several models in parallel.

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget

cluster_name = "gpu-cluster-nc6"

try:
    compute_target = ws.compute_targets[cluster_name]
    print("Found existing compute target.")
except KeyError:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="Standard_NC6",
        idle_seconds_before_scaledown=600,
        min_nodes=0,
        max_nodes=4,
    )
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min_node_count is provided, it will use the scale settings for the cluster.
compute_target.wait_for_completion(
    show_output=True, min_node_count=None, timeout_in_minutes=20
)

## Train an Image Classification model

In this section we quickly train a model to use for batch scoring. For a detailed example of how to train an image classification model, please refer to the official [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-auto-train-image-models) or to the [image classification multiclass notebook](https://github.com/Azure/azureml-examples/blob/main/python-sdk/tutorials/automl-with-azureml/image-classification-multiclass/auto-ml-image-classification-multiclass.ipynb). If you already have a model trained in the same workspace, you can skip to section ["Create data objects"](#Create-data-objects).

#### Experiment Setup

In [None]:
from azureml.core import Experiment

experiment_name = "automl-image-batchscoring"
experiment = Experiment(ws, name=experiment_name)

#### Download dataset with input Training Data

All images in this notebook are hosted in [this repository](https://github.com/microsoft/computervision-recipes) and are made available under the [MIT license](https://github.com/microsoft/computervision-recipes/blob/master/LICENSE).

In [None]:
import os
import urllib.request
from zipfile import ZipFile

# download data
download_url = "https://cvbp-secondary.z19.web.core.windows.net/datasets/image_classification/fridgeObjects.zip"
data_file = "./fridgeObjects.zip"
urllib.request.urlretrieve(download_url, filename=data_file)

# extract files
with ZipFile(data_file, "r") as zip_file:
    print("extracting files...")
    zip_file.extractall()
    print("done")
# delete zip file
os.remove(data_file)

#### Convert the downloaded data to JSONL

In [None]:
import json
import os

src = "./fridgeObjects/"
train_validation_ratio = 5  # every 5th image goes to validation (about 20%)

# Retrieve the name of the default datastore that was created automatically with the workspace
workspaceblobstore = ws.get_default_datastore().name

# Path to the training and validation files
train_annotations_file = os.path.join(src, "train_annotations.jsonl")
validation_annotations_file = os.path.join(src, "validation_annotations.jsonl")

# sample json line dictionary
json_line_sample = {
    "image_url": "AmlDatastore://"
    + workspaceblobstore
    + "/"
    + os.path.basename(os.path.dirname(src)),
    "label": "",
}

index = 0
# Scan each subdirectory (one per class) and write one JSONL line per image
with open(train_annotations_file, "w") as train_f:
    with open(validation_annotations_file, "w") as validation_f:
        for className in os.listdir(src):
            subDir = src + className
            if not os.path.isdir(subDir):
                continue
            # Scan each image in the subdirectory
            print("Parsing " + subDir)
            for image in os.listdir(subDir):
                json_line = dict(json_line_sample)
                json_line["image_url"] += f"/{className}/{image}"
                json_line["label"] = className

                if index % train_validation_ratio == 0:
                    # validation annotation
                    validation_f.write(json.dumps(json_line) + "\n")
                else:
                    # train annotation
                    train_f.write(json.dumps(json_line) + "\n")
                index += 1
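
The split logic above can be pulled into a standalone function, which may make it easier to check the JSONL format before uploading. This is a minimal sketch assuming the same `<root>/<class_name>/<image>` folder layout; the function name `build_annotations` is illustrative, and unlike the cell above it sorts directory listings so that the train/validation split is reproducible.

```python
import json
import os
from typing import List, Tuple


def build_annotations(
    src: str, datastore_name: str, train_validation_ratio: int = 5
) -> Tuple[List[str], List[str]]:
    """Return (train_lines, validation_lines) as JSONL strings.

    Every train_validation_ratio-th image (index 0, 5, 10, ...) goes to
    validation, mirroring the notebook cell above.
    """
    root_name = os.path.basename(os.path.normpath(src))
    train, validation = [], []
    index = 0
    for class_name in sorted(os.listdir(src)):
        class_dir = os.path.join(src, class_name)
        if not os.path.isdir(class_dir):
            continue  # skip stray files such as previously written .jsonl files
        for image in sorted(os.listdir(class_dir)):
            line = json.dumps(
                {
                    "image_url": f"AmlDatastore://{datastore_name}/{root_name}/{class_name}/{image}",
                    "label": class_name,
                }
            )
            if index % train_validation_ratio == 0:
                validation.append(line)
            else:
                train.append(line)
            index += 1
    return train, validation
```

For example, `build_annotations("./fridgeObjects/", workspaceblobstore)` would return the lines that the cell above writes to the two annotation files (in sorted order).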

#### Upload the JSONL file and images to Datastore

In [None]:
# Retrieve the default datastore that was created automatically with the workspace
ds = ws.get_default_datastore()
ds.upload(src_dir="./fridgeObjects", target_path="fridgeObjects")

#### Create and register datasets in workspace

In [None]:
from azureml.core import Dataset
from azureml.data import DataType

# get existing training dataset
training_dataset_name = "fridgeObjectsTrainingDataset"
if training_dataset_name in ws.datasets:
    training_dataset = ws.datasets.get(training_dataset_name)
    print("Found the training dataset", training_dataset_name)
else:
    # create training dataset
    training_dataset = Dataset.Tabular.from_json_lines_files(
        path=ds.path("fridgeObjects/train_annotations.jsonl"),
        set_column_types={"image_url": DataType.to_stream(ds.workspace)},
    )
    training_dataset = training_dataset.register(
        workspace=ws, name=training_dataset_name
    )
# get existing validation dataset
validation_dataset_name = "fridgeObjectsValidationDataset"
if validation_dataset_name in ws.datasets:
    validation_dataset = ws.datasets.get(validation_dataset_name)
    print("Found the validation dataset", validation_dataset_name)
else:
    # create validation dataset
    validation_dataset = Dataset.Tabular.from_json_lines_files(
        path=ds.path("fridgeObjects/validation_annotations.jsonl"),
        set_column_types={"image_url": DataType.to_stream(ds.workspace)},
    )
    validation_dataset = validation_dataset.register(
        workspace=ws, name=validation_dataset_name
    )
print("Training dataset name: " + training_dataset.name)
print("Validation dataset name: " + validation_dataset.name)

#### Submit one training run with default hyperparameters

In [None]:
from azureml.automl.core.shared.constants import ImageTask
from azureml.train.automl import AutoMLImageConfig
from azureml.train.hyperdrive import GridParameterSampling, choice

image_config_vit = AutoMLImageConfig(
    task=ImageTask.IMAGE_CLASSIFICATION,
    compute_target=compute_target,
    training_data=training_dataset,
    validation_data=validation_dataset,
    hyperparameter_sampling=GridParameterSampling({"model_name": choice("vitb16r224")}),
    iterations=1,
)

In [None]:
automl_image_run = experiment.submit(image_config_vit)

In [None]:
automl_image_run.wait_for_completion(wait_post_processing=True)

## Create data objects

When building pipelines, `Dataset` objects are used for reading data from workspace datastores, and `PipelineData` objects are used for transferring intermediate data between pipeline steps.

This batch scoring example only uses one pipeline step, but in use-cases with multiple steps, the typical flow will include:

1. Use `Dataset` objects as inputs to fetch raw data, perform some transformations, and output a `PipelineData` object.
1. Use the previous step's `PipelineData` **output object** as an **input object**, repeated for subsequent steps.

For this scenario, you create a `Dataset` object for the datastore directories that hold the input images, and a `PipelineData` object for the batch scoring output data. In a multi-step pipeline, a step's output object becomes available as an **input** to a subsequent step; here, we build a single-step pipeline.

This section assumes that an image classification training run has already been performed in this workspace and that the images are already in the datastore. If this is not the case, please refer to the [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-auto-train-image-models) for instructions on training an image classification model with AutoML.

All images in this notebook are hosted in [this repository](https://github.com/microsoft/computervision-recipes) and are made available under the [MIT license](https://github.com/microsoft/computervision-recipes/blob/master/LICENSE).

In [None]:
from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

input_images = Dataset.File.from_files((def_data_store, "fridgeObjects/**/*.jpg"))

output_dir = PipelineData(name="scores", datastore=def_data_store)

Next, register the input dataset for batch scoring with the workspace.

In [None]:
input_images = input_images.register(
    workspace=ws, name="fridgeObjects_scoring_images", create_new_version=True
)

## Retrieve the environment and metrics from the training run

In [None]:
from azureml.core.experiment import Experiment
from azureml.core import Run

experiment_name = "automl-image-batchscoring"
# If your model was not trained with this notebook, replace the id below
# with the run id of the child training run (i.e., the one ending with _HD_0)
training_run_id = automl_image_run.id + "_HD_0"
exp = Experiment(ws, experiment_name)
training_run = Run(exp, training_run_id)

# Fetch only the requested metric. A metric logged more than once is returned
# as a list; a metric logged once is returned as a single value.
metrics = training_run.get_metrics("accuracy")
accuracy = metrics["accuracy"]
best_metric = max(accuracy) if isinstance(accuracy, list) else accuracy
print("best_metric:", best_metric)

# Retrieve the training environment
env = training_run.get_environment()
print(env)

### Register model with metric and environment tags

Now you register the model to your workspace, which allows you to easily retrieve it in the pipeline process. In the `register_model()` call, the `model_name` parameter is the key you use to locate your model throughout the SDK.
Tag the model with the metrics and the environment used to train it.

In [None]:
from azureml.core.model import Model

tags = dict()
tags["accuracy"] = best_metric
tags["env_name"] = env.name
tags["env_version"] = env.version

model_name = "fridgeObjectsClassifier"
model = training_run.register_model(
    model_name=model_name, model_path="train_artifacts", tags=tags
)

In [None]:
# List the latest version of the registered model from the workspace
models = Model.list(ws, name=model_name, latest=True)
for m in models:
    print(m.name, m.version)
    print(m.tags)

## Write a scoring script

To do the scoring, this notebook uses a batch scoring script, `batch_scoring.py`, located in the `scripts` folder of the current directory. The script takes a minibatch of input images, applies the classification model, and outputs the predictions to a results file.

The script `batch_scoring.py` takes the following parameters, which are passed from the `ParallelRunStep` that you create later:

- `--model_name`: the name of the registered model to use
- `--batch_size` (optional): the inference batch size; if omitted, the script's default value is used

While creating the batch scoring script, refer to the scoring scripts generated under the outputs folder of the AutoML training runs. They help identify the right model settings to use in the batch scoring script's init method when loading the model.
Note: The batch scoring script used here is different from the scoring script generated by the training runs, shown in the screenshot below. We refer to the generated script only to identify the right model settings to use in the batch scoring script.

![Training run outputs](ui_outputs.PNG "Training run outputs")

In [None]:
# View the batch scoring script. Use the model settings as appropriate for your model.
with open("./scripts/batch_scoring.py", "r") as f:
    print(f.read())

## Build and run the pipeline

### Create the parallel-run configuration to wrap the inference script
Create the pipeline run configuration specifying the script, environment configuration, and parameters. Specify the compute target created earlier as the target for running the script. This configuration is used by the `ParallelRunStep` defined next.

Refer to this [site](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run) for more details on the ParallelRunStep of Azure Machine Learning Pipelines.

In [None]:
from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory="scripts",
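    output_action="append_row",  # append every returned result to a single output file
    append_row_file_name="parallel_run_step.txt",
    mini_batch_size="20",  # number of files passed to each run() call
    error_threshold=1,  # abort the job if more than 1 file fails
    compute_target=compute_target,
    process_count_per_node=2,  # worker processes per node
    node_count=1,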
)
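
To get a feel for how `mini_batch_size`, `node_count`, and `process_count_per_node` interact, the sketch below splits a list of file names into mini-batches of 20 and counts the worker processes available to score them. It is a simplified model, not how the service schedules work: the actual scheduler hands out mini-batches as workers become free. The file count of 134 is an arbitrary example.

```python
import math
from typing import List


def split_into_mini_batches(files: List[str], mini_batch_size: int) -> List[List[str]]:
    """Chunk the input list into consecutive mini-batches of at most mini_batch_size files."""
    return [files[i : i + mini_batch_size] for i in range(0, len(files), mini_batch_size)]


files = [f"image_{i}.jpg" for i in range(134)]  # arbitrary example input
batches = split_into_mini_batches(files, mini_batch_size=20)

node_count, process_count_per_node = 1, 2
workers = node_count * process_count_per_node

print(len(batches))  # 7 mini-batches (6 of 20 files, 1 of 14)
print(math.ceil(len(batches) / workers))  # 4 rounds if every run() call takes equal time
```

Raising `node_count` or `process_count_per_node` increases the number of workers, while `mini_batch_size` controls how much work each `run()` call receives.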

### Create the pipeline step

A pipeline step is an object that encapsulates everything needed to run that step of the pipeline, including:

* environment and dependency settings
* the compute resource to run the pipeline on
* input and output data, and any custom parameters
* reference to a script to run during the step

There are multiple classes that inherit from the parent class [`PipelineStep`](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/?view=azure-ml-py) to assist with building a step using certain frameworks and stacks. In this example, you use the [`ParallelRunStep`](https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallelrunstep?view=azure-ml-py) class to define your step logic using a scoring script. `ParallelRunStep` runs the script in parallel across the nodes and processes configured in `ParallelRunConfig`.

Pipeline steps pass parameters to scripts as command-line arguments, which the scripts read with the `ArgumentParser` class. For example, in the code below the argument `--model_name` is parsed into the attribute `args.model_name`. In the scoring script's `init()` function, this attribute is used to locate the model with `Model.get_model_path(args.model_name)`.

Note: The pipeline in this tutorial only has one step and writes the output to a file, but for multi-step pipelines, you also use `ArgumentParser` to define a directory to write output data for input to subsequent steps. See the [notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/nyc-taxi-data-regression-model-building/nyc-taxi-data-regression-model-building.ipynb) for an example of passing data between multiple pipeline steps using the `ArgumentParser` design pattern.

In [None]:
from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

arguments = ["--model_name", model_name]

# Optionally specify the inference batch_size; otherwise the script's default is used.
# (This is different from the mini_batch_size above.)
# NOTE: Large batch sizes may cause out-of-memory (OOM) errors.
# arguments = arguments + ["--batch_size", "20"]

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=arguments,
    parallel_run_config=parallel_run_config,
    allow_reuse=False,
)

For a list of all classes for different step types, see the [steps package](https://docs.microsoft.com/python/api/azureml-pipeline-steps/azureml.pipeline.steps?view=azure-ml-py).

### Run the pipeline

Now you run the pipeline. First create a `Pipeline` object with your workspace reference and the pipeline step you created. The `steps` parameter is an array of steps, and in this case, there is only one step for batch scoring. To build pipelines with multiple steps, you place the steps in order in this array.

Next, use the `Experiment.submit()` function to submit the pipeline for execution. The `wait_for_completion` function outputs logs while the pipeline runs, which lets you see its current progress.

Note: The first pipeline run takes roughly **15 minutes**, as all dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned/created. Running it again takes significantly less time as those resources are reused. However, total run time depends on the workload of your scripts and processes running in each pipeline step.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, "batch_scoring_automl_image").submit(pipeline)

In [None]:
# Display information about the pipeline run, including a link to its details page in the portal
pipeline_run

In [None]:
# Wait for the run to complete and stream its output log to the console
pipeline_run.wait_for_completion(show_output=True)

### Download and review output

In [None]:
import itertools
import os
import tempfile

batch_run = pipeline_run.find_step_run(batch_score_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)

target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
result_file = os.path.join(
    target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name
)
print(result_file)

# Print the first five lines of the output (fewer if the file is shorter)
with open(result_file) as f:
    for line in itertools.islice(f, 5):
        print(line.rstrip())

## Choose a random file for visualization

In [None]:
import random
import json

with open(result_file, "r") as f:
    contents = f.readlines()
rand_file = contents[random.randrange(len(contents))]
prediction = json.loads(rand_file)
print(prediction["filename"])
print(prediction["probs"])
print(prediction["labels"])
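
Beyond spot-checking a single row, a quick summary of the whole results file can be useful. The sketch below assumes each line is a JSON object with `filename`, `probs`, and `labels` keys, as read in the cell above, and counts the top-1 predicted label per row; `summarize_predictions` is a helper name introduced here for illustration.

```python
import json
from collections import Counter
from typing import Iterable


def summarize_predictions(lines: Iterable[str]) -> Counter:
    """Count the top-1 predicted label across JSON-lines prediction rows."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines, e.g. a trailing newline
        row = json.loads(line)
        probs = row["probs"]
        # Index of the highest probability (first one wins on ties, like np.argmax)
        best = max(range(len(probs)), key=probs.__getitem__)
        counts[row["labels"][best]] += 1
    return counts
```

With the results loaded above, `summarize_predictions(contents).most_common()` lists how many images were assigned to each class.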

In [None]:
# Download the image file from the datastore
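# Rebuild the datastore path "fridgeObjects/<class>/<image>" from the last two
# components of the scored file path
class_dir, image_name = prediction["filename"].split("/")[-2:]
path = "fridgeObjects/" + class_dir + "/" + image_name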
path_on_datastore = def_data_store.path(path)
single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)
image = single_image_ds.download()[0]

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np

img_np = mpimg.imread(image)

fig, ax = plt.subplots(1, figsize=(15, 15))
# Display the image
ax.imshow(img_np)

# Pick the class with the highest predicted probability
label_index = np.argmax(prediction["probs"])
label = prediction["labels"][label_index]
conf_score = prediction["probs"][label_index]

display_text = "{} ({})".format(label, round(conf_score, 3))
print(display_text)

# Overlay the predicted label and its confidence on the image
ax.text(30, 30, display_text, color="red", fontsize=30)

plt.show()

## Publish and run from REST endpoint

Run the following code to publish the pipeline to your workspace. In your workspace in the portal, you can see metadata for the pipeline including run history and durations. You can also run the pipeline manually from the portal.

Additionally, publishing the pipeline enables a REST endpoint to rerun the pipeline from any HTTP library on any platform.

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name="automl-image-batch-scoring",
    description="Batch scoring using Automl for Image",
    version="1.0",
)

published_pipeline

To run the pipeline from the REST endpoint, you first need an OAuth2 Bearer-type authentication header. This example uses interactive authentication for illustration purposes, but for most production scenarios requiring automated or headless authentication, use service principal authentication as [described in this notebook](https://aka.ms/pl-restep-auth).

Service principal authentication involves creating an **App Registration** in **Azure Active Directory**, generating a client secret, and then granting your service principal **role access** to your machine learning workspace. You then use the [`ServicePrincipalAuthentication`](https://docs.microsoft.com/python/api/azureml-core/azureml.core.authentication.serviceprincipalauthentication?view=azure-ml-py) class to manage your auth flow.

Both `InteractiveLoginAuthentication` and `ServicePrincipalAuthentication` inherit from `AbstractAuthentication`, and in both cases you use the `get_authentication_header()` function in the same way to fetch the header.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

Get the REST URL from the `endpoint` property of the published pipeline object. You can also find the REST URL in your workspace in the portal. Build an HTTP POST request to the endpoint, specifying your authentication header. Additionally, add a JSON payload object with the experiment name and any parameter assignments. Note that in this notebook `process_count_per_node` is set to a fixed value in `ParallelRunConfig` rather than defined as a `PipelineParameter`, so check that each name in `ParameterAssignments` matches a parameter of the published pipeline before relying on the override.

Make the request to trigger the run. Access the `Id` key from the response dictionary to get the value of the run id.

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(
    rest_endpoint,
    headers=auth_header,
    json={
        "ExperimentName": "batch_scoring",
        "ParameterAssignments": {"process_count_per_node": 2},
    },
)

In [None]:
try:
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    raise Exception(
        "Received bad response from the endpoint: {}\n"
        "Response Code: {}\n"
        "Headers: {}\n"
        "Content: {}".format(
            rest_endpoint, response.status_code, response.headers, response.content
        )
    ) from err
run_id = response.json().get("Id")
print("Submitted pipeline run: ", run_id)

Use the run id to monitor the status of the new run. This will take another 10-15 min to run and will look similar to the previous pipeline run, so if you don't need to see another pipeline run, you can skip watching the full output.

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)
published_pipeline_run

In [None]:
# Wait for the run to complete and stream its output log to the console
published_pipeline_run.wait_for_completion(show_output=True)