Update notebooks

This commit is contained in:
Roope Astala
2018-09-14 15:14:43 -04:00
parent 01a12c0b74
commit 8178484586
40 changed files with 14985 additions and 67 deletions

View File

@@ -0,0 +1,502 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 40. Tensorboard Integration with Run History\n",
"\n",
"1. Run a Tensorflow job locally and view its TB output live.\n",
"2. The same, for a DSVM.\n",
"3. And once more, with Batch AI.\n",
"4. Finally, we'll collect all of these historical runs together into a single Tensorboard graph."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize Workspace\n",
"\n",
"Initialize a workspace object from persisted configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set experiment name and create project\n",
"Choose a name for your run history container in the workspace, and create a folder for the project."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from os import path, makedirs\n",
"experiment_name = 'tensorboard-demo'\n",
"\n",
"# experiment folder\n",
"exp_dir = './sample_projects/' + experiment_name\n",
"\n",
"if not path.exists(exp_dir):\n",
" makedirs(exp_dir)\n",
"\n",
"# runs we started in this session, for the finale\n",
"runs = []"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Download Tensorflow Tensorboard demo code\n",
"\n",
"Tensorflow's repository has an MNIST demo with extensive Tensorboard instrumentation. We'll use it here for our purposes.\n",
"\n",
"Note that we don't need to make any code changes at all - the code works without modification from the Tensorflow repository."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"import os\n",
"import tempfile\n",
"tf_code = requests.get(\"https://raw.githubusercontent.com/tensorflow/tensorflow/r1.8/tensorflow/examples/tutorials/mnist/mnist_with_summaries.py\")\n",
"with open(os.path.join(exp_dir, \"mnist_with_summaries.py\"), \"w\") as file:\n",
" file.write(tf_code.text)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configure and run locally\n",
"\n",
"We'll start by running this locally. While it might not initially seem that useful to use this for a local run - why not just run TB against the files generated locally? - even in this case there is some value to using this feature. Your local run will be registered in the run history, and your Tensorboard logs will be uploaded to the artifact store associated with this run. Later, you'll be able to restore the logs from any run, regardless of where it happened.\n",
"\n",
"Note that for this run, you will need to install Tensorflow on your local machine by yourself. Further, the Tensorboard module (that is, the one included with Tensorflow) must be accessible to this notebook's kernel, as the local machine is what runs Tensorboard."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.runconfig import RunConfiguration\n",
"\n",
"# Create a run configuration.\n",
"run_config = RunConfiguration()\n",
"run_config.environment.python.user_managed_dependencies = True\n",
"\n",
"# You can choose a specific Python environment by pointing to a Python path \n",
"#run_config.environment.python.interpreter_path = '/home/ninghai/miniconda3/envs/sdk2/bin/python'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Experiment, Run\n",
"from azureml.core.script_run_config import ScriptRunConfig\n",
"import tensorflow as tf\n",
"\n",
"logs_dir = os.curdir + os.sep + \"logs\"\n",
"tensorflow_logs_dir = os.path.join(logs_dir, \"tensorflow\")\n",
"\n",
"if not path.exists(tensorflow_logs_dir):\n",
" makedirs(tensorflow_logs_dir)\n",
"\n",
"os.environ[\"TEST_TMPDIR\"] = logs_dir\n",
"\n",
"# Writing logs to ./logs results in their being uploaded to Artifact Service,\n",
"# and thus, made accessible to our Tensorboard instance.\n",
"arguments_list = [\"--log_dir\", logs_dir]\n",
"\n",
"# Create an experiment\n",
"exp = Experiment(ws, experiment_name)\n",
"\n",
"script = ScriptRunConfig(exp_dir,\n",
" script=\"mnist_with_summaries.py\",\n",
" run_config=run_config)\n",
"\n",
"# If you would like the run to go for longer, add --max_steps 5000 to the arguments list:\n",
"# arguments_list += [\"--max_steps\", \"5000\"]\n",
"kwargs = {}\n",
"kwargs['arguments_list'] = arguments_list\n",
"run = exp.submit(script, kwargs)\n",
"# You can also wait for the run to complete\n",
"# run.wait_for_completion(show_output=True)\n",
"runs.append(run)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start Tensorboard\n",
"\n",
"Now, while the run is in progress, we just need to start Tensorboard with the run as its target, and it will begin streaming logs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.tensorboard import Tensorboard\n",
"\n",
"# The Tensorboard constructor takes an array of runs, so be sure and pass it in as a single-element array here\n",
"tb = Tensorboard([run])\n",
"\n",
"# If successful, start() returns a string with the URI of the instance.\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stop Tensorboard\n",
"\n",
"When you're done, make sure to call the `stop()` method of the Tensorboard object, or it will stay running even after your job completes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now, with a DSVM\n",
"\n",
"Tensorboard uploading works with all compute targets. Here we demonstrate it from a DSVM.\n",
"Note that the Tensorboard instance itself will be run by the notebook kernel. Again, this means this notebook's kernel must have access to the Tensorboard module.\n",
"\n",
"If you are unfamiliar with DSVM configuration, check [04. Train in a remote VM (Ubuntu DSVM)](04.train-on-remote-vm.ipynb) for a more detailed breakdown."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import DsvmCompute\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"compute_target_name = 'cpu-dsvm'\n",
"\n",
"try:\n",
" compute_target = DsvmCompute(workspace = ws, name = compute_target_name)\n",
" print('found existing:', compute_target.name)\n",
"except ComputeTargetException:\n",
" print('creating new.')\n",
" dsvm_config = DsvmCompute.provisioning_configuration(vm_size = \"Standard_D2_v2\")\n",
" compute_target = DsvmCompute.create(ws, name = compute_target_name, provisioning_configuration = dsvm_config)\n",
" compute_target.wait_for_completion(show_output = True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit run using TensorFlow estimator\n",
"\n",
"Instead of manually configuring the DSVM environment, we can use the TensorFlow estimator and everything is set up automatically."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import TensorFlow\n",
"\n",
"script_params = {\"--log_dir\": \"./logs\"}\n",
"\n",
"# If you want the run to go longer, set --max-steps to a higher number.\n",
"# script_params[\"--max_steps\"] = \"5000\"\n",
"\n",
"tf_estimator = TensorFlow(source_directory=exp_dir,\n",
" compute_target=compute_target,\n",
" entry_script='mnist_with_summaries.py',\n",
" script_params=script_params)\n",
"\n",
"run = exp.submit(tf_estimator)\n",
"\n",
"runs.append(run)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start Tensorboard with this run\n",
"\n",
"Just like before."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Tensorboard constructor takes an array of runs, so be sure and pass it in as a single-element array here\n",
"tb = Tensorboard([run])\n",
"\n",
"# If successful, start() returns a string with the URI of the instance.\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stop Tensorboard\n",
"\n",
"When you're done, make sure to call the `stop()` method of the Tensorboard object, or it will stay running even after your job completes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Once more, with a Batch AI cluster\n",
"\n",
"Just to prove we can, let's create a Batch AI cluster using MLC, and run our demo there, as well."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import BatchAiCompute\n",
"\n",
"clust_name = ws.name + \"cpu\"\n",
"\n",
"try:\n",
" # If you already have a cluster named this, we don't need to make a new one.\n",
" compute_target = [ct for ct in ws.compute_targets() if ct.name == clust_name and ct.type == 'BatchAI'][0]\n",
"except:\n",
" # Let's make a new one here.\n",
" provisioning_config = BatchAiCompute.provisioning_configuration(cluster_max_nodes=2, \n",
" autoscale_enabled=True, \n",
" cluster_min_nodes=1,\n",
" vm_size='Standard_D11_V2')\n",
" \n",
" compute_target = BatchAiCompute.create(ws, clust_name, provisioning_config)\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=1, timeout_in_minutes=20)\n",
"print(compute_target.name)\n",
" # For a more detailed view of current BatchAI cluster status, use the 'status' property \n",
" # print(compute_target.status.serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit run using TensorFlow estimator\n",
"\n",
"Again, we can use the TensorFlow estimator and everything is set up automatically."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"script_params = {\"--log_dir\": \"./logs\"}\n",
"\n",
"# If you want the run to go longer, set --max-steps to a higher number.\n",
"# script_params[\"--max_steps\"] = \"5000\"\n",
"\n",
"tf_estimator = TensorFlow(source_directory=exp_dir,\n",
" compute_target=compute_target,\n",
" entry_script='mnist_with_summaries.py',\n",
" script_params=script_params)\n",
"\n",
"run = exp.submit(tf_estimator)\n",
"\n",
"runs.append(run)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start Tensorboard with this run\n",
"\n",
"Once more..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Tensorboard constructor takes an array of runs, so be sure and pass it in as a single-element array here\n",
"tb = Tensorboard([run])\n",
"\n",
"# If successful, start() returns a string with the URI of the instance.\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stop Tensorboard\n",
"\n",
"When you're done, make sure to call the `stop()` method of the Tensorboard object, or it will stay running even after your job completes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Finale\n",
"\n",
"If you've paid close attention, you'll have noticed that we've been saving the run objects in an array as we went along. We can start a Tensorboard instance that combines all of these run objects into a single process. This way, you can compare historical runs. You can even do this with live runs; if you made some of those previous runs longer via the `--max_steps` parameter, they might still be running, and you'll see them live in this instance as well."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Tensorboard constructor takes an array of runs...\n",
"# and it turns out that we have been building one of those all along.\n",
"tb = Tensorboard(runs)\n",
"\n",
"# If successful, start() returns a string with the URI of the instance.\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stop Tensorboard\n",
"\n",
"As you might already know, make sure to call the `stop()` method of the Tensorboard object, or it will stay running (until you kill the kernel associated with this notebook, at least)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,243 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 41. Export Run History as Tensorboard logs\n",
"\n",
"1. Run some training and log some metrics into Run History\n",
"2. Export the run history to some directory as Tensorboard logs\n",
"3. Launch a local Tensorboard to view the run history"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize Workspace\n",
"\n",
"Initialize a workspace object from persisted configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Workspace, Run, Experiment\n",
"\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set experiment name and start the run"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"experiment_name = 'export-to-tensorboard'\n",
"exp = Experiment(ws, experiment_name)\n",
"root_run = exp.start_logging()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# load diabetes dataset, a well-known built-in small dataset that comes with scikit-learn\n",
"from sklearn.datasets import load_diabetes\n",
"from sklearn.linear_model import Ridge\n",
"from sklearn.metrics import mean_squared_error\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"X, y = load_diabetes(return_X_y=True)\n",
"\n",
"columns = ['age', 'gender', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']\n",
"\n",
"x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)\n",
"data = {\n",
" \"train\":{\"x\":x_train, \"y\":y_train}, \n",
" \"test\":{\"x\":x_test, \"y\":y_test}\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example experiment\n",
"from tqdm import tqdm\n",
"\n",
"alphas = [.1, .2, .3, .4, .5, .6 , .7]\n",
"\n",
"# try a bunch of alpha values in a Linear Regression (Ridge) model\n",
"for alpha in tqdm(alphas):\n",
" # create a bunch of child runs\n",
" with root_run.child_run(\"alpha\" + str(alpha)) as run:\n",
" # More data science stuff\n",
" reg = Ridge(alpha=alpha)\n",
" reg.fit(data[\"train\"][\"x\"], data[\"train\"][\"y\"])\n",
" # TODO save model\n",
" preds = reg.predict(data[\"test\"][\"x\"])\n",
" mse = mean_squared_error(preds, data[\"test\"][\"y\"])\n",
" # End train and eval\n",
"\n",
" # log alpha, mean_squared_error and feature names in run history\n",
" root_run.log(\"alpha\", alpha)\n",
" root_run.log(\"mse\", mse)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Export Run History to Tensorboard logs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Export Run History to Tensorboard logs\n",
"from azureml.contrib.tensorboard.export import export_to_tensorboard\n",
"import os\n",
"import tensorflow as tf\n",
"\n",
"logdir = 'exportedTBlogs'\n",
"log_path = os.path.join(os.getcwd(), logdir)\n",
"try:\n",
" os.stat(log_path)\n",
"except os.error:\n",
" os.mkdir(log_path)\n",
"print(logdir)\n",
"\n",
"# export run history for the project\n",
"export_to_tensorboard(root_run, logdir)\n",
"\n",
"# or export a particular run\n",
"# export_to_tensorboard(run, logdir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"root_run.complete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start Tensorboard\n",
"\n",
"Or you can start the Tensorboard outside this notebook to view the result"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.tensorboard import Tensorboard\n",
"\n",
"# The Tensorboard constructor takes an array of runs, so be sure and pass it in as a single-element array here\n",
"tb = Tensorboard([], local_root=logdir, port=6006)\n",
"\n",
"# If successful, start() returns a string with the URI of the instance.\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stop Tensorboard\n",
"\n",
"When you're done, make sure to call the `stop()` method of the Tensorboard object."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,500 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 50. Distributed Tensorflow Horovod\n",
"\n",
"In this tutorial we demonstrate how to use the Azure ML Training SDK to train Tensorflow model in a distributed manner using Horovod framework.\n",
"\n",
"# Prerequisites\n",
"\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"from azureml.core.experiment import Experiment\n",
"\n",
"username = getpass.getuser().replace('-','')\n",
"\n",
"# choose a name for the run history container in the workspace\n",
"experiment = Experiment(ws, username + '-horovod')\n",
"\n",
"# project folder name\n",
"project_folder = './samples/distributed-tensorflow-horovod'\n",
"os.makedirs(project_folder, exist_ok = True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This recipe is using a MLC-managed Batch AI cluster. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import BatchAiCompute\n",
"from azureml.core.compute import ComputeTarget\n",
"\n",
"batchai_cluster_name='gpucluster'\n",
"\n",
"\n",
"try:\n",
" # Check for existing cluster\n",
" compute_target = ComputeTarget(ws,batchai_cluster_name)\n",
" print('Found existing compute target')\n",
"except:\n",
" # Else, create new one\n",
" print('Creating a new compute target...')\n",
" provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\", # NC6 is GPU-enabled\n",
" #vm_priority = 'lowpriority', # optional\n",
" autoscale_enabled = True,\n",
" cluster_min_nodes = 0, \n",
" cluster_max_nodes = 4)\n",
" compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)\n",
" # can poll for a minimum number of nodes and for a specific timeout. \n",
" # if no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" # For a more detailed view of current BatchAI cluster status, use the 'status' property \n",
"print(compute_target.status.serialize())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile {project_folder}/word2vec.py\n",
"\n",
"# Copyright 2015 The TensorFlow Authors. All Rights Reserved.\n",
"# Modifications copyright (C) 2017 Uber Technologies, Inc.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"# ==============================================================================\n",
"\"\"\"Basic word2vec example.\"\"\"\n",
"\n",
"from __future__ import absolute_import\n",
"from __future__ import division\n",
"from __future__ import print_function\n",
"\n",
"import collections\n",
"import math\n",
"import os\n",
"import random\n",
"import zipfile\n",
"import argparse\n",
"\n",
"import numpy as np\n",
"from six.moves import urllib\n",
"from six.moves import xrange # pylint: disable=redefined-builtin\n",
"import tensorflow as tf\n",
"import horovod.tensorflow as hvd\n",
"from azureml.core.run import Run\n",
"\n",
"# Horovod: initialize Horovod.\n",
"hvd.init()\n",
"\n",
"parser = argparse.ArgumentParser()\n",
"parser.add_argument('--data_dir', type=str, help='input directory')\n",
"\n",
"args = parser.parse_args()\n",
"\n",
"data_dir = args.data_dir\n",
"print(\"the input data_dir is %s\" % data_dir)\n",
"\n",
"# Step 1: Download the data.\n",
"url = 'http://mattmahoney.net/dc/text8.zip'\n",
"\n",
"\n",
"def maybe_download(filename, expected_bytes):\n",
" \"\"\"Download a file if not present, and make sure it's the right size.\"\"\"\n",
" if not filename:\n",
" filename = \"text8.zip\"\n",
" if not os.path.exists(filename):\n",
" print(\"Downloading the data from http://mattmahoney.net/dc/text8.zip\")\n",
" filename, _ = urllib.request.urlretrieve(url, filename)\n",
" else:\n",
" print(\"Use the data from the input data_dir %s\" % data_dir)\n",
" statinfo = os.stat(filename)\n",
" if statinfo.st_size == expected_bytes:\n",
" print('Found and verified', filename)\n",
" else:\n",
" print(statinfo.st_size)\n",
" raise Exception(\n",
" 'Failed to verify ' + url + '. Can you get to it with a browser?')\n",
" return filename\n",
"\n",
"filename = maybe_download(data_dir, 31344016)\n",
"\n",
"\n",
"# Read the data into a list of strings.\n",
"def read_data(filename):\n",
" \"\"\"Extract the first file enclosed in a zip file as a list of words.\"\"\"\n",
" with zipfile.ZipFile(filename) as f:\n",
" data = tf.compat.as_str(f.read(f.namelist()[0])).split()\n",
" return data\n",
"\n",
"vocabulary = read_data(filename)\n",
"print('Data size', len(vocabulary))\n",
"\n",
"# Step 2: Build the dictionary and replace rare words with UNK token.\n",
"vocabulary_size = 50000\n",
"\n",
"\n",
"def build_dataset(words, n_words):\n",
" \"\"\"Process raw inputs into a dataset.\"\"\"\n",
" count = [['UNK', -1]]\n",
" count.extend(collections.Counter(words).most_common(n_words - 1))\n",
" dictionary = dict()\n",
" for word, _ in count:\n",
" dictionary[word] = len(dictionary)\n",
" data = list()\n",
" unk_count = 0\n",
" for word in words:\n",
" if word in dictionary:\n",
" index = dictionary[word]\n",
" else:\n",
" index = 0 # dictionary['UNK']\n",
" unk_count += 1\n",
" data.append(index)\n",
" count[0][1] = unk_count\n",
" reversed_dictionary = dict(zip(dictionary.values(), dictionary.keys()))\n",
" return data, count, dictionary, reversed_dictionary\n",
"\n",
"data, count, dictionary, reverse_dictionary = build_dataset(vocabulary,\n",
" vocabulary_size)\n",
"del vocabulary # Hint to reduce memory.\n",
"print('Most common words (+UNK)', count[:5])\n",
"print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])\n",
"\n",
"\n",
"# Step 3: Function to generate a training batch for the skip-gram model.\n",
"def generate_batch(batch_size, num_skips, skip_window):\n",
" assert num_skips <= 2 * skip_window\n",
" # Adjust batch_size to match num_skips\n",
" batch_size = batch_size // num_skips * num_skips\n",
" span = 2 * skip_window + 1 # [ skip_window target skip_window ]\n",
" # Backtrack a little bit to avoid skipping words in the end of a batch\n",
" data_index = random.randint(0, len(data) - span - 1)\n",
" batch = np.ndarray(shape=(batch_size), dtype=np.int32)\n",
" labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)\n",
" buffer = collections.deque(maxlen=span)\n",
" for _ in range(span):\n",
" buffer.append(data[data_index])\n",
" data_index = (data_index + 1) % len(data)\n",
" for i in range(batch_size // num_skips):\n",
" target = skip_window # target label at the center of the buffer\n",
" targets_to_avoid = [skip_window]\n",
" for j in range(num_skips):\n",
" while target in targets_to_avoid:\n",
" target = random.randint(0, span - 1)\n",
" targets_to_avoid.append(target)\n",
" batch[i * num_skips + j] = buffer[skip_window]\n",
" labels[i * num_skips + j, 0] = buffer[target]\n",
" buffer.append(data[data_index])\n",
" data_index = (data_index + 1) % len(data)\n",
" return batch, labels\n",
"\n",
"batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)\n",
"for i in range(8):\n",
" print(batch[i], reverse_dictionary[batch[i]],\n",
" '->', labels[i, 0], reverse_dictionary[labels[i, 0]])\n",
"\n",
"# Step 4: Build and train a skip-gram model.\n",
"\n",
"max_batch_size = 128\n",
"embedding_size = 128 # Dimension of the embedding vector.\n",
"skip_window = 1 # How many words to consider left and right.\n",
"num_skips = 2 # How many times to reuse an input to generate a label.\n",
"\n",
"# We pick a random validation set to sample nearest neighbors. Here we limit the\n",
"# validation samples to the words that have a low numeric ID, which by\n",
"# construction are also the most frequent.\n",
"valid_size = 16 # Random set of words to evaluate similarity on.\n",
"valid_window = 100 # Only pick dev samples in the head of the distribution.\n",
"valid_examples = np.random.choice(valid_window, valid_size, replace=False)\n",
"num_sampled = 64 # Number of negative examples to sample.\n",
"\n",
"graph = tf.Graph()\n",
"\n",
"with graph.as_default():\n",
"\n",
" # Input data.\n",
" train_inputs = tf.placeholder(tf.int32, shape=[None])\n",
" train_labels = tf.placeholder(tf.int32, shape=[None, 1])\n",
" valid_dataset = tf.constant(valid_examples, dtype=tf.int32)\n",
"\n",
" # Look up embeddings for inputs.\n",
" embeddings = tf.Variable(\n",
" tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))\n",
" embed = tf.nn.embedding_lookup(embeddings, train_inputs)\n",
"\n",
" # Construct the variables for the NCE loss\n",
" nce_weights = tf.Variable(\n",
" tf.truncated_normal([vocabulary_size, embedding_size],\n",
" stddev=1.0 / math.sqrt(embedding_size)))\n",
" nce_biases = tf.Variable(tf.zeros([vocabulary_size]))\n",
"\n",
" # Compute the average NCE loss for the batch.\n",
" # tf.nce_loss automatically draws a new sample of the negative labels each\n",
" # time we evaluate the loss.\n",
" loss = tf.reduce_mean(\n",
" tf.nn.nce_loss(weights=nce_weights,\n",
" biases=nce_biases,\n",
" labels=train_labels,\n",
" inputs=embed,\n",
" num_sampled=num_sampled,\n",
" num_classes=vocabulary_size))\n",
"\n",
" # Horovod: adjust learning rate based on number of GPUs.\n",
" optimizer = tf.train.GradientDescentOptimizer(1.0 * hvd.size())\n",
"\n",
" # Horovod: add Horovod Distributed Optimizer.\n",
" optimizer = hvd.DistributedOptimizer(optimizer)\n",
"\n",
" train_op = optimizer.minimize(loss)\n",
"\n",
" # Compute the cosine similarity between minibatch examples and all embeddings.\n",
" norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))\n",
" normalized_embeddings = embeddings / norm\n",
" valid_embeddings = tf.nn.embedding_lookup(\n",
" normalized_embeddings, valid_dataset)\n",
" similarity = tf.matmul(\n",
" valid_embeddings, normalized_embeddings, transpose_b=True)\n",
"\n",
" # Add variable initializer.\n",
" init = tf.global_variables_initializer()\n",
"\n",
" # Horovod: broadcast initial variable states from rank 0 to all other processes.\n",
" # This is necessary to ensure consistent initialization of all workers when\n",
" # training is started with random weights or restored from a checkpoint.\n",
" bcast = hvd.broadcast_global_variables(0)\n",
"\n",
"# Step 5: Begin training.\n",
"\n",
"# Horovod: adjust number of steps based on number of GPUs.\n",
"num_steps = 4000 // hvd.size() + 1\n",
"\n",
"# Horovod: pin GPU to be used to process local rank (one GPU per process)\n",
"config = tf.ConfigProto()\n",
"config.gpu_options.allow_growth = True\n",
"config.gpu_options.visible_device_list = str(hvd.local_rank())\n",
"\n",
"with tf.Session(graph=graph, config=config) as session:\n",
" # We must initialize all variables before we use them.\n",
" init.run()\n",
" bcast.run()\n",
" print('Initialized')\n",
" run = Run.get_submitted_run()\n",
" average_loss = 0\n",
" for step in xrange(num_steps):\n",
" # simulate various sentence length by randomization\n",
" batch_size = random.randint(max_batch_size // 2, max_batch_size)\n",
" batch_inputs, batch_labels = generate_batch(\n",
" batch_size, num_skips, skip_window)\n",
" feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}\n",
"\n",
" # We perform one update step by evaluating the optimizer op (including it\n",
" # in the list of returned values for session.run()\n",
" _, loss_val = session.run([train_op, loss], feed_dict=feed_dict)\n",
" average_loss += loss_val\n",
"\n",
" if step % 2000 == 0:\n",
" if step > 0:\n",
" average_loss /= 2000\n",
" # The average loss is an estimate of the loss over the last 2000 batches.\n",
" print('Average loss at step ', step, ': ', average_loss)\n",
" run.log(\"Loss\", average_loss)\n",
" average_loss = 0\n",
" final_embeddings = normalized_embeddings.eval()\n",
"\n",
" # Evaluate similarity in the end on worker 0.\n",
" if hvd.rank() == 0:\n",
" sim = similarity.eval()\n",
" for i in xrange(valid_size):\n",
" valid_word = reverse_dictionary[valid_examples[i]]\n",
" top_k = 8 # number of nearest neighbors\n",
" nearest = (-sim[i, :]).argsort()[1:top_k + 1]\n",
" log_str = 'Nearest to %s:' % valid_word\n",
" for k in xrange(top_k):\n",
" close_word = reverse_dictionary[nearest[k]]\n",
" log_str = '%s %s,' % (log_str, close_word)\n",
" print(log_str)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Upload http://mattmahoney.net/dc/text8.zip to the azure blob storage."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ds = ws.get_default_datastore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import urllib\n",
"\n",
"os.makedirs('./data', exist_ok = True)\n",
"\n",
"urllib.request.urlretrieve('http://mattmahoney.net/dc/text8.zip', filename = './data/text8.zip')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ds.upload(src_dir = 'data', target_path = 'data', overwrite=True, show_progress = True)\n",
"\n",
"path_on_datastore = \"/data/text8.zip\"\n",
"ds_data = ds.path(path_on_datastore)\n",
"print(ds_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import *\n",
"script_params={\n",
" \"--data_dir\": ds_data\n",
"}\n",
"tf_estimator = TensorFlow(source_directory=project_folder,\n",
" compute_target=compute_target,\n",
" entry_script='word2vec.py',\n",
" script_params=script_params,\n",
" node_count=2,\n",
" process_count_per_node=1,\n",
" distributed_backend=\"mpi\",\n",
" use_gpu=False)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run = experiment.submit(tf_estimator)\n",
"print(run)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.widgets import RunDetails\n",
"RunDetails(run).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.wait_for_completion(show_output=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,473 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 51. Distributed TensorFlow using Parameter Server\n",
"In this tutorial we demonstrate how to use the Azure ML Training SDK to train Tensorflow model in a distributed manner using Parameter Server.\n",
"\n",
"# Prerequisites\n",
"\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"from azureml.core.experiment import Experiment\n",
"\n",
"username = getpass.getuser().replace('-','')\n",
"\n",
"# choose a name for the run history container in the workspace\n",
"run_history_name = username + '-tf_ps'\n",
"\n",
"experiment = Experiment(ws, run_history_name)\n",
"\n",
"# project folder name\n",
"project_folder = './' + run_history_name\n",
"\n",
"print(project_folder)\n",
"os.makedirs(project_folder, exist_ok = True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This recipe is using a MLC-managed Batch AI cluster. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import BatchAiCompute\n",
"from azureml.core.compute import ComputeTarget\n",
"\n",
"batchai_cluster_name='gpucluster'\n",
"\n",
"\n",
"try:\n",
" # Check for existing cluster\n",
" compute_target = ComputeTarget(ws,batchai_cluster_name)\n",
" print('Found existing compute target')\n",
"except:\n",
" # Else, create new one\n",
" print('Creating a new compute target...')\n",
" provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\", # NC6 is GPU-enabled\n",
" #vm_priority = 'lowpriority', # optional\n",
" autoscale_enabled = True,\n",
" cluster_min_nodes = 0, \n",
" cluster_max_nodes = 4)\n",
" compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)\n",
" # can poll for a minimum number of nodes and for a specific timeout. \n",
" # if no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" # For a more detailed view of current BatchAI cluster status, use the 'status' property \n",
"print(compute_target.status.serialize())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile {project_folder}/mnist_replica.py\n",
"\n",
"# Copyright 2016 The TensorFlow Authors. All Rights Reserved.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"# ==============================================================================\n",
"\"\"\"Distributed MNIST training and validation, with model replicas.\n",
"A simple softmax model with one hidden layer is defined. The parameters\n",
"(weights and biases) are located on one parameter server (ps), while the ops\n",
"are executed on two worker nodes by default. The TF sessions also run on the\n",
"worker node.\n",
"Multiple invocations of this script can be done in parallel, with different\n",
"values for --task_index. There should be exactly one invocation with\n",
"--task_index, which will create a master session that carries out variable\n",
"initialization. The other, non-master, sessions will wait for the master\n",
"session to finish the initialization before proceeding to the training stage.\n",
"The coordination between the multiple worker invocations occurs due to\n",
"the definition of the parameters on the same ps devices. The parameter updates\n",
"from one worker is visible to all other workers. As such, the workers can\n",
"perform forward computation and gradient calculation in parallel, which\n",
"should lead to increased training speed for the simple model.\n",
"\"\"\"\n",
"\n",
"from __future__ import absolute_import\n",
"from __future__ import division\n",
"from __future__ import print_function\n",
"\n",
"import os\n",
"import math\n",
"import sys\n",
"import tempfile\n",
"import time\n",
"import json\n",
"\n",
"import tensorflow as tf\n",
"from tensorflow.examples.tutorials.mnist import input_data\n",
"from azureml.core.run import Run\n",
"\n",
"flags = tf.app.flags\n",
"flags.DEFINE_string(\"data_dir\", \"/tmp/mnist-data\",\n",
" \"Directory for storing mnist data\")\n",
"flags.DEFINE_boolean(\"download_only\", False,\n",
" \"Only perform downloading of data; Do not proceed to \"\n",
" \"session preparation, model definition or training\")\n",
"flags.DEFINE_integer(\"num_gpus\", 0, \"Total number of gpus for each machine.\"\n",
" \"If you don't use GPU, please set it to '0'\")\n",
"flags.DEFINE_integer(\"replicas_to_aggregate\", None,\n",
" \"Number of replicas to aggregate before parameter update \"\n",
" \"is applied (For sync_replicas mode only; default: \"\n",
" \"num_workers)\")\n",
"flags.DEFINE_integer(\"hidden_units\", 100,\n",
" \"Number of units in the hidden layer of the NN\")\n",
"flags.DEFINE_integer(\"train_steps\", 200,\n",
" \"Number of (global) training steps to perform\")\n",
"flags.DEFINE_integer(\"batch_size\", 100, \"Training batch size\")\n",
"flags.DEFINE_float(\"learning_rate\", 0.01, \"Learning rate\")\n",
"flags.DEFINE_boolean(\n",
" \"sync_replicas\", False,\n",
" \"Use the sync_replicas (synchronized replicas) mode, \"\n",
" \"wherein the parameter updates from workers are aggregated \"\n",
" \"before applied to avoid stale gradients\")\n",
"flags.DEFINE_boolean(\n",
" \"existing_servers\", False, \"Whether servers already exists. If True, \"\n",
" \"will use the worker hosts via their GRPC URLs (one client process \"\n",
" \"per worker host). Otherwise, will create an in-process TensorFlow \"\n",
" \"server.\")\n",
"\n",
"FLAGS = flags.FLAGS\n",
"\n",
"IMAGE_PIXELS = 28\n",
"\n",
"\n",
"def main(unused_argv):\n",
" data_root = os.path.join(\"outputs\", \"MNIST\")\n",
" mnist = None\n",
" tf_config = os.environ.get(\"TF_CONFIG\")\n",
" if not tf_config or tf_config == \"\":\n",
" raise ValueError(\"TF_CONFIG not found.\")\n",
" tf_config_json = json.loads(tf_config)\n",
" cluster = tf_config_json.get('cluster')\n",
" job_name = tf_config_json.get('task', {}).get('type')\n",
" task_index = tf_config_json.get('task', {}).get('index')\n",
" job_name = \"worker\" if job_name == \"master\" else job_name\n",
" sentinel_path = os.path.join(data_root, \"complete.txt\") \n",
" if job_name==\"worker\" and task_index==0:\n",
" mnist = input_data.read_data_sets(data_root, one_hot=True)\n",
" path = os.path.join(data_root, \"complete.txt\") \n",
" with open(sentinel_path, 'w+') as f:\n",
" f.write(\"download complete\")\n",
" else:\n",
" while not os.path.exists(sentinel_path):\n",
" time.sleep(0.01)\n",
" mnist = input_data.read_data_sets(data_root, one_hot=True)\n",
" \n",
" if FLAGS.download_only:\n",
" sys.exit(0)\n",
"\n",
" print(\"job name = %s\" % job_name)\n",
" print(\"task index = %d\" % task_index)\n",
" print(\"number of GPUs = %d\" % FLAGS.num_gpus)\n",
"\n",
" #Construct the cluster and start the server\n",
" cluster_spec = tf.train.ClusterSpec(cluster)\n",
" \n",
" # Get the number of workers.\n",
" num_workers = len(cluster_spec.task_indices(\"worker\"))\n",
"\n",
" if not FLAGS.existing_servers:\n",
" # Not using existing servers. Create an in-process server.\n",
" server = tf.train.Server(\n",
" cluster_spec, job_name=job_name, task_index=task_index)\n",
" if job_name == \"ps\":\n",
" server.join()\n",
"\n",
" is_chief = (task_index == 0)\n",
" if FLAGS.num_gpus > 0:\n",
" # Avoid gpu allocation conflict: now allocate task_num -> #gpu\n",
" # for each worker in the corresponding machine\n",
" gpu = (task_index % FLAGS.num_gpus)\n",
" worker_device = \"/job:worker/task:%d/gpu:%d\" % (task_index, gpu)\n",
" elif FLAGS.num_gpus == 0:\n",
" # Just allocate the CPU to worker server\n",
" cpu = 0\n",
" worker_device = \"/job:worker/task:%d/cpu:%d\" % (task_index, cpu)\n",
" # The device setter will automatically place Variables ops on separate\n",
" # parameter servers (ps). The non-Variable ops will be placed on the workers.\n",
" # The ps use CPU and workers use corresponding GPU\n",
" with tf.device(\n",
" tf.train.replica_device_setter(\n",
" worker_device=worker_device,\n",
" ps_device=\"/job:ps/cpu:0\",\n",
" cluster=cluster)):\n",
" global_step = tf.Variable(0, name=\"global_step\", trainable=False)\n",
"\n",
" # Variables of the hidden layer\n",
" hid_w = tf.Variable(\n",
" tf.truncated_normal(\n",
" [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],\n",
" stddev=1.0 / IMAGE_PIXELS),\n",
" name=\"hid_w\")\n",
" hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name=\"hid_b\")\n",
"\n",
" # Variables of the softmax layer\n",
" sm_w = tf.Variable(\n",
" tf.truncated_normal(\n",
" [FLAGS.hidden_units, 10],\n",
" stddev=1.0 / math.sqrt(FLAGS.hidden_units)),\n",
" name=\"sm_w\")\n",
" sm_b = tf.Variable(tf.zeros([10]), name=\"sm_b\")\n",
"\n",
" # Ops: located on the worker specified with task_index\n",
" x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])\n",
" y_ = tf.placeholder(tf.float32, [None, 10])\n",
"\n",
" hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)\n",
" hid = tf.nn.relu(hid_lin)\n",
"\n",
" y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))\n",
" cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))\n",
"\n",
" opt = tf.train.AdamOptimizer(FLAGS.learning_rate)\n",
"\n",
" if FLAGS.sync_replicas:\n",
" if FLAGS.replicas_to_aggregate is None:\n",
" replicas_to_aggregate = num_workers\n",
" else:\n",
" replicas_to_aggregate = FLAGS.replicas_to_aggregate\n",
"\n",
" opt = tf.train.SyncReplicasOptimizer(\n",
" opt,\n",
" replicas_to_aggregate=replicas_to_aggregate,\n",
" total_num_replicas=num_workers,\n",
" name=\"mnist_sync_replicas\")\n",
"\n",
" train_step = opt.minimize(cross_entropy, global_step=global_step)\n",
"\n",
" if FLAGS.sync_replicas:\n",
" local_init_op = opt.local_step_init_op\n",
" if is_chief:\n",
" local_init_op = opt.chief_init_op\n",
"\n",
" ready_for_local_init_op = opt.ready_for_local_init_op\n",
"\n",
" # Initial token and chief queue runners required by the sync_replicas mode\n",
" chief_queue_runner = opt.get_chief_queue_runner()\n",
" sync_init_op = opt.get_init_tokens_op()\n",
"\n",
" init_op = tf.global_variables_initializer()\n",
" train_dir = tempfile.mkdtemp()\n",
"\n",
" if FLAGS.sync_replicas:\n",
" sv = tf.train.Supervisor(\n",
" is_chief=is_chief,\n",
" logdir=train_dir,\n",
" init_op=init_op,\n",
" local_init_op=local_init_op,\n",
" ready_for_local_init_op=ready_for_local_init_op,\n",
" recovery_wait_secs=1,\n",
" global_step=global_step)\n",
" else:\n",
" sv = tf.train.Supervisor(\n",
" is_chief=is_chief,\n",
" logdir=train_dir,\n",
" init_op=init_op,\n",
" recovery_wait_secs=1,\n",
" global_step=global_step)\n",
"\n",
" sess_config = tf.ConfigProto(\n",
" allow_soft_placement=True,\n",
" log_device_placement=False,\n",
" device_filters=[\"/job:ps\",\n",
" \"/job:worker/task:%d\" % task_index])\n",
"\n",
" # The chief worker (task_index==0) session will prepare the session,\n",
" # while the remaining workers will wait for the preparation to complete.\n",
" if is_chief:\n",
" print(\"Worker %d: Initializing session...\" % task_index)\n",
" else:\n",
" print(\"Worker %d: Waiting for session to be initialized...\" %\n",
" task_index)\n",
"\n",
" if FLAGS.existing_servers:\n",
" server_grpc_url = \"grpc://\" + worker_spec[task_index]\n",
" print(\"Using existing server at: %s\" % server_grpc_url)\n",
"\n",
" sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)\n",
" else:\n",
" sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)\n",
"\n",
" print(\"Worker %d: Session initialization complete.\" % task_index)\n",
"\n",
" if FLAGS.sync_replicas and is_chief:\n",
" # Chief worker will start the chief queue runner and call the init op.\n",
" sess.run(sync_init_op)\n",
" sv.start_queue_runners(sess, [chief_queue_runner])\n",
"\n",
" # Perform training\n",
" time_begin = time.time()\n",
" print(\"Training begins @ %f\" % time_begin)\n",
"\n",
" local_step = 0\n",
" while True:\n",
" # Training feed\n",
" batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)\n",
" train_feed = {x: batch_xs, y_: batch_ys}\n",
"\n",
" _, step = sess.run([train_step, global_step], feed_dict=train_feed)\n",
" local_step += 1\n",
"\n",
" now = time.time()\n",
" print(\"%f: Worker %d: training step %d done (global step: %d)\" %\n",
" (now, task_index, local_step, step))\n",
"\n",
" if step >= FLAGS.train_steps:\n",
" break\n",
"\n",
" time_end = time.time()\n",
" print(\"Training ends @ %f\" % time_end)\n",
" training_time = time_end - time_begin\n",
" print(\"Training elapsed time: %f s\" % training_time)\n",
"\n",
" # Validation feed\n",
" val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}\n",
" val_xent = sess.run(cross_entropy, feed_dict=val_feed)\n",
" print(\"After %d training step(s), validation cross entropy = %g\" %\n",
" (FLAGS.train_steps, val_xent))\n",
" if job_name==\"worker\" and task_index==0:\n",
" run = Run.get_submitted_run()\n",
" run.log(\"CrossEntropy\", val_xent)\n",
"\n",
"if __name__ == \"__main__\":\n",
" tf.app.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import *\n",
"tf_estimator = TensorFlow(source_directory=project_folder,\n",
" compute_target=compute_target,\n",
" entry_script='mnist_replica.py',\n",
" node_count=2,\n",
" worker_count=2,\n",
" parameter_server_count=1, \n",
" distributed_backend=\"ps\",\n",
" use_gpu=False)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run = experiment.submit(tf_estimator)\n",
"print(run)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.widgets import RunDetails\n",
"RunDetails(run).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.wait_for_completion(show_output=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,509 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 52. Distributed CNTK\n",
"In this tutorial we demonstrate how to use the Azure ML Training SDK to train CNTK model in a distributed manner.\n",
"\n",
"# Prerequisites\n",
"\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"from azureml.core.experiment import Experiment\n",
"\n",
"username = getpass.getuser().replace('-','')\n",
"\n",
"# choose a name for the run history container in the workspace\n",
"run_history_name = username + '-cntk-distrib'\n",
"\n",
"experiment = Experiment(ws, run_history_name)\n",
"\n",
"# project folder name\n",
"project_folder = './' + run_history_name\n",
"\n",
"print(project_folder)\n",
"os.makedirs(project_folder, exist_ok = True)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This recipe is using a MLC-managed Batch AI cluster. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import BatchAiCompute\n",
"from azureml.core.compute import ComputeTarget\n",
"\n",
"batchai_cluster_name='gpucluster'\n",
"\n",
"\n",
"try:\n",
" # Check for existing cluster\n",
" compute_target = ComputeTarget(ws,batchai_cluster_name)\n",
" print('Found existing compute target')\n",
"except:\n",
" # Else, create new one\n",
" print('Creating a new compute target...')\n",
" provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\", # NC6 is GPU-enabled\n",
" #vm_priority = 'lowpriority', # optional\n",
" autoscale_enabled = True,\n",
" cluster_min_nodes = 0, \n",
" cluster_max_nodes = 4)\n",
" compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)\n",
" # can poll for a minimum number of nodes and for a specific timeout. \n",
" # if no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" # For a more detailed view of current BatchAI cluster status, use the 'status' property \n",
"print(compute_target.status.serialize())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile {project_folder}/cntk_mnist.py\n",
"\n",
"# This code is adapted from CNTK MNIST tutorials: \n",
"# 1. https://github.com/Microsoft/CNTK/blob/v2.0/Tutorials/CNTK_103A_MNIST_DataLoader.ipynb\n",
"# 2. https://github.com/Microsoft/CNTK/blob/v2.0/Tutorials/CNTK_103C_MNIST_MultiLayerPerceptron.ipynb\n",
"\n",
"# Import the relevant modules to be used later\n",
"from __future__ import print_function\n",
"import gzip\n",
"import numpy as np\n",
"import os\n",
"import shutil\n",
"import struct\n",
"import sys\n",
"import time\n",
"import pandas \n",
"\n",
"import cntk as C\n",
"from azureml.core.run import Run\n",
"import argparse\n",
"\n",
"run = Run.get_submitted_run()\n",
"\n",
"parser=argparse.ArgumentParser()\n",
"\n",
"parser.add_argument('--learning_rate', type=float, default=0.001, help='learning rate')\n",
"parser.add_argument('--num_hidden_layers', type=int, default=2, help='number of hidden layers')\n",
"parser.add_argument('--minibatch_size', type=int, default=64, help='minibatchsize')\n",
"\n",
"args=parser.parse_args() \n",
"\n",
"# Functions to load MNIST images and unpack into train and test set.\n",
"# - loadData reads image data and formats into a 28x28 long array\n",
"# - loadLabels reads the corresponding labels data, 1 for each image\n",
"# - load packs the downloaded image and labels data into a combined format to be read later by \n",
"# CNTK text reader \n",
"def loadData(src, cimg):\n",
" print ('Downloading ' + src)\n",
" gzfname, h = urlretrieve(src, './delete.me')\n",
" print ('Done.')\n",
" try:\n",
" with gzip.open(gzfname) as gz:\n",
" n = struct.unpack('I', gz.read(4))\n",
" # Read magic number.\n",
" if n[0] != 0x3080000:\n",
" raise Exception('Invalid file: unexpected magic number.')\n",
" # Read number of entries.\n",
" n = struct.unpack('>I', gz.read(4))[0]\n",
" if n != cimg:\n",
" raise Exception('Invalid file: expected {0} entries.'.format(cimg))\n",
" crow = struct.unpack('>I', gz.read(4))[0]\n",
" ccol = struct.unpack('>I', gz.read(4))[0]\n",
" if crow != 28 or ccol != 28:\n",
" raise Exception('Invalid file: expected 28 rows/cols per image.')\n",
" # Read data.\n",
" res = np.fromstring(gz.read(cimg * crow * ccol), dtype = np.uint8)\n",
" finally:\n",
" os.remove(gzfname)\n",
" return res.reshape((cimg, crow * ccol))\n",
"\n",
"def loadLabels(src, cimg):\n",
" print ('Downloading ' + src)\n",
" gzfname, h = urlretrieve(src, './delete.me')\n",
" print ('Done.')\n",
" try:\n",
" with gzip.open(gzfname) as gz:\n",
" n = struct.unpack('I', gz.read(4))\n",
" # Read magic number.\n",
" if n[0] != 0x1080000:\n",
" raise Exception('Invalid file: unexpected magic number.')\n",
" # Read number of entries.\n",
" n = struct.unpack('>I', gz.read(4))\n",
" if n[0] != cimg:\n",
" raise Exception('Invalid file: expected {0} rows.'.format(cimg))\n",
" # Read labels.\n",
" res = np.fromstring(gz.read(cimg), dtype = np.uint8)\n",
" finally:\n",
" os.remove(gzfname)\n",
" return res.reshape((cimg, 1))\n",
"\n",
"def try_download(dataSrc, labelsSrc, cimg):\n",
" data = loadData(dataSrc, cimg)\n",
" labels = loadLabels(labelsSrc, cimg)\n",
" return np.hstack((data, labels))\n",
"\n",
"# Save the data files into a format compatible with CNTK text reader\n",
"def savetxt(filename, ndarray):\n",
" dir = os.path.dirname(filename)\n",
"\n",
" if not os.path.exists(dir):\n",
" os.makedirs(dir)\n",
"\n",
" if not os.path.isfile(filename):\n",
" print(\"Saving\", filename )\n",
" with open(filename, 'w') as f:\n",
" labels = list(map(' '.join, np.eye(10, dtype=np.uint).astype(str)))\n",
" for row in ndarray:\n",
" row_str = row.astype(str)\n",
" label_str = labels[row[-1]]\n",
" feature_str = ' '.join(row_str[:-1])\n",
" f.write('|labels {} |features {}\\n'.format(label_str, feature_str))\n",
" else:\n",
" print(\"File already exists\", filename)\n",
"\n",
"# Read a CTF formatted text (as mentioned above) using the CTF deserializer from a file\n",
"def create_reader(path, is_training, input_dim, num_label_classes):\n",
" return C.io.MinibatchSource(C.io.CTFDeserializer(path, C.io.StreamDefs(\n",
" labels = C.io.StreamDef(field='labels', shape=num_label_classes, is_sparse=False),\n",
" features = C.io.StreamDef(field='features', shape=input_dim, is_sparse=False)\n",
" )), randomize = is_training, max_sweeps = C.io.INFINITELY_REPEAT if is_training else 1)\n",
"\n",
"# Defines a utility that prints the training progress\n",
"def print_training_progress(trainer, mb, frequency, verbose=1):\n",
" training_loss = \"NA\"\n",
" eval_error = \"NA\"\n",
"\n",
" if mb%frequency == 0:\n",
" training_loss = trainer.previous_minibatch_loss_average\n",
" eval_error = trainer.previous_minibatch_evaluation_average\n",
" if verbose: \n",
" print (\"Minibatch: {0}, Loss: {1:.4f}, Error: {2:.2f}%\".format(mb, training_loss, eval_error*100))\n",
" \n",
" return mb, training_loss, eval_error\n",
"\n",
"# Create the network architecture\n",
"def create_model(features):\n",
" with C.layers.default_options(init = C.layers.glorot_uniform(), activation = C.ops.relu):\n",
" h = features\n",
" for _ in range(num_hidden_layers):\n",
" h = C.layers.Dense(hidden_layers_dim)(h)\n",
" r = C.layers.Dense(num_output_classes, activation = None)(h)\n",
" return r\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
" run = Run.get_submitted_run()\n",
"\n",
" try: \n",
" from urllib.request import urlretrieve \n",
" except ImportError: \n",
" from urllib import urlretrieve\n",
"\n",
" # Select the right target device when this script is being used:\n",
" if 'TEST_DEVICE' in os.environ:\n",
" if os.environ['TEST_DEVICE'] == 'cpu':\n",
" C.device.try_set_default_device(C.device.cpu())\n",
" else:\n",
" C.device.try_set_default_device(C.device.gpu(0))\n",
"\n",
" # URLs for the train image and labels data\n",
" url_train_image = 'http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz'\n",
" url_train_labels = 'http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz'\n",
" num_train_samples = 60000\n",
"\n",
" print(\"Downloading train data\")\n",
" train = try_download(url_train_image, url_train_labels, num_train_samples)\n",
"\n",
" url_test_image = 'http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz'\n",
" url_test_labels = 'http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz'\n",
" num_test_samples = 10000\n",
"\n",
" print(\"Downloading test data\")\n",
" test = try_download(url_test_image, url_test_labels, num_test_samples)\n",
"\n",
"\n",
" # Save the train and test files (prefer our default path for the data\n",
" rank = os.environ.get(\"OMPI_COMM_WORLD_RANK\") \n",
" data_dir = os.path.join(\"outputs\", \"MNIST\")\n",
" sentinel_path = os.path.join(data_dir, \"complete.txt\") \n",
" if rank == '0': \n",
" print ('Writing train text file...')\n",
" savetxt(os.path.join(data_dir, \"Train-28x28_cntk_text.txt\"), train)\n",
"\n",
" print ('Writing test text file...')\n",
" savetxt(os.path.join(data_dir, \"Test-28x28_cntk_text.txt\"), test)\n",
" with open(sentinel_path, 'w+') as f:\n",
" f.write(\"download complete\")\n",
"\n",
" print('Done with downloading data.')\n",
" else:\n",
" while not os.path.exists(sentinel_path):\n",
" time.sleep(0.01)\n",
" \n",
"\n",
" # Ensure we always get the same amount of randomness\n",
" np.random.seed(0)\n",
"\n",
" # Define the data dimensions\n",
" input_dim = 784\n",
" num_output_classes = 10\n",
"\n",
" # Ensure the training and test data is generated and available for this tutorial.\n",
" # We search in two locations in the toolkit for the cached MNIST data set.\n",
" data_found = False\n",
" for data_dir in [os.path.join(\"..\", \"Examples\", \"Image\", \"DataSets\", \"MNIST\"),\n",
" os.path.join(\"data_\" + str(rank), \"MNIST\"),\n",
" os.path.join(\"outputs\", \"MNIST\")]:\n",
" train_file = os.path.join(data_dir, \"Train-28x28_cntk_text.txt\")\n",
" test_file = os.path.join(data_dir, \"Test-28x28_cntk_text.txt\")\n",
" if os.path.isfile(train_file) and os.path.isfile(test_file):\n",
" data_found = True\n",
" break\n",
" if not data_found:\n",
" raise ValueError(\"Please generate the data by completing CNTK 103 Part A\")\n",
" print(\"Data directory is {0}\".format(data_dir))\n",
"\n",
" num_hidden_layers = args.num_hidden_layers\n",
" hidden_layers_dim = 400\n",
"\n",
" input = C.input_variable(input_dim)\n",
" label = C.input_variable(num_output_classes)\n",
"\n",
" \n",
" z = create_model(input)\n",
" # Scale the input to 0-1 range by dividing each pixel by 255.\n",
" z = create_model(input/255.0)\n",
"\n",
" loss = C.cross_entropy_with_softmax(z, label)\n",
" label_error = C.classification_error(z, label)\n",
"\n",
"\n",
" # Instantiate the trainer object to drive the model training\n",
" learning_rate = args.learning_rate\n",
" lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)\n",
" learner = C.sgd(z.parameters, lr_schedule)\n",
" trainer = C.Trainer(z, (loss, label_error), [learner])\n",
"\n",
"\n",
" # Initialize the parameters for the trainer\n",
" minibatch_size = args.minibatch_size\n",
" num_samples_per_sweep = 60000\n",
" num_sweeps_to_train_with = 10\n",
" num_minibatches_to_train = (num_samples_per_sweep * num_sweeps_to_train_with) / minibatch_size\n",
"\n",
" # Create the reader to training data set\n",
" reader_train = create_reader(train_file, True, input_dim, num_output_classes)\n",
"\n",
" # Map the data streams to the input and labels.\n",
" input_map = {\n",
" label : reader_train.streams.labels,\n",
" input : reader_train.streams.features\n",
" } \n",
"\n",
" # Run the trainer on and perform model training\n",
" training_progress_output_freq = 500\n",
" \n",
" errors = []\n",
" losses = []\n",
" for i in range(0, int(num_minibatches_to_train)): \n",
" # Read a mini batch from the training data file\n",
" data = reader_train.next_minibatch(minibatch_size, input_map = input_map)\n",
" \n",
" trainer.train_minibatch(data)\n",
" batchsize, loss, error = print_training_progress(trainer, i, training_progress_output_freq, verbose=1)\n",
" if (error != 'NA') and (loss != 'NA'):\n",
" errors.append(float(error))\n",
" losses.append(float(loss))\n",
" \n",
" # log the losses\n",
" if rank == '0': \n",
" run.log_list(\"Loss\", losses)\n",
" run.log_list(\"Error\",errors)\n",
"\n",
" # Read the training data\n",
" reader_test = create_reader(test_file, False, input_dim, num_output_classes)\n",
"\n",
" test_input_map = {\n",
" label : reader_test.streams.labels,\n",
" input : reader_test.streams.features,\n",
" }\n",
"\n",
" # Test data for trained model\n",
" test_minibatch_size = 512\n",
" num_samples = 10000\n",
" num_minibatches_to_test = num_samples // test_minibatch_size\n",
" test_result = 0.0\n",
"\n",
" \n",
" for i in range(num_minibatches_to_test): \n",
" # We are loading test data in batches specified by test_minibatch_size\n",
" # Each data point in the minibatch is a MNIST digit image of 784 dimensions \n",
" # with one pixel per dimension that we will encode / decode with the \n",
" # trained model.\n",
" data = reader_test.next_minibatch(test_minibatch_size,\n",
" input_map = test_input_map)\n",
"\n",
" eval_error = trainer.test_minibatch(data)\n",
" test_result = test_result + eval_error\n",
" \n",
"\n",
" # Average of evaluation errors of all test minibatches\n",
" print(\"Average test error: {0:.2f}%\".format(test_result*100 / num_minibatches_to_test))\n",
"\n",
" out = C.softmax(z)\n",
"\n",
" # Read the data for evaluation\n",
" reader_eval = create_reader(test_file, False, input_dim, num_output_classes)\n",
"\n",
" eval_minibatch_size = 25\n",
" eval_input_map = {input: reader_eval.streams.features} \n",
"\n",
" data = reader_test.next_minibatch(eval_minibatch_size, input_map = test_input_map)\n",
"\n",
" img_label = data[label].asarray()\n",
" img_data = data[input].asarray()\n",
" predicted_label_prob = [out.eval(img_data[i]) for i in range(len(img_data))]\n",
"\n",
" # Find the index with the maximum value for both predicted as well as the ground truth\n",
" pred = [np.argmax(predicted_label_prob[i]) for i in range(len(predicted_label_prob))]\n",
" gtlabel = [np.argmax(img_label[i]) for i in range(len(img_label))]\n",
"\n",
" print(\"Label :\", gtlabel[:25])\n",
" print(\"Predicted:\", pred)\n",
" \n",
" # save model to outputs folder\n",
" z.save('outputs/cntk.model')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.estimator import *\n",
"pip_packages=['cntk==2.5.1', 'pandas==0.23.4']\n",
"cntk_estimator = Estimator(source_directory=project_folder,\n",
" compute_target=compute_target,\n",
" entry_script='cntk_mnist.py',\n",
" node_count=2,\n",
" process_count_per_node=1,\n",
" distributed_backend=\"mpi\", \n",
" pip_packages=pip_packages,\n",
" custom_docker_base_image=\"microsoft/mmlspark:0.12\",\n",
" use_gpu=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run = experiment.submit(cntk_estimator)\n",
"print(run)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.widgets import RunDetails\n",
"RunDetails(run).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.wait_for_completion(show_output=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,376 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PyTorch Distributed Demo\n",
"\n",
"In this demo, we will run a sample PyTorch job using Horovod on a multi-node Batch AI cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"Make sure you go through the [00. Installation and Configuration](00.configuration.ipynb) Notebook first if you haven't."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize Workspace\n",
"\n",
"Initialize a workspace object from persisted configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep = '\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set experiment name and create project\n",
"Choose a name for your run history container in the workspace, and create a folder for the project."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"experiment_name = 'pytorch-dist-hvd'\n",
"\n",
"# project folder\n",
"project_folder = './sample_projects/pytorch-dist-hvd'\n",
"os.makedirs(project_folder, exist_ok = True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write demo PyTorch code\n",
"\n",
"We will use a distributed PyTorch implementation of the classic MNIST problem. The following cell writes the main implementation to the project folder."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile {project_folder}/pytorch_horovod_mnist.py\n",
"\n",
"from __future__ import print_function\n",
"import argparse\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"import torch.optim as optim\n",
"from torchvision import datasets, transforms\n",
"from torch.autograd import Variable\n",
"import torch.utils.data.distributed\n",
"import horovod.torch as hvd\n",
"\n",
"# Training settings\n",
"parser = argparse.ArgumentParser(description='PyTorch MNIST Example')\n",
"parser.add_argument('--batch-size', type=int, default=64, metavar='N',\n",
" help='input batch size for training (default: 64)')\n",
"parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',\n",
" help='input batch size for testing (default: 1000)')\n",
"parser.add_argument('--epochs', type=int, default=10, metavar='N',\n",
" help='number of epochs to train (default: 10)')\n",
"parser.add_argument('--lr', type=float, default=0.01, metavar='LR',\n",
" help='learning rate (default: 0.01)')\n",
"parser.add_argument('--momentum', type=float, default=0.5, metavar='M',\n",
" help='SGD momentum (default: 0.5)')\n",
"parser.add_argument('--no-cuda', action='store_true', default=False,\n",
" help='disables CUDA training')\n",
"parser.add_argument('--seed', type=int, default=42, metavar='S',\n",
" help='random seed (default: 42)')\n",
"parser.add_argument('--log-interval', type=int, default=10, metavar='N',\n",
" help='how many batches to wait before logging training status')\n",
"args = parser.parse_args()\n",
"args.cuda = not args.no_cuda and torch.cuda.is_available()\n",
"\n",
"hvd.init()\n",
"torch.manual_seed(args.seed)\n",
"\n",
"if args.cuda:\n",
" # Horovod: pin GPU to local rank.\n",
" torch.cuda.set_device(hvd.local_rank())\n",
" torch.cuda.manual_seed(args.seed)\n",
"\n",
"\n",
"kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}\n",
"train_dataset = \\\n",
" datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,\n",
" transform=transforms.Compose([\n",
" transforms.ToTensor(),\n",
" transforms.Normalize((0.1307,), (0.3081,))\n",
" ]))\n",
"train_sampler = torch.utils.data.distributed.DistributedSampler(\n",
" train_dataset, num_replicas=hvd.size(), rank=hvd.rank())\n",
"train_loader = torch.utils.data.DataLoader(\n",
" train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)\n",
"\n",
"test_dataset = \\\n",
" datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([\n",
" transforms.ToTensor(),\n",
" transforms.Normalize((0.1307,), (0.3081,))\n",
" ]))\n",
"test_sampler = torch.utils.data.distributed.DistributedSampler(\n",
" test_dataset, num_replicas=hvd.size(), rank=hvd.rank())\n",
"test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,\n",
" sampler=test_sampler, **kwargs)\n",
"\n",
"\n",
"class Net(nn.Module):\n",
" def __init__(self):\n",
" super(Net, self).__init__()\n",
" self.conv1 = nn.Conv2d(1, 10, kernel_size=5)\n",
" self.conv2 = nn.Conv2d(10, 20, kernel_size=5)\n",
" self.conv2_drop = nn.Dropout2d()\n",
" self.fc1 = nn.Linear(320, 50)\n",
" self.fc2 = nn.Linear(50, 10)\n",
"\n",
" def forward(self, x):\n",
" x = F.relu(F.max_pool2d(self.conv1(x), 2))\n",
" x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))\n",
" x = x.view(-1, 320)\n",
" x = F.relu(self.fc1(x))\n",
" x = F.dropout(x, training=self.training)\n",
" x = self.fc2(x)\n",
" return F.log_softmax(x)\n",
"\n",
"\n",
"model = Net()\n",
"\n",
"if args.cuda:\n",
" # Move model to GPU.\n",
" model.cuda()\n",
"\n",
"# Horovod: broadcast parameters.\n",
"hvd.broadcast_parameters(model.state_dict(), root_rank=0)\n",
"\n",
"# Horovod: scale learning rate by the number of GPUs.\n",
"optimizer = optim.SGD(model.parameters(), lr=args.lr * hvd.size(),\n",
" momentum=args.momentum)\n",
"\n",
"# Horovod: wrap optimizer with DistributedOptimizer.\n",
"optimizer = hvd.DistributedOptimizer(\n",
" optimizer, named_parameters=model.named_parameters())\n",
"\n",
"\n",
"def train(epoch):\n",
" model.train()\n",
" train_sampler.set_epoch(epoch)\n",
" for batch_idx, (data, target) in enumerate(train_loader):\n",
" if args.cuda:\n",
" data, target = data.cuda(), target.cuda()\n",
" data, target = Variable(data), Variable(target)\n",
" optimizer.zero_grad()\n",
" output = model(data)\n",
" loss = F.nll_loss(output, target)\n",
" loss.backward()\n",
" optimizer.step()\n",
" if batch_idx % args.log_interval == 0:\n",
" print('Train Epoch: {} [{}/{} ({:.0f}%)]\\tLoss: {:.6f}'.format(\n",
" epoch, batch_idx * len(data), len(train_sampler),\n",
" 100. * batch_idx / len(train_loader), loss.data[0]))\n",
"\n",
"\n",
"def metric_average(val, name):\n",
" tensor = torch.FloatTensor([val])\n",
" avg_tensor = hvd.allreduce(tensor, name=name)\n",
" return avg_tensor[0]\n",
"\n",
"\n",
"def test():\n",
" model.eval()\n",
" test_loss = 0.\n",
" test_accuracy = 0.\n",
" for data, target in test_loader:\n",
" if args.cuda:\n",
" data, target = data.cuda(), target.cuda()\n",
" data, target = Variable(data, volatile=True), Variable(target)\n",
" output = model(data)\n",
" # sum up batch loss\n",
" test_loss += F.nll_loss(output, target, size_average=False).data[0]\n",
" # get the index of the max log-probability\n",
" pred = output.data.max(1, keepdim=True)[1]\n",
" test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()\n",
"\n",
" test_loss /= len(test_sampler)\n",
" test_accuracy /= len(test_sampler)\n",
"\n",
" test_loss = metric_average(test_loss, 'avg_loss')\n",
" test_accuracy = metric_average(test_accuracy, 'avg_accuracy')\n",
"\n",
" if hvd.rank() == 0:\n",
" print('\\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\\n'.format(\n",
" test_loss, 100. * test_accuracy))\n",
"\n",
"\n",
"for epoch in range(1, args.epochs + 1):\n",
" train(epoch)\n",
" test()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deploy Batch AI cluster\n",
"\n",
"To run this in a distributed context, we'll need a Batch AI cluster with at least two nodes.\n",
"\n",
"Here, we use exactly two CPU nodes, to conserve resources. If you want to try it with some other number or SKU, just change the relevant values in the following code block."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import BatchAiCompute\n",
"from azureml.core.compute import ComputeTarget\n",
"\n",
"batchai_cluster_name='gpucluster'\n",
"\n",
"\n",
"try:\n",
" # Check for existing cluster\n",
" compute_target = ComputeTarget(ws,batchai_cluster_name)\n",
" print('Found existing compute target')\n",
"except:\n",
" # Else, create new one\n",
" print('Creating a new compute target...')\n",
" provisioning_config = BatchAiCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\", # NC6 is GPU-enabled\n",
" #vm_priority = 'lowpriority', # optional\n",
" autoscale_enabled = True,\n",
" cluster_min_nodes = 0, \n",
" cluster_max_nodes = 4)\n",
" compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)\n",
" # can poll for a minimum number of nodes and for a specific timeout. \n",
" # if no min node count is provided it will use the scale settings for the cluster\n",
" compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" # For a more detailed view of current BatchAI cluster status, use the 'status' property \n",
"print(compute_target.status.serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit job\n",
"\n",
"Now that we have a cluster ready to go, let's submit our job.\n",
"\n",
"We need to use a custom estimator here, and specify that we want the `pytorch`, `horovod` and `torchvision` packages installed to our image."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import PyTorch\n",
"\n",
"estimator = PyTorch(source_directory=project_folder,\n",
" compute_target=compute_target,\n",
" entry_script='pytorch_horovod_mnist.py',\n",
" node_count=2,\n",
" process_count_per_node=1,\n",
" distributed_backend=\"mpi\",\n",
" use_gpu=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.experiment import Experiment\n",
"\n",
"experiment = Experiment(workspace=ws, name=experiment_name)\n",
"run = experiment.submit(estimator)\n",
"print(run)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.widgets import RunDetails\n",
"RunDetails(run).show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}