From 2c391a448657df1e680d50835f635762ae4e92ec Mon Sep 17 00:00:00 2001 From: Roope Astala Date: Tue, 2 Oct 2018 16:12:22 -0400 Subject: [PATCH] update pipeline notebook --- pipeline/pipeline-batch-scoring.ipynb | 1258 +++++++++++++------------ 1 file changed, 638 insertions(+), 620 deletions(-) diff --git a/pipeline/pipeline-batch-scoring.ipynb b/pipeline/pipeline-batch-scoring.ipynb index 62fd8cea..1cc660d6 100644 --- a/pipeline/pipeline-batch-scoring.ipynb +++ b/pipeline/pipeline-batch-scoring.ipynb @@ -1,622 +1,640 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Copyright (c) Microsoft Corporation. All rights reserved.\n", - "\n", - "Licensed under the MIT License." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This notebook demonstrates how to run batch scoring job. __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from __[ImageNet](http://image-net.org/)__ dataset will be used. It registers a pretrained inception model in model registry then uses the model to do batch scoring on images in a blob container." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from azureml.core import Workspace, Run, Experiment\n", - "\n", - "ws = Workspace.from_config()\n", - "print('Workspace name: ' + ws.name, \n", - " 'Azure region: ' + ws.location, \n", - " 'Subscription id: ' + ws.subscription_id, \n", - " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", - "\n", - "# Also create a Project and attach to Workspace\n", - "project_folder = \"sample_projects\"\n", - "run_history_name = project_folder\n", - "\n", - "if not os.path.isdir(project_folder):\n", - " os.mkdir(project_folder)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.compute import BatchAiCompute, ComputeTarget\n", - "from azureml.core.datastore import Datastore\n", - "from azureml.data.data_reference import DataReference\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.pipeline.steps import PythonScriptStep\n", - "from azureml.core.runconfig import CondaDependencies, RunConfiguration" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create and attach Compute targets\n", - "Use the below code to create and attach Compute targets. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Batch AI compute\n", - "cluster_name = \"gpu_cluster\"\n", - "try:\n", - " cluster = BatchAiCompute(ws, cluster_name)\n", - " print(\"found existing cluster.\")\n", - "except:\n", - " print(\"creating new cluster\")\n", - " provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n", - " autoscale_enabled = True,\n", - " cluster_min_nodes = 0, \n", - " cluster_max_nodes = 1)\n", - "\n", - " # create the cluster\n", - " cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)\n", - " cluster.wait_for_completion(show_output=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Python scripts to run" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Python scripts that run the batch scoring. `batchai_score.py` takes input images in `dataset_path`, pretrained models in `model_dir` and outputs a `results-label.txt` to `output_dir`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%writefile $project_folder/batchai_score.py\n", - "import os\n", - "import argparse\n", - "import datetime,time\n", - "import tensorflow as tf\n", - "from math import ceil\n", - "import numpy as np\n", - "import shutil\n", - "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n", - "from azureml.core.model import Model\n", - "\n", - "slim = tf.contrib.slim\n", - "\n", - "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n", - "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n", - "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n", - "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n", - "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n", - "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n", - "\n", - "args = parser.parse_args()\n", - "\n", - "image_size = 299\n", - "num_channel = 3\n", - "\n", - "# create output directory if it does not exist\n", - "os.makedirs(args.output_dir, exist_ok=True)\n", - "\n", - "def get_class_label_dict(label_file):\n", - " label = []\n", - " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n", - " for l in proto_as_ascii_lines:\n", - " label.append(l.rstrip())\n", - " return label\n", - "\n", - "\n", - "class DataIterator:\n", - " def __init__(self, data_dir):\n", - " self.file_paths = []\n", - " image_list = os.listdir(data_dir)\n", - " total_size = len(image_list)\n", - " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n", - "\n", - " self.labels = [1 for file_name in self.file_paths]\n", - "\n", - " @property\n", - " def size(self):\n", - " return len(self.labels)\n", - "\n", - " def input_pipeline(self, batch_size):\n", - " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n", - " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n", - " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n", - " labels = input_queue[1]\n", - " images_content = tf.read_file(input_queue[0])\n", - "\n", - " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n", - " float_caster = tf.cast(image_reader, tf.float32)\n", - " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n", - " images = tf.image.resize_images(float_caster, new_size)\n", - " images = tf.divide(tf.subtract(images, [0]), [255])\n", - "\n", - " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n", - " return image_batch\n", - "\n", - "def main(_):\n", - " start_time = datetime.datetime.now()\n", - " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n", - " label_dict = get_class_label_dict(label_file_name)\n", - " classes_num = len(label_dict)\n", - " test_feeder = DataIterator(data_dir=args.dataset_path)\n", - " total_size = len(test_feeder.labels)\n", - " count = 0\n", - " # get model from model registry\n", - " model_path = Model.get_model_path(args.model_name)\n", - " with tf.Session() as sess:\n", - " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n", - " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n", - " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n", - " logits, _ = inception_v3.inception_v3(input_images,\n", - " num_classes=classes_num,\n", - " is_training=False)\n", - " probabilities = tf.argmax(logits, 1)\n", - "\n", - " sess.run(tf.global_variables_initializer())\n", - " sess.run(tf.local_variables_initializer())\n", - " coord = tf.train.Coordinator()\n", - " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n", - " saver = tf.train.Saver()\n", - " saver.restore(sess, model_path)\n", - " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n", - " with open(out_filename, \"w\") as result_file:\n", - " i = 0\n", - " while count < total_size and not coord.should_stop():\n", - " test_images_batch = sess.run(test_images)\n", - " file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n", - " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n", - " new_add = min(args.batch_size, total_size-count)\n", - " count += new_add\n", - " i += 1\n", - " for j in range(new_add):\n", - " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n", - " result_file.flush()\n", - " coord.request_stop()\n", - " coord.join(threads)\n", - " \n", - " # copy the file to artifacts\n", - " shutil.copy(out_filename, \"./outputs/\")\n", - " # Move the processed data out of the blob so that the next run can process the data.\n", - "\n", - "if __name__ == \"__main__\":\n", - " tf.app.run()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Prepare Model and Input data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# create directory for model\n", - "model_dir = 'models'\n", - "if not os.path.isdir(model_dir):\n", - " os.mkdir(model_dir)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download Model\n", - "This manual step is required to register the model to the workspace\n", - "\n", - "Download and extract model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to model_dir" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Get samples images and upload to Datastore\n", - "This manual step is required to run batchai_score.py\n", - "\n", - "Download and extract sample images from ImageNet evaluation set and **upload** to a blob that will be registered as a Datastore in the next step\n", - "\n", - "A copy of sample images from ImageNet evaluation set can be found at __[BatchAI Samples Blob](https://batchaisamples.blob.core.windows.net/samples/imagenet_samples.zip?st=2017-09-29T18%3A29%3A00Z&se=2099-12-31T08%3A00%3A00Z&sp=rl&sv=2016-05-31&sr=c&sig=PmhL%2BYnYAyNTZr1DM2JySvrI12e%2F4wZNIwCtf7TRI%2BM%3D)__ \n", - "\n", - "There are multiple ways to create folders and upload files into Azure Blob Container - you can use __[Azure Portal](https://ms.portal.azure.com/)__, __[Storage Explorer](http://storageexplorer.com/)__, __[Azure CLI2](https://render.githubusercontent.com/azure-cli-extension)__ or Azure SDK for your preferable programming language. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "account_name = \"batchscoringdata\"\n", - "sample_data = Datastore.register_azure_blob_container(ws, \"sampledata\", \"sampledata\", \n", - " account_name=account_name, \n", - " overwrite=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Output datastore" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We write the outputs to the default datastore" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "default_ds = \"workspaceblobstore\"" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Specify where the data is stored or will be written to" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.conda_dependencies import CondaDependencies\n", - "from azureml.data.data_reference import DataReference\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.core import Datastore\n", - "from azureml.core import Experiment" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "input_images = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_images\",\n", - " path_on_datastore=\"batchscoring/images\",\n", - " mode=\"download\"\n", - " )\n", - "model_dir = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_model\",\n", - " path_on_datastore=\"batchscoring/models\",\n", - " mode=\"download\" \n", - " )\n", - "label_dir = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_labels\",\n", - " path_on_datastore=\"batchscoring/labels\",\n", - " mode=\"download\" \n", - " )\n", - "output_dir = PipelineData(name=\"scores\", \n", - " datastore_name=default_ds, \n", - " output_path_on_compute=\"batchscoring/results\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Register the model with Workspace" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import shutil\n", - "from azureml.core.model import Model\n", - "\n", - "# register downloaded model \n", - "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", - " model_name = \"inception\", # this is the name the model is registered as\n", - " tags = {'pretrained': \"inception\"},\n", - " description = \"Imagenet trained tensorflow inception\",\n", - " workspace = ws)\n", - "# remove the downloaded dir after registration if you wish\n", - "shutil.rmtree(\"models\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Specify environment to run the script" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.4.0\", \"azureml-defaults\"])\n", - "\n", - "# Runconfig\n", - "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n", - "batchai_run_config.environment.docker.enabled = True\n", - "batchai_run_config.environment.docker.gpu_support = True\n", - "batchai_run_config.environment.docker.base_image = \"microsoft/mmlspark:gpu-0.12\"\n", - "batchai_run_config.environment.spark.precache_packages = False" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Steps to run" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. In the current example, we define `batch_size` taken by the script as such parameter." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core.graph import PipelineParameter\n", - "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "inception_model_name = \"inception_v3.ckpt\"\n", - "\n", - "batch_score_step = PythonScriptStep(\n", - " name=\"batch ai scoring\",\n", - " script_name=\"batchai_score.py\",\n", - " arguments=[\"--dataset_path\", input_images, \n", - " \"--model_name\", \"inception\",\n", - " \"--label_dir\", label_dir, \n", - " \"--output_dir\", output_dir, \n", - " \"--batch_size\", batch_size_param],\n", - " target=cluster,\n", - " inputs=[input_images, label_dir],\n", - " outputs=[output_dir],\n", - " runconfig=batchai_run_config,\n", - " source_directory=project_folder\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n", - "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={\"param_batch_size\": 20})" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Monitor run" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.train.widgets import RunDetails\n", - "RunDetails(pipeline_run).show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipeline_run.wait_for_completion(show_output=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Download and review output" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "step_run = list(pipeline_run.get_children())[0]\n", - "step_run.download_file(\"./outputs/result-labels.txt\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n", - "df.columns = [\"Filename\", \"Prediction\"]\n", - "df.head()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Publish a pipeline and rerun using a REST call" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create a published pipeline" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "published_pipeline = pipeline_run.publish_pipeline(\n", - " name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n", - "\n", - "published_id = published_pipeline.id" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Rerun using REST call" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Get AAD token" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.authentication import AzureCliAuthentication\n", - "import requests\n", - "\n", - "cli_auth = AzureCliAuthentication()\n", - "aad_token = cli_auth.get_authentication_header()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Run published pipeline using its REST endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core import PublishedPipeline\n", - "\n", - "rest_endpoint = PublishedPipeline.get_endpoint(published_id, ws)\n", - "# specify batch size when running the pipeline\n", - "response = requests.post(rest_endpoint, headers=aad_token, json={\"param_batch_size\": 50})\n", - "run_id = response.json()[\"Id\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Monitor the new run" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core.run import PipelineRun\n", - "published_pipeline_run = PipelineRun(ws.experiments()[\"batch_scoring\"], run_id)\n", - "\n", - "RunDetails(published_pipeline_run).show()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3.6", - "language": "python", - "name": "python36" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.5" - } + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved.\n", + "\n", + "Licensed under the MIT License." + ] }, - "nbformat": 4, - "nbformat_minor": 2 -} \ No newline at end of file + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook demonstrates how to run batch scoring job. __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from __[ImageNet](http://image-net.org/)__ dataset will be used. It registers a pretrained inception model in model registry then uses the model to do batch scoring on images in a blob container." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "Make sure you go through the [00. Installation and Configuration](./00.configuration.ipynb) Notebook first if you haven't.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from azureml.core import Workspace, Run, Experiment\n", + "\n", + "ws = Workspace.from_config()\n", + "print('Workspace name: ' + ws.name, \n", + " 'Azure region: ' + ws.location, \n", + " 'Subscription id: ' + ws.subscription_id, \n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", + "\n", + "# Also create a Project and attach to Workspace\n", + "scripts_folder = \"scripts\"\n", + "\n", + "if not os.path.isdir(scripts_folder):\n", + " os.mkdir(scripts_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.compute import BatchAiCompute, ComputeTarget\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.core.runconfig import CondaDependencies, RunConfiguration" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create and attach Compute targets\n", + "Use the below code to create and attach Compute targets. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Batch AI compute\n", + "cluster_name = \"gpu_cluster\"\n", + "try:\n", + " cluster = BatchAiCompute(ws, cluster_name)\n", + " print(\"found existing cluster.\")\n", + "except:\n", + " print(\"creating new cluster\")\n", + " provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n", + " autoscale_enabled = True,\n", + " cluster_min_nodes = 0, \n", + " cluster_max_nodes = 1)\n", + "\n", + " # create the cluster\n", + " cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)\n", + " cluster.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Python scripts to run" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Python scripts that run the batch scoring. `batchai_score.py` takes input images in `dataset_path`, pretrained models in `model_dir` and outputs a `results-label.txt` to `output_dir`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile $scripts_folder/batchai_score.py\n", + "import os\n", + "import argparse\n", + "import datetime,time\n", + "import tensorflow as tf\n", + "from math import ceil\n", + "import numpy as np\n", + "import shutil\n", + "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n", + "from azureml.core.model import Model\n", + "\n", + "slim = tf.contrib.slim\n", + "\n", + "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n", + "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n", + "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n", + "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n", + "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n", + "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n", + "\n", + "args = parser.parse_args()\n", + "\n", + "image_size = 299\n", + "num_channel = 3\n", + "\n", + "# create output directory if it does not exist\n", + "os.makedirs(args.output_dir, exist_ok=True)\n", + "\n", + "def get_class_label_dict(label_file):\n", + " label = []\n", + " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n", + " for l in proto_as_ascii_lines:\n", + " label.append(l.rstrip())\n", + " return label\n", + "\n", + "\n", + "class DataIterator:\n", + " def __init__(self, data_dir):\n", + " self.file_paths = []\n", + " image_list = os.listdir(data_dir)\n", + " total_size = len(image_list)\n", + " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n", + "\n", + " self.labels = [1 for file_name in self.file_paths]\n", + "\n", + " @property\n", + " def size(self):\n", + " return len(self.labels)\n", + "\n", + " def input_pipeline(self, batch_size):\n", + " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n", + " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n", + " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n", + " labels = input_queue[1]\n", + " images_content = tf.read_file(input_queue[0])\n", + "\n", + " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n", + " float_caster = tf.cast(image_reader, tf.float32)\n", + " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n", + " images = tf.image.resize_images(float_caster, new_size)\n", + " images = tf.divide(tf.subtract(images, [0]), [255])\n", + "\n", + " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n", + " return image_batch\n", + "\n", + "def main(_):\n", + " start_time = datetime.datetime.now()\n", + " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n", + " label_dict = get_class_label_dict(label_file_name)\n", + " classes_num = len(label_dict)\n", + " test_feeder = DataIterator(data_dir=args.dataset_path)\n", + " total_size = len(test_feeder.labels)\n", + " count = 0\n", + " # get model from model registry\n", + " model_path = Model.get_model_path(args.model_name)\n", + " with tf.Session() as sess:\n", + " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n", + " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n", + " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n", + " logits, _ = inception_v3.inception_v3(input_images,\n", + " num_classes=classes_num,\n", + " is_training=False)\n", + " probabilities = tf.argmax(logits, 1)\n", + "\n", + " sess.run(tf.global_variables_initializer())\n", + " sess.run(tf.local_variables_initializer())\n", + " coord = tf.train.Coordinator()\n", + " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n", + " saver = tf.train.Saver()\n", + " saver.restore(sess, model_path)\n", + " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n", + " with open(out_filename, \"w\") as result_file:\n", + " i = 0\n", + " while count < total_size and not coord.should_stop():\n", + " test_images_batch = sess.run(test_images)\n", + " file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n", + " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n", + " new_add = min(args.batch_size, total_size-count)\n", + " count += new_add\n", + " i += 1\n", + " for j in range(new_add):\n", + " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n", + " result_file.flush()\n", + " coord.request_stop()\n", + " coord.join(threads)\n", + " \n", + " # copy the file to artifacts\n", + " shutil.copy(out_filename, \"./outputs/\")\n", + " # Move the processed data out of the blob so that the next run can process the data.\n", + "\n", + "if __name__ == \"__main__\":\n", + " tf.app.run()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prepare Model and Input data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Download Model\n", + "\n", + "Download and extract model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to `\"models\"`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# create directory for model\n", + "model_dir = 'models'\n", + "if not os.path.isdir(model_dir):\n", + " os.mkdir(model_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tarfile\n", + "import urllib.request\n", + "\n", + "url=\"http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz\"\n", + "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n", + "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n", + "tar.extractall(model_dir)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a datastore that points to blob container containing sample images\n", + "\n", + "We have created a public blob container `sampledata` on an account named `pipelinedata` containing images from ImageNet evaluation set. In the next step, we create a datastore with name `images_datastore` that points to this container. The `overwrite=True` step overwrites any datastore that was created previously with that name. \n", + "\n", + "This step can be changed to point to your blob container by providing an additional `account_key` parameter with `account_name`. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "account_name = \"pipelinedata\"\n", + "sample_data = Datastore.register_azure_blob_container(ws, datastore_name=\"images_datastore\", container_name=\"sampledata\", \n", + " account_name=account_name, \n", + " overwrite=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Output datastore" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We write the outputs to the default datastore" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "default_ds = ws.get_default_datastore()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Specify where the data is stored or will be written to" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.conda_dependencies import CondaDependencies\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.core import Datastore\n", + "from azureml.core import Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "input_images = DataReference(datastore=sample_data, \n", + " data_reference_name=\"input_images\",\n", + " path_on_datastore=\"batchscoring/images\",\n", + " mode=\"download\"\n", + " )\n", + "model_dir = DataReference(datastore=sample_data, \n", + " data_reference_name=\"input_model\",\n", + " path_on_datastore=\"batchscoring/models\",\n", + " mode=\"download\" \n", + " )\n", + "label_dir = DataReference(datastore=sample_data, \n", + " data_reference_name=\"input_labels\",\n", + " path_on_datastore=\"batchscoring/labels\",\n", + " mode=\"download\" \n", + " )\n", + "output_dir = PipelineData(name=\"scores\", \n", + " datastore_name=default_ds.name, \n", + " output_path_on_compute=\"batchscoring/results\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Register the model with Workspace" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import shutil\n", + "from azureml.core.model import Model\n", + "\n", + "# register downloaded model \n", + "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", + " model_name = \"inception\", # this is the name the model is registered as\n", + " tags = {'pretrained': \"inception\"},\n", + " description = \"Imagenet trained tensorflow inception\",\n", + " workspace = ws)\n", + "# remove the downloaded dir after registration if you wish\n", + "shutil.rmtree(\"models\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Specify environment to run the script" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.4.0\", \"azureml-defaults\"])\n", + "\n", + "# Runconfig\n", + "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n", + "batchai_run_config.environment.docker.enabled = True\n", + "batchai_run_config.environment.docker.gpu_support = True\n", + "batchai_run_config.environment.docker.base_image = \"microsoft/mmlspark:gpu-0.12\"\n", + "batchai_run_config.environment.spark.precache_packages = False" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Steps to run" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. In the current example, we define `batch_size` taken by the script as such parameter." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core.graph import PipelineParameter\n", + "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "inception_model_name = \"inception_v3.ckpt\"\n", + "\n", + "batch_score_step = PythonScriptStep(\n", + " name=\"batch ai scoring\",\n", + " script_name=\"batchai_score.py\",\n", + " arguments=[\"--dataset_path\", input_images, \n", + " \"--model_name\", \"inception\",\n", + " \"--label_dir\", label_dir, \n", + " \"--output_dir\", output_dir, \n", + " \"--batch_size\", batch_size_param],\n", + " target=cluster,\n", + " inputs=[input_images, label_dir],\n", + " outputs=[output_dir],\n", + " runconfig=batchai_run_config,\n", + " source_directory=scripts_folder\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n", + "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={\"param_batch_size\": 20})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Monitor run" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.train.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_run.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Download and review output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step_run = list(pipeline_run.get_children())[0]\n", + "step_run.download_file(\"./outputs/result-labels.txt\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n", + "df.columns = [\"Filename\", \"Prediction\"]\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Publish a pipeline and rerun using a REST call" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create a published pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "published_pipeline = pipeline_run.publish_pipeline(\n", + " name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n", + "\n", + "published_id = published_pipeline.id" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Rerun using REST call" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get AAD token" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.authentication import AzureCliAuthentication\n", + "import requests\n", + "\n", + "cli_auth = AzureCliAuthentication()\n", + "aad_token = cli_auth.get_authentication_header()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Run published pipeline using its REST endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core import PublishedPipeline\n", + "\n", + "rest_endpoint = PublishedPipeline.get_endpoint(published_id, ws)\n", + "# specify batch size when running the pipeline\n", + "response = requests.post(rest_endpoint, headers=aad_token, json={\"param_batch_size\": 50})\n", + "run_id = response.json()[\"Id\"]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Monitor the new run" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core.run import PipelineRun\n", + "published_pipeline_run = PipelineRun(ws.experiments()[\"batch_scoring\"], run_id)\n", + "\n", + "RunDetails(published_pipeline_run).show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.6", + "language": "python", + "name": "python36" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}