update samples from Release-72 as a part of 1.24.0 SDK stable release

This commit is contained in:
amlrelsa-ms
2021-03-09 04:17:07 +00:00
parent 3fe4f8b038
commit efc61d2219
130 changed files with 1019 additions and 9498 deletions

View File

@@ -0,0 +1,495 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/ml-frameworks/pytorch/distributed-pytorch-with-horovod/distributed-pytorch-with-horovod.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Distributed PyTorch with DistributedDataParallel\n",
"\n",
"In this tutorial, you will train a PyTorch model on the [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html) dataset using distributed training with PyTorch's `DistributedDataParallel` module across a GPU cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"* If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [Configuration](../../../../configuration.ipynb) notebook to install the Azure Machine Learning Python SDK and create an Azure ML `Workspace`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check core SDK version number\n",
"import azureml.core\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Diagnostics\n",
"Opt-in diagnostics for better experience, quality, and security of future releases."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"Diagnostics"
]
},
"outputs": [],
"source": [
"from azureml.telemetry import set_diagnostics_collection\n",
"\n",
"set_diagnostics_collection(send_diagnostics=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize workspace\n",
"\n",
"Initialize a [Workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace) object from the existing workspace you created in the Prerequisites step. `Workspace.from_config()` creates a workspace object from the details stored in `config.json`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.workspace import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print('Workspace name: ' + ws.name, \n",
" 'Azure region: ' + ws.location, \n",
" 'Subscription id: ' + ws.subscription_id, \n",
" 'Resource group: ' + ws.resource_group, sep='\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create or attach existing AmlCompute\n",
"You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for training your model. In this tutorial, we use Azure ML managed compute ([AmlCompute](https://docs.microsoft.com/azure/machine-learning/service/how-to-set-up-training-targets#amlcompute)) for our remote training compute resource. Specifically, the below code creates an `STANDARD_NC6` GPU cluster that autoscales from `0` to `4` nodes.\n",
"\n",
"**Creation of AmlCompute takes approximately 5 minutes.** If the AmlCompute with that name is already in your workspace, this code will skip the creation process.\n",
"\n",
"As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read [this article](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-manage-quotas) on the default limits and how to request more quota."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import ComputeTarget, AmlCompute\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"# choose a name for your cluster\n",
"cluster_name = 'gpu-cluster'\n",
"\n",
"try:\n",
" compute_target = ComputeTarget(workspace=ws, name=cluster_name)\n",
" print('Found existing compute target.')\n",
"except ComputeTargetException:\n",
" print('Creating a new compute target...')\n",
" compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_NC6',\n",
" max_nodes=4)\n",
"\n",
" # create the cluster\n",
" compute_target = ComputeTarget.create(ws, cluster_name, compute_config)\n",
"\n",
" compute_target.wait_for_completion(show_output=True)\n",
"\n",
"# use get_status() to get a detailed status for the current AmlCompute. \n",
"print(compute_target.get_status().serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The above code creates GPU compute. If you instead want to create CPU compute, provide a different VM size to the `vm_size` parameter, such as `STANDARD_D2_V2`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare dataset\n",
"\n",
"Prepare the dataset used for training. We will first download and extract the publicly available CIFAR-10 dataset from the cs.toronto.edu website and then create an Azure ML FileDataset to use the data for training."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download and extract CIFAR-10 data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import urllib\n",
"import tarfile\n",
"import os\n",
"\n",
"url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'\n",
"filename = 'cifar-10-python.tar.gz'\n",
"data_root = 'cifar-10'\n",
"filepath = os.path.join(data_root, filename)\n",
"\n",
"if not os.path.isdir(data_root):\n",
" os.makedirs(data_root, exist_ok=True)\n",
" urllib.request.urlretrieve(url, filepath)\n",
" with tarfile.open(filepath, \"r:gz\") as tar:\n",
" tar.extractall(path=data_root)\n",
" os.remove(filepath) # delete tar.gz file after extraction"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Azure ML dataset\n",
"\n",
"The `upload_directory` method will upload the data to a datastore and create a FileDataset from it. In this tutorial we will use the workspace's default datastore."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Dataset\n",
"\n",
"datastore = ws.get_default_datastore()\n",
"dataset = Dataset.File.upload_directory(\n",
" src_dir=data_root, target=(datastore, data_root)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train model on the remote compute\n",
"Now that we have the AmlCompute ready to go, let's run our distributed training job."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a project directory\n",
"Create a directory that will contain all the necessary code from your local machine that you will need access to on the remote resource. This includes the training script and any additional files your training script depends on."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"project_folder = './pytorch-distr'\n",
"os.makedirs(project_folder, exist_ok=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prepare training script\n",
"Now you will need to create your training script. In this tutorial, the script for distributed training on CIFAR-10 is already provided for you at `train.py`. In practice, you should be able to take any custom PyTorch training script as is and run it with Azure ML without having to modify your code."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once your script is ready, copy the training script `train.py` into the project directory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import shutil\n",
"\n",
"shutil.copy('train.py', project_folder)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create an experiment\n",
"Create an [Experiment](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#experiment) to track all the runs in your workspace for this distributed PyTorch tutorial. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"\n",
"experiment_name = 'pytorch-distr'\n",
"experiment = Experiment(ws, name=experiment_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create an environment\n",
"\n",
"In this tutorial, we will use one of Azure ML's curated PyTorch environments for training. [Curated environments](https://docs.microsoft.com/azure/machine-learning/how-to-use-environments#use-a-curated-environment) are available in your workspace by default. Specifically, we will use the PyTorch 1.6 GPU curated environment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Environment\n",
"\n",
"pytorch_env = Environment.get(ws, name='AzureML-PyTorch-1.6-GPU')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Configure the training job\n",
"\n",
"To launch a distributed PyTorch job on Azure ML, you have two options:\n",
"\n",
"1. Per-process launch - specify the total # of worker processes (typically one per GPU) you want to run, and\n",
"Azure ML will handle launching each process.\n",
"2. Per-node launch with [torch.distributed.launch](https://pytorch.org/docs/stable/distributed.html#launch-utility) - provide the `torch.distributed.launch` command you want to\n",
"run on each node.\n",
"\n",
"For more information, see the [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-train-pytorch#distributeddataparallel).\n",
"\n",
"Both options are shown below."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Per-process launch\n",
"\n",
"To use the per-process launch option in which Azure ML will handle launching each of the processes to run your training script,\n",
"\n",
"1. Specify the training script and arguments\n",
"2. Create a `PyTorchConfiguration` and specify `node_count` and `process_count`. The `process_count` is the total number of processes you want to run for the job; this should typically equal the # of GPUs available on each node multiplied by the # of nodes. Since this tutorial uses the `STANDARD_NC6` SKU, which has one GPU, the total process count for a 2-node job is `2`. If you are using a SKU with >1 GPUs, adjust the `process_count` accordingly.\n",
"\n",
"Azure ML will set the `MASTER_ADDR`, `MASTER_PORT`, `NODE_RANK`, `WORLD_SIZE` environment variables on each node, in addition to the process-level `RANK` and `LOCAL_RANK` environment variables, that are needed for distributed PyTorch training."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import ScriptRunConfig\n",
"from azureml.core.runconfig import PyTorchConfiguration\n",
"\n",
"# create distributed config\n",
"distr_config = PyTorchConfiguration(process_count=2, node_count=2)\n",
"\n",
"# create args\n",
"args = [\"--data-dir\", dataset.as_download(), \"--epochs\", 25]\n",
"\n",
"# create job config\n",
"src = ScriptRunConfig(source_directory=project_folder,\n",
" script='train.py',\n",
" arguments=args,\n",
" compute_target=compute_target,\n",
" environment=pytorch_env,\n",
" distributed_job_config=distr_config)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Per-node launch with `torch.distributed.launch`\n",
"\n",
"If you would instead like to use the PyTorch-provided launch utility `torch.distributed.launch` to handle launching the worker processes on each node, you can do so as well. \n",
"\n",
"1. Provide the launch command to the `command` parameter of ScriptRunConfig. For PyTorch jobs Azure ML will set the `MASTER_ADDR`, `MASTER_PORT`, and `NODE_RANK` environment variables on each node, so you can simply just reference those environment variables in your command. If you are using a SKU with >1 GPUs, adjust the `--nproc_per_node` argument accordingly.\n",
"\n",
"2. Create a `PyTorchConfiguration` and specify the `node_count`. You do not need to specify the `process_count`; by default Azure ML will launch one process per node to run the `command` you provided.\n",
"\n",
"Uncomment the code below to configure a job with this method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"from azureml.core import ScriptRunConfig\n",
"from azureml.core.runconfig import PyTorchConfiguration\n",
"\n",
"# create distributed config\n",
"distr_config = PyTorchConfiguration(node_count=2)\n",
"\n",
"# define command\n",
"launch_cmd = [\"python -m torch.distributed.launch --nproc_per_node 1 --nnodes 2 \" \\\n",
" \"--node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT --use_env \" \\\n",
" \"train.py --data-dir\", dataset.as_download(), \"--epochs 25\"]\n",
"\n",
"# create job config\n",
"src = ScriptRunConfig(source_directory=project_folder,\n",
" command=launch_cmd,\n",
" compute_target=compute_target,\n",
" environment=pytorch_env,\n",
" distributed_job_config=distr_config)\n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit job\n",
"Run your experiment by submitting your `ScriptRunConfig` object. Note that this call is asynchronous."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run = experiment.submit(src)\n",
"print(run)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Monitor your run\n",
"You can monitor the progress of the run with a Jupyter widget. Like the run submission, the widget is asynchronous and provides live updates every 10-15 seconds until the job completes. You can see that the widget automatically plots and visualizes the loss metric that we logged to the Azure ML run."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.widgets import RunDetails\n",
"\n",
"RunDetails(run).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, you can block until the script has completed training before running more code."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.wait_for_completion(show_output=True) # this provides a verbose log"
]
}
],
"metadata": {
"authors": [
{
"name": "minxia"
}
],
"category": "training",
"compute": [
"AML Compute"
],
"datasets": [
"CIFAR-10"
],
"deployment": [
"None"
],
"exclude_from_index": false,
"framework": [
"PyTorch"
],
"friendly_name": "Distributed training with PyTorch",
"index_order": 1,
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.7"
},
"tags": [
"None"
],
"task": "Train a model using distributed training via PyTorch DistributedDataParallel"
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,5 @@
name: distributed-pytorch-with-distributeddataparallel
dependencies:
- pip:
- azureml-sdk
- azureml-widgets

View File

@@ -0,0 +1,238 @@
# Copyright (c) 2017 Facebook, Inc. All rights reserved.
# BSD 3-Clause License
#
# Script adapted from:
# https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
# ==============================================================================
# imports
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import argparse
# define network architecture
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 32, 3)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(32, 64, 3)
self.conv3 = nn.Conv2d(64, 128, 3)
self.fc1 = nn.Linear(128 * 6 * 6, 120)
self.dropout = nn.Dropout(p=0.2)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = x.view(-1, 128 * 6 * 6)
x = self.dropout(F.relu(self.fc1(x)))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def train(train_loader, model, criterion, optimizer, epoch, device, print_freq, rank):
running_loss = 0.0
for i, data in enumerate(train_loader, 0):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data[0].to(device), data[1].to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
if i % print_freq == 0: # print every print_freq mini-batches
print(
"Rank %d: [%d, %5d] loss: %.3f"
% (rank, epoch + 1, i + 1, running_loss / print_freq)
)
running_loss = 0.0
def evaluate(test_loader, model, device):
classes = (
"plane",
"car",
"bird",
"cat",
"deer",
"dog",
"frog",
"horse",
"ship",
"truck",
)
model.eval()
correct = 0
total = 0
class_correct = list(0.0 for i in range(10))
class_total = list(0.0 for i in range(10))
with torch.no_grad():
for data in test_loader:
images, labels = data[0].to(device), data[1].to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
c = (predicted == labels).squeeze()
for i in range(10):
label = labels[i]
class_correct[label] += c[i].item()
class_total[label] += 1
# print total test set accuracy
print(
"Accuracy of the network on the 10000 test images: %d %%"
% (100 * correct / total)
)
# print test accuracy for each of the classes
for i in range(10):
print(
"Accuracy of %5s : %2d %%"
% (classes[i], 100 * class_correct[i] / class_total[i])
)
def main(args):
# get PyTorch environment variables
world_size = int(os.environ["WORLD_SIZE"])
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
distributed = world_size > 1
# set device
if distributed:
device = torch.device("cuda", local_rank)
else:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# initialize distributed process group using default env:// method
if distributed:
torch.distributed.init_process_group(backend="nccl")
# define train and test dataset DataLoaders
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
train_set = torchvision.datasets.CIFAR10(
root=args.data_dir, train=True, download=False, transform=transform
)
if distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
else:
train_sampler = None
train_loader = torch.utils.data.DataLoader(
train_set,
batch_size=args.batch_size,
shuffle=(train_sampler is None),
num_workers=args.workers,
sampler=train_sampler,
)
test_set = torchvision.datasets.CIFAR10(
root=args.data_dir, train=False, download=False, transform=transform
)
test_loader = torch.utils.data.DataLoader(
test_set, batch_size=args.batch_size, shuffle=False, num_workers=args.workers
)
model = Net().to(device)
# wrap model with DDP
if distributed:
model = nn.parallel.DistributedDataParallel(
model, device_ids=[local_rank], output_device=local_rank
)
# define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
model.parameters(), lr=args.learning_rate, momentum=args.momentum
)
# train the model
for epoch in range(args.epochs):
print("Rank %d: Starting epoch %d" % (rank, epoch))
if distributed:
train_sampler.set_epoch(epoch)
model.train()
train(
train_loader,
model,
criterion,
optimizer,
epoch,
device,
args.print_freq,
rank,
)
print("Rank %d: Finished Training" % (rank))
if not distributed or rank == 0:
os.makedirs(args.output_dir, exist_ok=True)
model_path = os.path.join(args.output_dir, "cifar_net.pt")
torch.save(model.state_dict(), model_path)
# evaluate on full test dataset
evaluate(test_loader, model, device)
if __name__ == "__main__":
# setup argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--data-dir", type=str, help="directory containing CIFAR-10 dataset"
)
parser.add_argument("--epochs", default=10, type=int, help="number of epochs")
parser.add_argument(
"--batch-size",
default=16,
type=int,
help="mini batch size for each gpu/process",
)
parser.add_argument(
"--workers",
default=2,
type=int,
help="number of data loading workers for each gpu/process",
)
parser.add_argument(
"--learning-rate", default=0.001, type=float, help="learning rate"
)
parser.add_argument("--momentum", default=0.9, type=float, help="momentum")
parser.add_argument(
"--output-dir", default="outputs", type=str, help="directory to save model to"
)
parser.add_argument(
"--print-freq",
default=200,
type=int,
help="frequency of printing training statistics",
)
args = parser.parse_args()
main(args)