{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved.\n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook demonstrates how to run batch scoring job. __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from __[ImageNet](http://image-net.org/)__ dataset will be used. It registers a pretrained inception model in model registry then uses the model to do batch scoring on images in a blob container." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "Make sure you go through the [00. Installation and Configuration](./00.configuration.ipynb) Notebook first if you haven't.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from azureml.core import Workspace, Run, Experiment\n", "\n", "ws = Workspace.from_config()\n", "print('Workspace name: ' + ws.name, \n", " 'Azure region: ' + ws.location, \n", " 'Subscription id: ' + ws.subscription_id, \n", " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", "\n", "# Also create a Project and attach to Workspace\n", "scripts_folder = \"scripts\"\n", "\n", "if not os.path.isdir(scripts_folder):\n", " os.mkdir(scripts_folder)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.compute import BatchAiCompute, ComputeTarget\n", "from azureml.core.datastore import Datastore\n", "from azureml.data.data_reference import DataReference\n", "from azureml.pipeline.core import Pipeline, PipelineData\n", "from azureml.pipeline.steps import PythonScriptStep\n", "from azureml.core.runconfig import CondaDependencies, RunConfiguration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create and attach Compute targets\n", "Use the below code to create and attach Compute targets. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# choose a name for your cluster\n", "batchai_cluster_name = os.environ.get(\"BATCHAI_CLUSTER_NAME\", \"gpu-cluster\")\n", "cluster_min_nodes = os.environ.get(\"BATCHAI_CLUSTER_MIN_NODES\", 0)\n", "cluster_max_nodes = os.environ.get(\"BATCHAI_CLUSTER_MAX_NODES\", 1)\n", "vm_size = os.environ.get(\"BATCHAI_CLUSTER_SKU\", \"STANDARD_NC6\")\n", "autoscale_enabled = os.environ.get(\"BATCHAI_CLUSTER_AUTOSCALE_ENABLED\", True)\n", "\n", "\n", "if batchai_cluster_name in ws.compute_targets:\n", " compute_target = ws.compute_targets[batchai_cluster_name]\n", " if compute_target and type(compute_target) is BatchAiCompute:\n", " print('found compute target. just use it. ' + batchai_cluster_name)\n", "else:\n", " print('creating a new compute target...')\n", " provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled\n", " vm_priority = 'lowpriority', # optional\n", " autoscale_enabled = autoscale_enabled,\n", " cluster_min_nodes = cluster_min_nodes, \n", " cluster_max_nodes = cluster_max_nodes)\n", "\n", " # create the cluster\n", " compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)\n", " \n", " # can poll for a minimum number of nodes and for a specific timeout. 
\n", " # if no min node count is provided it will use the scale settings for the cluster\n", " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", " \n", " # For a more detailed view of current BatchAI cluster status, use the 'status' property \n", " print(compute_target.status.serialize())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Python scripts to run" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Python scripts that run the batch scoring. `batchai_score.py` takes input images in `dataset_path`, pretrained models in `model_dir` and outputs a `results-label.txt` to `output_dir`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile $scripts_folder/batchai_score.py\n", "import os\n", "import argparse\n", "import datetime,time\n", "import tensorflow as tf\n", "from math import ceil\n", "import numpy as np\n", "import shutil\n", "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n", "from azureml.core.model import Model\n", "\n", "slim = tf.contrib.slim\n", "\n", "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n", "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n", "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n", "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n", "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n", "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n", "\n", "args = parser.parse_args()\n", "\n", "image_size = 299\n", "num_channel = 3\n", "\n", "# create output directory if it does not exist\n", "os.makedirs(args.output_dir, exist_ok=True)\n", "\n", "def get_class_label_dict(label_file):\n", " label = []\n", " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n", " for l in proto_as_ascii_lines:\n", " label.append(l.rstrip())\n", " return label\n", "\n", "\n", "class DataIterator:\n", " def __init__(self, data_dir):\n", " self.file_paths = []\n", " image_list = os.listdir(data_dir)\n", " total_size = len(image_list)\n", " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n", "\n", " self.labels = [1 for file_name in self.file_paths]\n", "\n", " @property\n", " def size(self):\n", " return len(self.labels)\n", "\n", " def input_pipeline(self, batch_size):\n", " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n", " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n", " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n", " labels = input_queue[1]\n", " images_content = tf.read_file(input_queue[0])\n", "\n", " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n", " float_caster = tf.cast(image_reader, tf.float32)\n", " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n", " images = tf.image.resize_images(float_caster, new_size)\n", " images = tf.divide(tf.subtract(images, [0]), [255])\n", "\n", " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n", " return image_batch\n", "\n", "def main(_):\n", " start_time = datetime.datetime.now()\n", " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n", " label_dict = get_class_label_dict(label_file_name)\n", " classes_num = len(label_dict)\n", " 
"    test_feeder = DataIterator(data_dir=args.dataset_path)\n",
"    total_size = len(test_feeder.labels)\n",
"    count = 0\n",
"    # get the model from the model registry\n",
"    model_path = Model.get_model_path(args.model_name)\n",
"    with tf.Session() as sess:\n",
"        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n",
"        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n",
"            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n",
"            logits, _ = inception_v3.inception_v3(input_images,\n",
"                                                  num_classes=classes_num,\n",
"                                                  is_training=False)\n",
"            probabilities = tf.argmax(logits, 1)\n",
"\n",
"        sess.run(tf.global_variables_initializer())\n",
"        sess.run(tf.local_variables_initializer())\n",
"        coord = tf.train.Coordinator()\n",
"        threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
"        saver = tf.train.Saver()\n",
"        saver.restore(sess, model_path)\n",
"        out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n",
"        with open(out_filename, \"w\") as result_file:\n",
"            i = 0\n",
"            while count < total_size and not coord.should_stop():\n",
"                test_images_batch = sess.run(test_images)\n",
"                file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n",
"                results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n",
"                new_add = min(args.batch_size, total_size-count)\n",
"                count += new_add\n",
"                i += 1\n",
"                for j in range(new_add):\n",
"                    result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n",
"                result_file.flush()\n",
"            coord.request_stop()\n",
"            coord.join(threads)\n",
"\n",
"    # copy the result file to the run's artifacts\n",
"    shutil.copy(out_filename, \"./outputs/\")\n",
"    # Move the processed data out of the blob so that the next run can process the data.\n",
"\n",
"if __name__ == \"__main__\":\n",
"    tf.app.run()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare Model and Input data" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Download Model\n", "\n", "Download and extract the model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz into the `models` directory." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"# create directory for model\n",
"model_dir = 'models'\n",
"if not os.path.isdir(model_dir):\n",
"    os.mkdir(model_dir)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"import tarfile\n",
"import urllib.request\n",
"\n",
"url = \"http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz\"\n",
"response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n",
"tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n",
"tar.extractall(model_dir)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create a datastore that points to a blob container containing sample images\n", "\n", "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing images from the ImageNet evaluation set. In the next step, we create a datastore with the name `images_datastore` that points to this container. Passing `overwrite=True` replaces any datastore that was previously created with that name. \n", "\n", "This step can be changed to point to your own blob container by providing your `account_name` together with an additional `account_key` parameter." ] },
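{ "cell_type": "markdown", "metadata": {}, "source": [ "The commented-out cell below is a minimal sketch of that variation. The datastore name, container name, storage account name, and key shown there are placeholders, not values from this sample; replace them with your own details before uncommenting." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"# Hypothetical example: register your own blob container instead of the public sample data.\n",
"# The names and the key below are placeholders; fill in your own storage details before uncommenting.\n",
"# my_datastore = Datastore.register_azure_blob_container(ws, datastore_name=\"my_images_datastore\",\n",
"#                                                        container_name=\"<my-container>\",\n",
"#                                                        account_name=\"<my-storage-account>\",\n",
"#                                                        account_key=\"<my-account-key>\",\n",
"#                                                        overwrite=True)" ] },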
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "account_name = \"pipelinedata\"\n", "sample_data = Datastore.register_azure_blob_container(ws, datastore_name=\"images_datastore\", container_name=\"sampledata\", \n", " account_name=account_name, \n", " overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Output datastore" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We write the outputs to the default datastore" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "default_ds = ws.get_default_datastore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Specify where the data is stored or will be written to" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.conda_dependencies import CondaDependencies\n", "from azureml.data.data_reference import DataReference\n", "from azureml.pipeline.core import Pipeline, PipelineData\n", "from azureml.core import Datastore\n", "from azureml.core import Experiment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_images = DataReference(datastore=sample_data, \n", " data_reference_name=\"input_images\",\n", " path_on_datastore=\"batchscoring/images\",\n", " mode=\"download\"\n", " )\n", "model_dir = DataReference(datastore=sample_data, \n", " data_reference_name=\"input_model\",\n", " path_on_datastore=\"batchscoring/models\",\n", " mode=\"download\" \n", " )\n", "label_dir = DataReference(datastore=sample_data, \n", " data_reference_name=\"input_labels\",\n", " path_on_datastore=\"batchscoring/labels\",\n", " mode=\"download\" \n", " )\n", "output_dir = PipelineData(name=\"scores\", \n", " datastore=default_ds, \n", " output_path_on_compute=\"batchscoring/results\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Register the model with Workspace" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import shutil\n", "from azureml.core.model import Model\n", "\n", "# register downloaded model \n", "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", " model_name = \"inception\", # this is the name the model is registered as\n", " tags = {'pretrained': \"inception\"},\n", " description = \"Imagenet trained tensorflow inception\",\n", " workspace = ws)\n", "# remove the downloaded dir after registration if you wish\n", "shutil.rmtree(\"models\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Specify environment to run the script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.4.0\", \"azureml-defaults\"])\n", "\n", "# Runconfig\n", "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n", "batchai_run_config.environment.docker.enabled = True\n", "batchai_run_config.environment.docker.gpu_support = True\n", "batchai_run_config.environment.docker.base_image = \"microsoft/mmlspark:gpu-0.12\"\n", "batchai_run_config.environment.spark.precache_packages = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Steps to run" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Steps to run" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "A subset of the parameters to the Python script can be given as inputs when we rerun a `PublishedPipeline`. In this example, we make the `batch_size` taken by the script such a parameter." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"from azureml.pipeline.core.graph import PipelineParameter\n",
"batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"inception_model_name = \"inception_v3.ckpt\"\n",
"\n",
"batch_score_step = PythonScriptStep(\n",
"    name=\"batch ai scoring\",\n",
"    script_name=\"batchai_score.py\",\n",
"    arguments=[\"--dataset_path\", input_images, \n",
"               \"--model_name\", \"inception\",\n",
"               \"--label_dir\", label_dir, \n",
"               \"--output_dir\", output_dir, \n",
"               \"--batch_size\", batch_size_param],\n",
"    target=compute_target,\n",
"    inputs=[input_images, label_dir],\n",
"    outputs=[output_dir],\n",
"    runconfig=batchai_run_config,\n",
"    source_directory=scripts_folder\n",
")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n",
"pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={\"param_batch_size\": 20})" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Monitor run" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"from azureml.train.widgets import RunDetails\n",
"RunDetails(pipeline_run).show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_run.wait_for_completion(show_output=True)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Download and review output" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"step_run = list(pipeline_run.get_children())[0]\n",
"step_run.download_file(\"./outputs/result-labels.txt\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"import pandas as pd\n",
"df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n",
"df.columns = [\"Filename\", \"Prediction\"]\n",
"df.head()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Publish a pipeline and rerun using a REST call" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create a published pipeline" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"published_pipeline = pipeline_run.publish_pipeline(\n",
"    name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n",
"\n",
"published_id = published_pipeline.id" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Rerun using a REST call" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Get an AAD token" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"from azureml.core.authentication import AzureCliAuthentication\n",
"import requests\n",
"\n",
"cli_auth = AzureCliAuthentication()\n",
"aad_token = cli_auth.get_authentication_header()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Run the published pipeline using its REST endpoint" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"from azureml.pipeline.core import PublishedPipeline\n",
"\n",
"rest_endpoint = published_pipeline.endpoint\n",
"# specify the batch size when running the pipeline\n",
"response = requests.post(rest_endpoint, \n",
"                         headers=aad_token, \n",
"                         json={\"ExperimentName\": \"batch_scoring\",\n",
"                               \"ParameterAssignments\": {\"param_batch_size\": 50}})\n",
"run_id = response.json()[\"Id\"]" ] },
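{ "cell_type": "markdown", "metadata": {}, "source": [ "If the call does not come back with a run id, the `response` object usually explains why (for example, an expired AAD token). The optional cell below only inspects the `response` and `run_id` values from the previous cell." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"# Optional: confirm the REST call succeeded before monitoring the new run.\n",
"# A non-2xx status usually points to an authentication or endpoint problem.\n",
"print(\"HTTP status:\", response.status_code)\n",
"print(\"Submitted run id:\", run_id)" ] },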
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Monitor the new run" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"from azureml.pipeline.core.run import PipelineRun\n",
"published_pipeline_run = PipelineRun(ws.experiments[\"batch_scoring\"], run_id)\n",
"\n",
"RunDetails(published_pipeline_run).show()" ] }
], "metadata": { "authors": [ { "name": "hichando" } ], "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }