Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

This notebook demonstrates how to run a batch scoring job. The __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from the __[ImageNet](http://image-net.org/)__ dataset are used. It registers a pretrained Inception model in the workspace model registry, then uses that model to score images stored in a blob container.

## Prerequisites
Make sure you go through the [00. Installation and Configuration](./00.configuration.ipynb) notebook first if you haven't already.


In [None]:
import os
from azureml.core import Workspace, Run, Experiment

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

# Local folder that receives the scoring script (written by the %%writefile
# cell below) and is later used as the pipeline step's source_directory.
scripts_folder = "scripts"
os.makedirs(scripts_folder, exist_ok=True)

In [None]:
from azureml.core.compute import BatchAiCompute, ComputeTarget
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import CondaDependencies, RunConfiguration

## Create and attach Compute targets
Use the code below to create a Batch AI compute cluster, or to reuse an existing cluster with the same name.

In [None]:
# Batch AI compute: reuse the cluster if it already exists, otherwise provision it.
cluster_name = "gpu_cluster"
try:
    cluster = BatchAiCompute(ws, cluster_name)
    print("found existing cluster.")
except Exception:
    # Presumably the lookup raises when no cluster with this name exists.
    # Catching Exception (instead of a bare except) keeps KeyboardInterrupt
    # and SystemExit from being silently turned into "create a cluster".
    print("creating new cluster")
    provisioning_config = BatchAiCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                                    autoscale_enabled=True,
                                                                    cluster_min_nodes=0,
                                                                    cluster_max_nodes=1)

    # create the cluster and block until provisioning finishes
    cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)
    cluster.wait_for_completion(show_output=True)

# Python scripts to run

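Python script that runs the batch scoring. `batchai_score.py` reads the input images in `dataset_path`, the class labels in `label_dir/labels.txt`, and the pretrained model registered under `model_name` in the model registry. It writes the predictions to `result-labels.txt` in `output_dir`.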

In [None]:
%%writefile $scripts_folder/batchai_score.py
import os
import argparse
import datetime,time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3
from azureml.core.model import Model

slim = tf.contrib.slim

parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
parser.add_argument('--model_name', dest="model_name", required=True)
parser.add_argument('--label_dir', dest="label_dir", required=True)
parser.add_argument('--dataset_path', dest="dataset_path", required=True)
parser.add_argument('--output_dir', dest="output_dir", required=True)
parser.add_argument('--batch_size', dest="batch_size", type=int, required=True)

args = parser.parse_args()

image_size = 299
num_channel = 3

# create output directory if it does not exist
os.makedirs(args.output_dir, exist_ok=True)

def get_class_label_dict(label_file):
    """Read class labels from ``label_file``, one label per line.

    Despite the name, this returns a list: entry ``i`` is the label for class
    index ``i``, with trailing whitespace and newlines stripped. The file is
    read through ``tf.gfile`` like the original implementation.
    """
    # The context manager closes the file handle, which was previously leaked.
    with tf.gfile.GFile(label_file) as label_fh:
        return [line.rstrip() for line in label_fh.readlines()]


class DataIterator:
    """Enumerate the images in a directory and build a TF input queue over them.

    ``file_paths`` is sorted, so the scoring order (and therefore the order of
    lines in the results file) is deterministic across runs. ``labels`` holds
    a dummy constant label per image: the images are unlabeled, but the queue
    is fed a label tensor alongside the file names.
    """

    def __init__(self, data_dir):
        # Only regular files are scored; a subdirectory would make
        # tf.read_file fail. Names are used verbatim: listdir entries carry no
        # trailing newline, and stripping would corrupt names ending in spaces.
        self.file_paths = [
            os.path.join(data_dir, file_name)
            for file_name in sorted(os.listdir(data_dir))
            if os.path.isfile(os.path.join(data_dir, file_name))
        ]
        self.labels = [1] * len(self.file_paths)

    @property
    def size(self):
        """Number of images found in the data directory."""
        return len(self.labels)

    def input_pipeline(self, batch_size):
        """Return a tensor that yields batches of decoded, resized, [0, 1]-scaled images.

        The producer uses ``shuffle=False`` and ``tf.train.batch`` runs with its
        default single thread, so batches follow the order of
        ``self.file_paths``. The caller relies on this order to map predictions
        back to file names. The producer cycles over the files, so the final
        batch may wrap around to the first images when ``size`` is not a
        multiple of ``batch_size``.
        """
        images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)
        labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)
        input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)
        labels = input_queue[1]
        images_content = tf.read_file(input_queue[0])

        image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name="jpeg_reader")
        float_caster = tf.cast(image_reader, tf.float32)
        new_size = tf.constant([image_size, image_size], dtype=tf.int32)
        images = tf.image.resize_images(float_caster, new_size)
        # Scale pixel values from [0, 255] to [0, 1].
        # NOTE(review): TF-slim's Inception preprocessing maps pixels to
        # [-1, 1]; confirm that [0, 1] is what this checkpoint expects.
        images = tf.divide(images, 255.0)

        image_batch, _ = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)
        return image_batch

def main(_):
    """Score every image in ``args.dataset_path`` and write one prediction per line.

    Each output line has the form ``<image file name>: <predicted label>``. The
    file is written to ``<args.output_dir>/result-labels.txt`` and then copied
    to ``./outputs/`` so it is stored with the run's artifacts.

    The positional argument is the argv list passed in by ``tf.app.run`` and is
    ignored; configuration comes from the module-level ``args``.

    Raises:
        ValueError: if ``args.dataset_path`` contains no files to score.
    """
    start_time = datetime.datetime.now()
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    # NOTE(review): this must match the class count stored in the checkpoint
    # (TF-slim Inception checkpoints typically include a background class);
    # confirm labels.txt has the matching number of lines.
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = test_feeder.size
    if total_size == 0:
        # Nothing to score: fail early with a clear message rather than
        # starting the TF input queue over an empty file list.
        raise ValueError("No images found in " + args.dataset_path)
    count = 0
    # get model from model registry
    model_path = Model.get_model_path(args.model_name)
    out_filename = os.path.join(args.output_dir, "result-labels.txt")
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                  num_classes=classes_num,
                                                  is_training=False)
            # Index of the highest-scoring class for each image (not probabilities).
            predictions = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            saver = tf.train.Saver()
            saver.restore(sess, model_path)
            with open(out_filename, "w") as result_file:
                batch_index = 0
                while count < total_size and not coord.should_stop():
                    test_images_batch = sess.run(test_images)
                    first = batch_index * args.batch_size
                    # Slicing clips at the end of the list, so the final batch
                    # only gets the remaining file names.
                    file_names_batch = test_feeder.file_paths[first:first + args.batch_size]
                    results = sess.run(predictions, feed_dict={input_images: test_images_batch})
                    # The input queue cycles over the files, so the final batch
                    # may contain wrapped-around images; only the first
                    # `new_add` predictions belong to files not yet scored.
                    new_add = min(args.batch_size, total_size - count)
                    count += new_add
                    batch_index += 1
                    for j in range(new_add):
                        result_file.write(os.path.basename(file_names_batch[j]) + ": " + label_dict[results[j]] + "\n")
                    result_file.flush()
        finally:
            # Always stop and join the queue-runner threads, even if restoring
            # or scoring raised, so the session can shut down cleanly.
            coord.request_stop()
            coord.join(threads)

    # copy the file to artifacts; create ./outputs first in case it is missing
    os.makedirs("./outputs", exist_ok=True)
    shutil.copy(out_filename, "./outputs/")
    print("Scored {} images in {}".format(count, datetime.datetime.now() - start_time))
    # TODO: move the processed data out of the blob so that the next run only
    # processes new data (not implemented).

if __name__ == "__main__":
    # tf.app.run parses the command line and then invokes main(argv).
    tf.app.run(main=main)

## Prepare Model and Input data

### Download Model

Download the pretrained model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz and extract it into the `models` directory.

In [None]:
# create directory for model
# Local directory that the downloaded checkpoint is extracted into.
model_dir = 'models'
os.makedirs(model_dir, exist_ok=True)

In [None]:
import tarfile
import urllib.request

# Download the pretrained Inception-V3 checkpoint once. The archive is kept, so
# re-running the notebook does not download it again. HTTPS protects the
# archive in transit because it is extracted below.
url = "https://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
model_archive = "model.tar.gz"
if not os.path.isfile(model_archive):
    # Download to a temporary name and rename when complete, so an interrupted
    # download never leaves a truncated archive that would be reused.
    partial_archive = model_archive + ".part"
    urllib.request.urlretrieve(url, partial_archive)
    os.replace(partial_archive, model_archive)

# NOTE: extractall trusts the member paths stored in the archive; only extract
# archives from a trusted source (see the tarfile documentation warning).
with tarfile.open(model_archive, "r:gz") as tar:
    tar.extractall(model_dir)

### Create a datastore that points to a blob container containing sample images

We have created a public blob container `sampledata` on an account named `pipelinedata` that contains images from the ImageNet evaluation set. In the next step, we create a datastore named `images_datastore` that points to this container. The `overwrite=True` argument replaces any previously registered datastore with that name.

To point to your own blob container instead, change `container_name` and `account_name`, and also pass the storage account's `account_key`.

In [None]:
# Public sample container, so no account_key is supplied.
account_name = "pipelinedata"
sample_data = Datastore.register_azure_blob_container(
    ws,
    datastore_name="images_datastore",
    container_name="sampledata",
    account_name=account_name,
    overwrite=True,
)

# Output datastore

We write the outputs to the workspace's default datastore.

In [None]:
# Datastore that the scoring step's PipelineData output (`output_dir`) is written to.
default_ds = ws.get_default_datastore()

# Specify where the data is stored or will be written to

In [None]:
from azureml.core.conda_dependencies import CondaDependencies
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.core import Datastore
from azureml.core import Experiment

In [None]:
# Inputs are downloaded from the sample datastore onto the compute node before
# the step runs (mode="download").
input_images = DataReference(datastore=sample_data,
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download")
# Named model_dir_ref, not model_dir, so it does not clobber the local
# `model_dir` path used by the model-download cells on a re-run. It is not
# passed to the scoring step, which loads the model from the model registry.
model_dir_ref = DataReference(datastore=sample_data,
                              data_reference_name="input_model",
                              path_on_datastore="batchscoring/models",
                              mode="download")
label_dir = DataReference(datastore=sample_data,
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download")
# Step output: the scoring results land on the default datastore.
output_dir = PipelineData(name="scores",
                          datastore_name=default_ds.name,
                          output_path_on_compute="batchscoring/results")

## Register the model with Workspace

In [None]:
import shutil
from azureml.core.model import Model

# register downloaded model 
# Register the extracted checkpoint; the scoring script resolves it later
# through Model.get_model_path("inception").
model = Model.register(
    workspace=ws,
    model_path="models/inception_v3.ckpt",
    model_name="inception",  # this is the name the model is registered as
    tags={'pretrained': "inception"},
    description="Imagenet trained tensorflow inception",
)
# The local copy is no longer needed once registered; remove it if you wish.
shutil.rmtree("models")

# Specify environment to run the script

In [None]:
# Pinned TensorFlow GPU build plus the AML runtime package for the step.
cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.4.0", "azureml-defaults"])

# Runconfig: run the step inside a GPU-enabled Docker image.
batchai_run_config = RunConfiguration(conda_dependencies=cd)
docker_config = batchai_run_config.environment.docker
docker_config.enabled = True
docker_config.gpu_support = True
docker_config.base_image = "microsoft/mmlspark:gpu-0.12"
batchai_run_config.environment.spark.precache_packages = False

# Steps to run

Some of the script's parameters can be supplied as inputs when a `PublishedPipeline` is re-run. In this example, the script's `batch_size` is defined as such a parameter.

In [None]:
from azureml.pipeline.core.graph import PipelineParameter
# Pipeline-level parameter that can be overridden on submit or through the REST endpoint.
batch_size_param = PipelineParameter(default_value=20, name="param_batch_size")

In [None]:
# Checkpoint file name; not passed to the step, which looks the model up by its
# registered name ("inception") instead.
inception_model_name = "inception_v3.ckpt"

# Command-line arguments for batchai_score.py; DataReference/PipelineData
# entries are resolved to paths on the compute node at run time.
script_args = [
    "--dataset_path", input_images,
    "--model_name", "inception",
    "--label_dir", label_dir,
    "--output_dir", output_dir,
    "--batch_size", batch_size_param,
]

batch_score_step = PythonScriptStep(
    name="batch ai scoring",
    script_name="batchai_score.py",
    arguments=script_args,
    target=cluster,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=batchai_run_config,
    source_directory=scripts_folder,
)

In [None]:
# Single-step pipeline, submitted under the "batch_scoring" experiment.
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
scoring_experiment = Experiment(ws, 'batch_scoring')
pipeline_run = scoring_experiment.submit(pipeline, pipeline_params={"param_batch_size": 20})

# Monitor run

In [None]:
from azureml.train.widgets import RunDetails
# Notebook widget showing the progress of the submitted pipeline run.
RunDetails(pipeline_run).show()

In [None]:
# Block until the pipeline run finishes, streaming its output.
pipeline_run.wait_for_completion(show_output=True)

# Download and review output

In [None]:
# The pipeline has a single step; the scoring script copied its result file
# into that step run's ./outputs artifacts.
child_runs = list(pipeline_run.get_children())
step_run = child_runs[0]
step_run.download_file("./outputs/result-labels.txt")

In [None]:
import pandas as pd
# Each line is written as "<file name>: <label>". Splitting on the first ": "
# mirrors that format exactly: the Prediction values get no leading space, and
# labels that contain ':' or quote characters stay intact.
with open("result-labels.txt") as results_fh:
    result_rows = [line.rstrip("\n").split(": ", 1) for line in results_fh if line.strip()]
df = pd.DataFrame(result_rows, columns=["Filename", "Prediction"])
df.head()

# Publish a pipeline and rerun using a REST call

## Create a published pipeline

In [None]:
# Publish the completed run's pipeline so it can be re-triggered over REST.
published_pipeline = pipeline_run.publish_pipeline(
    name="Inception v3 scoring",
    description="Batch scoring using Inception v3 model",
    version="1.0",
)
published_id = published_pipeline.id

## Rerun using REST call

## Get AAD token

In [None]:
from azureml.core.authentication import AzureCliAuthentication
import requests

# Authenticate through the Azure CLI login.
cli_auth = AzureCliAuthentication()
# Authentication header, passed as `headers=` to the REST request that reruns the pipeline.
aad_token = cli_auth.get_authentication_header()

## Run published pipeline using its REST endpoint

In [None]:
from azureml.pipeline.core import PublishedPipeline

rest_endpoint = PublishedPipeline.get_endpoint(published_id, ws)
# specify batch size when running the pipeline
response = requests.post(rest_endpoint, headers=aad_token, json={"param_batch_size": 50})
# Surface HTTP failures (e.g. an expired token) as an HTTPError here, instead of
# an opaque KeyError/JSON decode error when reading the run id.
response.raise_for_status()
run_id = response.json()["Id"]

## Monitor the new run

In [None]:
from azureml.pipeline.core.run import PipelineRun
# Attach to the run started through the REST endpoint and monitor it.
batch_scoring_experiment = ws.experiments()["batch_scoring"]
published_pipeline_run = PipelineRun(batch_scoring_experiment, run_id)
RunDetails(published_pipeline_run).show()