diff --git a/pipeline/pipeline-batch-scoring.ipynb b/pipeline/pipeline-batch-scoring.ipynb
index 62fd8cea..1cc660d6 100644
--- a/pipeline/pipeline-batch-scoring.ipynb
+++ b/pipeline/pipeline-batch-scoring.ipynb
@@ -1,622 +1,640 @@
{
- "cells": [
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "Copyright (c) Microsoft Corporation. All rights reserved.\n",
- "\n",
- "Licensed under the MIT License."
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "This notebook demonstrates how to run batch scoring job. __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from __[ImageNet](http://image-net.org/)__ dataset will be used. It registers a pretrained inception model in model registry then uses the model to do batch scoring on images in a blob container."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import os\n",
- "from azureml.core import Workspace, Run, Experiment\n",
- "\n",
- "ws = Workspace.from_config()\n",
- "print('Workspace name: ' + ws.name, \n",
- " 'Azure region: ' + ws.location, \n",
- " 'Subscription id: ' + ws.subscription_id, \n",
- " 'Resource group: ' + ws.resource_group, sep = '\\n')\n",
- "\n",
- "# Also create a Project and attach to Workspace\n",
- "project_folder = \"sample_projects\"\n",
- "run_history_name = project_folder\n",
- "\n",
- "if not os.path.isdir(project_folder):\n",
- " os.mkdir(project_folder)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.compute import BatchAiCompute, ComputeTarget\n",
- "from azureml.core.datastore import Datastore\n",
- "from azureml.data.data_reference import DataReference\n",
- "from azureml.pipeline.core import Pipeline, PipelineData\n",
- "from azureml.pipeline.steps import PythonScriptStep\n",
- "from azureml.core.runconfig import CondaDependencies, RunConfiguration"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Create and attach Compute targets\n",
- "Use the below code to create and attach Compute targets. "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Batch AI compute\n",
- "cluster_name = \"gpu_cluster\"\n",
- "try:\n",
- " cluster = BatchAiCompute(ws, cluster_name)\n",
- " print(\"found existing cluster.\")\n",
- "except:\n",
- " print(\"creating new cluster\")\n",
- " provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n",
- " autoscale_enabled = True,\n",
- " cluster_min_nodes = 0, \n",
- " cluster_max_nodes = 1)\n",
- "\n",
- " # create the cluster\n",
- " cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)\n",
- " cluster.wait_for_completion(show_output=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Python scripts to run"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "Python scripts that run the batch scoring. `batchai_score.py` takes input images in `dataset_path`, pretrained models in `model_dir` and outputs a `results-label.txt` to `output_dir`."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "%%writefile $project_folder/batchai_score.py\n",
- "import os\n",
- "import argparse\n",
- "import datetime,time\n",
- "import tensorflow as tf\n",
- "from math import ceil\n",
- "import numpy as np\n",
- "import shutil\n",
- "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n",
- "from azureml.core.model import Model\n",
- "\n",
- "slim = tf.contrib.slim\n",
- "\n",
- "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n",
- "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n",
- "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n",
- "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n",
- "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n",
- "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n",
- "\n",
- "args = parser.parse_args()\n",
- "\n",
- "image_size = 299\n",
- "num_channel = 3\n",
- "\n",
- "# create output directory if it does not exist\n",
- "os.makedirs(args.output_dir, exist_ok=True)\n",
- "\n",
- "def get_class_label_dict(label_file):\n",
- " label = []\n",
- " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n",
- " for l in proto_as_ascii_lines:\n",
- " label.append(l.rstrip())\n",
- " return label\n",
- "\n",
- "\n",
- "class DataIterator:\n",
- " def __init__(self, data_dir):\n",
- " self.file_paths = []\n",
- " image_list = os.listdir(data_dir)\n",
- " total_size = len(image_list)\n",
- " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n",
- "\n",
- " self.labels = [1 for file_name in self.file_paths]\n",
- "\n",
- " @property\n",
- " def size(self):\n",
- " return len(self.labels)\n",
- "\n",
- " def input_pipeline(self, batch_size):\n",
- " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n",
- " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n",
- " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n",
- " labels = input_queue[1]\n",
- " images_content = tf.read_file(input_queue[0])\n",
- "\n",
- " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n",
- " float_caster = tf.cast(image_reader, tf.float32)\n",
- " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n",
- " images = tf.image.resize_images(float_caster, new_size)\n",
- " images = tf.divide(tf.subtract(images, [0]), [255])\n",
- "\n",
- " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n",
- " return image_batch\n",
- "\n",
- "def main(_):\n",
- " start_time = datetime.datetime.now()\n",
- " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n",
- " label_dict = get_class_label_dict(label_file_name)\n",
- " classes_num = len(label_dict)\n",
- " test_feeder = DataIterator(data_dir=args.dataset_path)\n",
- " total_size = len(test_feeder.labels)\n",
- " count = 0\n",
- " # get model from model registry\n",
- " model_path = Model.get_model_path(args.model_name)\n",
- " with tf.Session() as sess:\n",
- " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n",
- " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n",
- " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n",
- " logits, _ = inception_v3.inception_v3(input_images,\n",
- " num_classes=classes_num,\n",
- " is_training=False)\n",
- " probabilities = tf.argmax(logits, 1)\n",
- "\n",
- " sess.run(tf.global_variables_initializer())\n",
- " sess.run(tf.local_variables_initializer())\n",
- " coord = tf.train.Coordinator()\n",
- " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
- " saver = tf.train.Saver()\n",
- " saver.restore(sess, model_path)\n",
- " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n",
- " with open(out_filename, \"w\") as result_file:\n",
- " i = 0\n",
- " while count < total_size and not coord.should_stop():\n",
- " test_images_batch = sess.run(test_images)\n",
- " file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n",
- " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n",
- " new_add = min(args.batch_size, total_size-count)\n",
- " count += new_add\n",
- " i += 1\n",
- " for j in range(new_add):\n",
- " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n",
- " result_file.flush()\n",
- " coord.request_stop()\n",
- " coord.join(threads)\n",
- " \n",
- " # copy the file to artifacts\n",
- " shutil.copy(out_filename, \"./outputs/\")\n",
- " # Move the processed data out of the blob so that the next run can process the data.\n",
- "\n",
- "if __name__ == \"__main__\":\n",
- " tf.app.run()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Prepare Model and Input data"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# create directory for model\n",
- "model_dir = 'models'\n",
- "if not os.path.isdir(model_dir):\n",
- " os.mkdir(model_dir)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Download Model\n",
- "This manual step is required to register the model to the workspace\n",
- "\n",
- "Download and extract model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to model_dir"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Get samples images and upload to Datastore\n",
- "This manual step is required to run batchai_score.py\n",
- "\n",
- "Download and extract sample images from ImageNet evaluation set and **upload** to a blob that will be registered as a Datastore in the next step\n",
- "\n",
- "A copy of sample images from ImageNet evaluation set can be found at __[BatchAI Samples Blob](https://batchaisamples.blob.core.windows.net/samples/imagenet_samples.zip?st=2017-09-29T18%3A29%3A00Z&se=2099-12-31T08%3A00%3A00Z&sp=rl&sv=2016-05-31&sr=c&sig=PmhL%2BYnYAyNTZr1DM2JySvrI12e%2F4wZNIwCtf7TRI%2BM%3D)__ \n",
- "\n",
- "There are multiple ways to create folders and upload files into Azure Blob Container - you can use __[Azure Portal](https://ms.portal.azure.com/)__, __[Storage Explorer](http://storageexplorer.com/)__, __[Azure CLI2](https://render.githubusercontent.com/azure-cli-extension)__ or Azure SDK for your preferable programming language. "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "account_name = \"batchscoringdata\"\n",
- "sample_data = Datastore.register_azure_blob_container(ws, \"sampledata\", \"sampledata\", \n",
- " account_name=account_name, \n",
- " overwrite=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Output datastore"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "We write the outputs to the default datastore"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "default_ds = \"workspaceblobstore\""
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Specify where the data is stored or will be written to"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.conda_dependencies import CondaDependencies\n",
- "from azureml.data.data_reference import DataReference\n",
- "from azureml.pipeline.core import Pipeline, PipelineData\n",
- "from azureml.core import Datastore\n",
- "from azureml.core import Experiment"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "input_images = DataReference(datastore=sample_data, \n",
- " data_reference_name=\"input_images\",\n",
- " path_on_datastore=\"batchscoring/images\",\n",
- " mode=\"download\"\n",
- " )\n",
- "model_dir = DataReference(datastore=sample_data, \n",
- " data_reference_name=\"input_model\",\n",
- " path_on_datastore=\"batchscoring/models\",\n",
- " mode=\"download\" \n",
- " )\n",
- "label_dir = DataReference(datastore=sample_data, \n",
- " data_reference_name=\"input_labels\",\n",
- " path_on_datastore=\"batchscoring/labels\",\n",
- " mode=\"download\" \n",
- " )\n",
- "output_dir = PipelineData(name=\"scores\", \n",
- " datastore_name=default_ds, \n",
- " output_path_on_compute=\"batchscoring/results\")"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Register the model with Workspace"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import shutil\n",
- "from azureml.core.model import Model\n",
- "\n",
- "# register downloaded model \n",
- "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n",
- " model_name = \"inception\", # this is the name the model is registered as\n",
- " tags = {'pretrained': \"inception\"},\n",
- " description = \"Imagenet trained tensorflow inception\",\n",
- " workspace = ws)\n",
- "# remove the downloaded dir after registration if you wish\n",
- "shutil.rmtree(\"models\")"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Specify environment to run the script"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.4.0\", \"azureml-defaults\"])\n",
- "\n",
- "# Runconfig\n",
- "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n",
- "batchai_run_config.environment.docker.enabled = True\n",
- "batchai_run_config.environment.docker.gpu_support = True\n",
- "batchai_run_config.environment.docker.base_image = \"microsoft/mmlspark:gpu-0.12\"\n",
- "batchai_run_config.environment.spark.precache_packages = False"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Steps to run"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. In the current example, we define `batch_size` taken by the script as such parameter."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.pipeline.core.graph import PipelineParameter\n",
- "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "inception_model_name = \"inception_v3.ckpt\"\n",
- "\n",
- "batch_score_step = PythonScriptStep(\n",
- " name=\"batch ai scoring\",\n",
- " script_name=\"batchai_score.py\",\n",
- " arguments=[\"--dataset_path\", input_images, \n",
- " \"--model_name\", \"inception\",\n",
- " \"--label_dir\", label_dir, \n",
- " \"--output_dir\", output_dir, \n",
- " \"--batch_size\", batch_size_param],\n",
- " target=cluster,\n",
- " inputs=[input_images, label_dir],\n",
- " outputs=[output_dir],\n",
- " runconfig=batchai_run_config,\n",
- " source_directory=project_folder\n",
- ")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n",
- "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={\"param_batch_size\": 20})"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Monitor run"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.train.widgets import RunDetails\n",
- "RunDetails(pipeline_run).show()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "pipeline_run.wait_for_completion(show_output=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Download and review output"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "step_run = list(pipeline_run.get_children())[0]\n",
- "step_run.download_file(\"./outputs/result-labels.txt\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import pandas as pd\n",
- "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n",
- "df.columns = [\"Filename\", \"Prediction\"]\n",
- "df.head()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Publish a pipeline and rerun using a REST call"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Create a published pipeline"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "published_pipeline = pipeline_run.publish_pipeline(\n",
- " name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n",
- "\n",
- "published_id = published_pipeline.id"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Rerun using REST call"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Get AAD token"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.authentication import AzureCliAuthentication\n",
- "import requests\n",
- "\n",
- "cli_auth = AzureCliAuthentication()\n",
- "aad_token = cli_auth.get_authentication_header()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Run published pipeline using its REST endpoint"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.pipeline.core import PublishedPipeline\n",
- "\n",
- "rest_endpoint = PublishedPipeline.get_endpoint(published_id, ws)\n",
- "# specify batch size when running the pipeline\n",
- "response = requests.post(rest_endpoint, headers=aad_token, json={\"param_batch_size\": 50})\n",
- "run_id = response.json()[\"Id\"]"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Monitor the new run"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.pipeline.core.run import PipelineRun\n",
- "published_pipeline_run = PipelineRun(ws.experiments()[\"batch_scoring\"], run_id)\n",
- "\n",
- "RunDetails(published_pipeline_run).show()"
- ]
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3.6",
- "language": "python",
- "name": "python36"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.6.5"
- }
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Copyright (c) Microsoft Corporation. All rights reserved.\n",
+ "\n",
+ "Licensed under the MIT License."
+ ]
},
- "nbformat": 4,
- "nbformat_minor": 2
-}
\ No newline at end of file
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This notebook demonstrates how to run a batch scoring job using the __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from the __[ImageNet](http://image-net.org/)__ dataset. It registers a pretrained Inception model in the model registry, then uses the model to batch score the images in a blob container."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Prerequisites\n",
+ "Make sure you go through the [00. Installation and Configuration](./00.configuration.ipynb) notebook first if you haven't already.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "from azureml.core import Workspace, Run, Experiment\n",
+ "\n",
+ "ws = Workspace.from_config()\n",
+ "print('Workspace name: ' + ws.name, \n",
+ " 'Azure region: ' + ws.location, \n",
+ " 'Subscription id: ' + ws.subscription_id, \n",
+ " 'Resource group: ' + ws.resource_group, sep = '\\n')\n",
+ "\n",
+ "# Create a local folder to hold the scoring script\n",
+ "scripts_folder = \"scripts\"\n",
+ "\n",
+ "if not os.path.isdir(scripts_folder):\n",
+ " os.mkdir(scripts_folder)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core.compute import BatchAiCompute, ComputeTarget\n",
+ "from azureml.core.datastore import Datastore\n",
+ "from azureml.data.data_reference import DataReference\n",
+ "from azureml.pipeline.core import Pipeline, PipelineData\n",
+ "from azureml.pipeline.steps import PythonScriptStep\n",
+ "from azureml.core.runconfig import CondaDependencies, RunConfiguration"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Create and attach Compute targets\n",
+ "Use the code below to create a GPU-enabled Batch AI compute target, reusing the cluster if one with this name already exists. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Batch AI compute\n",
+ "cluster_name = \"gpu_cluster\"\n",
+ "try:\n",
+ " cluster = BatchAiCompute(ws, cluster_name)\n",
+ " print(\"found existing cluster.\")\n",
+ "except:\n",
+ " print(\"creating new cluster\")\n",
+ " provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n",
+ " autoscale_enabled = True,\n",
+ " cluster_min_nodes = 0, \n",
+ " cluster_max_nodes = 1)\n",
+ "\n",
+ " # create the cluster\n",
+ " cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)\n",
+ " cluster.wait_for_completion(show_output=True)"
+ ]
+ },
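+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Once the cluster is created or attached, it is worth confirming what the scoring step will run on. The `provisioning_state` attribute is assumed from the SDK's `ComputeTarget`; treat this cell as a convenience check:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# confirm the compute target the scoring step will run on\n",
+ "print('cluster:', cluster.name)\n",
+ "print('state:', cluster.provisioning_state)"
+ ]
+ },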
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Python script to run"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The following script does the batch scoring. `batchai_score.py` takes input images from `dataset_path`, loads the registered pretrained model named by `model_name`, reads the class labels from `label_dir`, and writes a `result-labels.txt` file to `output_dir`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%writefile $scripts_folder/batchai_score.py\n",
+ "import os\n",
+ "import argparse\n",
+ "import datetime,time\n",
+ "import tensorflow as tf\n",
+ "from math import ceil\n",
+ "import numpy as np\n",
+ "import shutil\n",
+ "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n",
+ "from azureml.core.model import Model\n",
+ "\n",
+ "slim = tf.contrib.slim\n",
+ "\n",
+ "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n",
+ "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n",
+ "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n",
+ "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n",
+ "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n",
+ "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n",
+ "\n",
+ "args = parser.parse_args()\n",
+ "\n",
+ "image_size = 299\n",
+ "num_channel = 3\n",
+ "\n",
+ "# create output directory if it does not exist\n",
+ "os.makedirs(args.output_dir, exist_ok=True)\n",
+ "\n",
+ "def get_class_label_dict(label_file):\n",
+ " label = []\n",
+ " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n",
+ " for l in proto_as_ascii_lines:\n",
+ " label.append(l.rstrip())\n",
+ " return label\n",
+ "\n",
+ "\n",
+ "class DataIterator:\n",
+ " def __init__(self, data_dir):\n",
+ " self.file_paths = []\n",
+ " image_list = os.listdir(data_dir)\n",
+ " total_size = len(image_list)\n",
+ " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n",
+ "\n",
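+ "        # placeholder labels: the images are unlabeled; labels are only needed to build the input queue\n",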
+ " self.labels = [1 for file_name in self.file_paths]\n",
+ "\n",
+ " @property\n",
+ " def size(self):\n",
+ " return len(self.labels)\n",
+ "\n",
+ " def input_pipeline(self, batch_size):\n",
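+ "        # TF 1.x queue-based input pipeline: decode, resize and batch the image files\n",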
+ " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n",
+ " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n",
+ " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n",
+ " labels = input_queue[1]\n",
+ " images_content = tf.read_file(input_queue[0])\n",
+ "\n",
+ " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n",
+ " float_caster = tf.cast(image_reader, tf.float32)\n",
+ " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n",
+ " images = tf.image.resize_images(float_caster, new_size)\n",
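+ "        # scale pixel values from [0, 255] into [0, 1]\n",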
+ " images = tf.divide(tf.subtract(images, [0]), [255])\n",
+ "\n",
+ " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n",
+ " return image_batch\n",
+ "\n",
+ "def main(_):\n",
+ " start_time = datetime.datetime.now()\n",
+ " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n",
+ " label_dict = get_class_label_dict(label_file_name)\n",
+ " classes_num = len(label_dict)\n",
+ " test_feeder = DataIterator(data_dir=args.dataset_path)\n",
+ " total_size = len(test_feeder.labels)\n",
+ " count = 0\n",
+ " # get model from model registry\n",
+ " model_path = Model.get_model_path(args.model_name)\n",
+ " with tf.Session() as sess:\n",
+ " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n",
+ " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n",
+ " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n",
+ " logits, _ = inception_v3.inception_v3(input_images,\n",
+ " num_classes=classes_num,\n",
+ " is_training=False)\n",
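+ "            # argmax over the logits gives the predicted class index for each image\n",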
+ " probabilities = tf.argmax(logits, 1)\n",
+ "\n",
+ " sess.run(tf.global_variables_initializer())\n",
+ " sess.run(tf.local_variables_initializer())\n",
+ " coord = tf.train.Coordinator()\n",
+ " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
+ " saver = tf.train.Saver()\n",
+ " saver.restore(sess, model_path)\n",
+ " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n",
+ " with open(out_filename, \"w\") as result_file:\n",
+ " i = 0\n",
+ " while count < total_size and not coord.should_stop():\n",
+ " test_images_batch = sess.run(test_images)\n",
+ " file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n",
+ " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n",
+ " new_add = min(args.batch_size, total_size-count)\n",
+ " count += new_add\n",
+ " i += 1\n",
+ " for j in range(new_add):\n",
+ " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n",
+ " result_file.flush()\n",
+ " coord.request_stop()\n",
+ " coord.join(threads)\n",
+ " \n",
+ " # copy the file to artifacts\n",
+ " shutil.copy(out_filename, \"./outputs/\")\n",
+ " # Move the processed data out of the blob so that the next run can process the data.\n",
+ "\n",
+ "if __name__ == \"__main__\":\n",
+ " tf.app.run()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Prepare Model and Input data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Download Model\n",
+ "\n",
+ "Download and extract the model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz into the local `models` directory."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# create directory for model\n",
+ "model_dir = 'models'\n",
+ "if not os.path.isdir(model_dir):\n",
+ " os.mkdir(model_dir)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import tarfile\n",
+ "import urllib.request\n",
+ "\n",
+ "url = \"http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz\"\n",
+ "urllib.request.urlretrieve(url, \"model.tar.gz\")\n",
+ "with tarfile.open(\"model.tar.gz\", \"r:gz\") as tar:\n",
+ "    tar.extractall(model_dir)"
+ ]
+ },
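+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Optionally, run a quick sanity check before registering the model. This assumes the archive contains an `inception_v3.ckpt` checkpoint, which this particular tarball does:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# sanity check: confirm the checkpoint was extracted where we expect it\n",
+ "ckpt_path = os.path.join(model_dir, \"inception_v3.ckpt\")\n",
+ "print(ckpt_path, \"exists:\", os.path.exists(ckpt_path))"
+ ]
+ },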
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Create a datastore that points to the blob container containing the sample images\n",
+ "\n",
+ "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing images from the ImageNet evaluation set. In the next step, we create a datastore named `images_datastore` that points to this container. Passing `overwrite=True` replaces any datastore that was previously registered under that name. \n",
+ "\n",
+ "To point this step at your own blob container instead, provide an `account_key` parameter along with `account_name`, as sketched after the next cell. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "account_name = \"pipelinedata\"\n",
+ "sample_data = Datastore.register_azure_blob_container(ws, datastore_name=\"images_datastore\", container_name=\"sampledata\", \n",
+ " account_name=account_name, \n",
+ " overwrite=True)"
+ ]
+ },
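+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "For reference, below is a minimal sketch of registering your own private container instead; the account name, container name, and key are hypothetical placeholders, not real resources:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# sketch only: register your own private blob container as the input datastore.\n",
+ "# \"myaccount\", \"mycontainer\", and the key are hypothetical placeholders.\n",
+ "# my_data = Datastore.register_azure_blob_container(ws, datastore_name=\"my_images_datastore\",\n",
+ "#                                                    container_name=\"mycontainer\",\n",
+ "#                                                    account_name=\"myaccount\",\n",
+ "#                                                    account_key=\"<your account key>\",\n",
+ "#                                                    overwrite=True)"
+ ]
+ },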
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Output datastore"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We write the outputs to the workspace's default datastore."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "default_ds = ws.get_default_datastore()"
+ ]
+ },
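+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A quick look at where the scores will land. The `name` and `container_name` attributes are assumed from the SDK's blob datastore object; this cell is only a convenience check:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# the default datastore is a blob container created alongside the workspace\n",
+ "print('datastore:', default_ds.name)\n",
+ "print('container:', default_ds.container_name)"
+ ]
+ },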
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Specify where the data is stored or will be written to"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core.conda_dependencies import CondaDependencies\n",
+ "from azureml.data.data_reference import DataReference\n",
+ "from azureml.pipeline.core import Pipeline, PipelineData\n",
+ "from azureml.core import Datastore\n",
+ "from azureml.core import Experiment"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "input_images = DataReference(datastore=sample_data, \n",
+ " data_reference_name=\"input_images\",\n",
+ " path_on_datastore=\"batchscoring/images\",\n",
+ " mode=\"download\"\n",
+ " )\n",
+ "model_dir = DataReference(datastore=sample_data, \n",
+ " data_reference_name=\"input_model\",\n",
+ " path_on_datastore=\"batchscoring/models\",\n",
+ " mode=\"download\" \n",
+ " )\n",
+ "label_dir = DataReference(datastore=sample_data, \n",
+ " data_reference_name=\"input_labels\",\n",
+ " path_on_datastore=\"batchscoring/labels\",\n",
+ " mode=\"download\" \n",
+ " )\n",
+ "output_dir = PipelineData(name=\"scores\", \n",
+ " datastore_name=default_ds.name, \n",
+ " output_path_on_compute=\"batchscoring/results\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Register the model with Workspace"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import shutil\n",
+ "from azureml.core.model import Model\n",
+ "\n",
+ "# register downloaded model \n",
+ "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n",
+ " model_name = \"inception\", # this is the name the model is registered as\n",
+ " tags = {'pretrained': \"inception\"},\n",
+ " description = \"Imagenet trained tensorflow inception\",\n",
+ " workspace = ws)\n",
+ "# remove the downloaded dir after registration if you wish\n",
+ "shutil.rmtree(\"models\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Specify environment to run the script"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.4.0\", \"azureml-defaults\"])\n",
+ "\n",
+ "# Runconfig\n",
+ "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n",
+ "batchai_run_config.environment.docker.enabled = True\n",
+ "batchai_run_config.environment.docker.gpu_support = True\n",
+ "batchai_run_config.environment.docker.base_image = \"microsoft/mmlspark:gpu-0.12\"\n",
+ "batchai_run_config.environment.spark.precache_packages = False"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Steps to run"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A subset of the parameters to the Python script can be supplied as inputs when we re-run a `PublishedPipeline`. In this example, we expose the script's `batch_size` argument as such a pipeline parameter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.pipeline.core.graph import PipelineParameter\n",
+ "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "inception_model_name = \"inception_v3.ckpt\"\n",
+ "\n",
+ "batch_score_step = PythonScriptStep(\n",
+ " name=\"batch ai scoring\",\n",
+ " script_name=\"batchai_score.py\",\n",
+ " arguments=[\"--dataset_path\", input_images, \n",
+ " \"--model_name\", \"inception\",\n",
+ " \"--label_dir\", label_dir, \n",
+ " \"--output_dir\", output_dir, \n",
+ " \"--batch_size\", batch_size_param],\n",
+ " target=cluster,\n",
+ " inputs=[input_images, label_dir],\n",
+ " outputs=[output_dir],\n",
+ " runconfig=batchai_run_config,\n",
+ " source_directory=scripts_folder\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n",
+ "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={\"param_batch_size\": 20})"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Monitor run"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.train.widgets import RunDetails\n",
+ "RunDetails(pipeline_run).show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pipeline_run.wait_for_completion(show_output=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Download and review output"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "step_run = list(pipeline_run.get_children())[0]\n",
+ "step_run.download_file(\"./outputs/result-labels.txt\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import pandas as pd\n",
+ "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n",
+ "df.columns = [\"Filename\", \"Prediction\"]\n",
+ "df.head()"
+ ]
+ },
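+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Optionally, inspect how the predicted labels are distributed; a heavily skewed distribution can be an early hint that scoring went wrong:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# count how often each predicted label occurs in the scored set\n",
+ "df[\"Prediction\"].value_counts().head(10)"
+ ]
+ },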
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Publish a pipeline and rerun using a REST call"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Create a published pipeline"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "published_pipeline = pipeline_run.publish_pipeline(\n",
+ " name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n",
+ "\n",
+ "published_id = published_pipeline.id"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Rerun using REST call"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Get AAD token"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core.authentication import AzureCliAuthentication\n",
+ "import requests\n",
+ "\n",
+ "cli_auth = AzureCliAuthentication()\n",
+ "aad_token = cli_auth.get_authentication_header()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Run published pipeline using its REST endpoint"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.pipeline.core import PublishedPipeline\n",
+ "\n",
+ "rest_endpoint = PublishedPipeline.get_endpoint(published_id, ws)\n",
+ "# specify batch size when running the pipeline\n",
+ "response = requests.post(rest_endpoint, headers=aad_token, json={\"param_batch_size\": 50})\n",
+ "run_id = response.json()[\"Id\"]"
+ ]
+ },
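+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The cell above assumes the submission succeeded. As a minimal defensive check, using standard `requests` behavior rather than anything Azure-specific:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# raise a descriptive error if the REST submission returned an HTTP failure\n",
+ "response.raise_for_status()\n",
+ "print('submitted run id:', run_id)"
+ ]
+ },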
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Monitor the new run"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.pipeline.core.run import PipelineRun\n",
+ "published_pipeline_run = PipelineRun(ws.experiments()[\"batch_scoring\"], run_id)\n",
+ "\n",
+ "RunDetails(published_pipeline_run).show()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3.6",
+ "language": "python",
+ "name": "python36"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.6.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}