Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/reinforcement-learning/atari-on-distributed-compute/pong_rllib.png)

# Reinforcement Learning in Azure Machine Learning - Pong problem
Reinforcement Learning in Azure Machine Learning is a managed service for running distributed reinforcement learning training and simulation using the open source Ray framework.
This example uses Ray RLlib to train a Pong playing agent on a multi-node cluster.

## Pong problem
[Pong](https://en.wikipedia.org/wiki/Pong) is a two-dimensional sports game that simulates table tennis. The player controls an in-game paddle by moving it vertically across the left or right side of the screen. They can compete against another player controlling a second paddle on the opposing side. Players use the paddles to hit a ball back and forth.

<table style="width:50%">
  <tr>
      <th style="text-align: center;"><img src="./images/pong.gif" alt="Pong image" align="middle" margin-left="auto" margin-right="auto"/></th>
  </tr>
  <tr style="text-align: center;">
      <th>Fig 1. Pong game animation (from <a href="https://towardsdatascience.com/intro-to-reinforcement-learning-pong-92a94aa0f84d">towardsdatascience.com</a>).</th>
  </tr>
</table>

The goal here is to train an agent to win an episode of Pong game against opponent with the score of at least 18 points. An episode in Pong runs until one of the players reaches a score of 21. Episodes are a terminology that is used across all the [OpenAI gym](https://gym.openai.com/envs/Pong-v0/) environments that contains a strictly defined task.

Training a Pong agent is a compute-intensive task and this example demonstrates the use of Reinforcement Learning in Azure Machine Learning service to train an agent faster in a distributed, parallel environment. You'll learn more about using the head and the worker compute targets to train an agent in this notebook below.

## Prerequisite

It is highly recommended that the user should go through the [Reinforcement Learning in Azure Machine Learning - Cartpole Problem on Single Compute](../cartpole-on-single-compute/cartpole_sc.ipynb) to understand the basics of Reinforcement Learning in Azure Machine Learning and Ray RLlib used in this notebook.

## Set up Development Environment
The following subsections show typical steps to setup your development environment. Setup includes:

* Connecting to a workspace to enable communication between your local machine and remote resources
* Creating an experiment to track all your runs
* Setting up a virtual network
* Creating remote head and worker compute target on a virtual network to use for training

### Azure Machine Learning SDK
Display the Azure Machine Learning SDK version.

In [None]:
%matplotlib inline

# Azure Machine Learning core imports
import azureml.core

# Check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)

### Get Azure Machine Learning workspace
Get a reference to an existing Azure Machine Learning workspace.

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()
print(ws.name, ws.location, ws.resource_group, sep = ' | ')

### Create Azure Machine Learning experiment
Create an experiment to track the runs in your workspace.

In [None]:
from azureml.core.experiment import Experiment

# Experiment name
experiment_name = 'rllib-pong-multi-node'
exp = Experiment(workspace=ws, name=experiment_name)

### Create Virtual Network

If you are using separate compute targets for the Ray head and worker, a virtual network must be created in the resource group.  If you have alraeady created a virtual network in the resource group, you can skip this step.

To do this, you first must install the Azure Networking API.

`pip install --upgrade azure-mgmt-network`

In [None]:
# If you need to install the Azure Networking SDK, uncomment the following line.
#!pip install --upgrade azure-mgmt-network

In [None]:
from azure.mgmt.network import NetworkManagementClient

# Virtual network name
vnet_name ="rl_pong_vnet"

# Default subnet
subnet_name ="default"

# The Azure subscription you are using
subscription_id=ws.subscription_id

# The resource group for the reinforcement learning cluster
resource_group=ws.resource_group

# Azure region of the resource group
location=ws.location

network_client = NetworkManagementClient(ws._auth_object, subscription_id)

async_vnet_creation = network_client.virtual_networks.create_or_update(
    resource_group,
    vnet_name,
    {
        'location': location,
        'address_space': {
            'address_prefixes': ['10.0.0.0/16']
        }
    }
)

async_vnet_creation.wait()
print("Virtual network created successfully: ", async_vnet_creation.result())

### Set up Network Security Group on Virtual Network

Depending on your Azure setup, you may need to open certain ports to make it possible for Azure to manage the compute targets that you create.  The ports that need to be opened are described [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-enable-virtual-network).

A common situation is that ports `29876-29877` are closed.  The following code will add a security rule to open these ports.    Or you can do this manually in the [Azure portal](https://portal.azure.com).

You may need to modify the code below to match your scenario.

In [None]:
import azure.mgmt.network.models

security_group_name = vnet_name + '-' + "nsg"
security_rule_name = "AllowAML"

# Create a network security group
nsg_params = azure.mgmt.network.models.NetworkSecurityGroup(
    location=location,
    security_rules=[
        azure.mgmt.network.models.SecurityRule(
            name=security_rule_name,
            access=azure.mgmt.network.models.SecurityRuleAccess.allow,
            description='Reinforcement Learning in Azure Machine Learning rule',
            destination_address_prefix='*',
            destination_port_range='29876-29877',
            direction=azure.mgmt.network.models.SecurityRuleDirection.inbound,
            priority=400,
            protocol=azure.mgmt.network.models.SecurityRuleProtocol.tcp,
            source_address_prefix='BatchNodeManagement',
            source_port_range='*'
        ),
    ],
)

async_nsg_creation = network_client.network_security_groups.create_or_update(
    resource_group,
    security_group_name,
    nsg_params,
)

async_nsg_creation.wait() 
print("Network security group created successfully:", async_nsg_creation.result())

network_security_group = network_client.network_security_groups.get(
    resource_group,
    security_group_name,
)

# Define a subnet to be created with network security group
subnet = azure.mgmt.network.models.Subnet(
            id='default',
            address_prefix='10.0.0.0/24',
            network_security_group=network_security_group
            )
    
# Create subnet on virtual network
async_subnet_creation = network_client.subnets.create_or_update(
    resource_group_name=resource_group,
    virtual_network_name=vnet_name,
    subnet_name=subnet_name,
    subnet_parameters=subnet
)

async_subnet_creation.wait()
print("Subnet created successfully:", async_subnet_creation.result())

### Review the virtual network security rules
Ensure that the virtual network is configured correctly with required ports open. It is possible that you have configured rules with broader range of ports that allows ports 29876-29877 to be opened. Kindly review your network security group rules.  

In [None]:
from files.networkutils import *

check_vnet_security_rules(ws._auth_object, ws.subscription_id, ws.resource_group, vnet_name, True)

### Create head compute target

In this example, we show how to set up separate compute targets for the Ray head and Ray worker nodes. First we define the head cluster with GPU for the Ray head node. One CPU of the head node will be used for the Ray head process and the rest of the CPUs will be used by the Ray worker processes.

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget

# Choose a name for the Ray head cluster
head_compute_name = 'head-gpu'
head_compute_min_nodes = 0
head_compute_max_nodes = 2

# This example uses GPU VM. For using CPU VM, set SKU to STANDARD_D2_V2
head_vm_size = 'STANDARD_NC6'

if head_compute_name in ws.compute_targets:
    head_compute_target = ws.compute_targets[head_compute_name]
    if head_compute_target and type(head_compute_target) is AmlCompute:
        if head_compute_target.provisioning_state == 'Succeeded':
            print('found head compute target. just use it', head_compute_name)
        else: 
            raise Exception(
                'found head compute target but it is in state', head_compute_target.provisioning_state)
else:
    print('creating a new head compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=head_vm_size,
        min_nodes=head_compute_min_nodes, 
        max_nodes=head_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    # Create the cluster
    head_compute_target = ComputeTarget.create(ws, head_compute_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    head_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(head_compute_target.get_status().serialize())

### Create worker compute target

Now we create a compute target with CPUs for the additional Ray worker nodes. CPUs in these worker nodes are used by Ray worker processes. Each Ray worker node, depending on the CPUs on the node, may have multiple Ray worker processes. There can be multiple worker tasks on each worker process (core).

In [None]:
# Choose a name for your Ray worker compute target
worker_compute_name = 'worker-cpu'
worker_compute_min_nodes = 0 
worker_compute_max_nodes = 4

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
worker_vm_size = 'STANDARD_D2_V2'

# Create the compute target if it hasn't been created already
if worker_compute_name in ws.compute_targets:
    worker_compute_target = ws.compute_targets[worker_compute_name]
    if worker_compute_target and type(worker_compute_target) is AmlCompute:
        if worker_compute_target.provisioning_state == 'Succeeded':
            print('found worker compute target. just use it', worker_compute_name)
        else: 
            raise Exception(
                'found worker compute target but it is in state', head_compute_target.provisioning_state)
else:
    print('creating a new worker compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=worker_vm_size,
        min_nodes=worker_compute_min_nodes,
        max_nodes=worker_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    # Create the compute target
    worker_compute_target = ComputeTarget.create(ws, worker_compute_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    worker_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(worker_compute_target.get_status().serialize())

## Train Pong Agent
To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct  reinforcement learning run configurations for the underlying reinforcement learning framework.  Reinforcement Learning in Azure Machine Learning supports the open source [Ray framework](https://ray.io/) and its highly customizable [RLLib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLLib framework to train a Pong playing agent.


### Define worker configuration
Define a `WorkerConfiguration` using your worker compute target. We specify the number of nodes in the worker compute target to be used for training and additional PIP packages to install on those nodes as a part of setup.
In this case, we define the PIP packages as dependencies for both head and worker nodes. With this setup, the game simulations will run directly on the worker compute nodes.

In [None]:
from azureml.contrib.train.rl import WorkerConfiguration

# Pip packages we will use for both head and worker
pip_packages=["ray[rllib]==0.8.3"] # Latest version of Ray has fixes for isses related to object transfers

# Specify the Ray worker configuration
worker_conf = WorkerConfiguration(
    
    # Azure Machine Learning compute target to run Ray workers
    compute_target=worker_compute_target, 
    
    # Number of worker nodes
    node_count=4,
    
    # GPU
    use_gpu=False, 
    
    # PIP packages to use
    pip_packages=pip_packages
)

### Create reinforcement learning estimator

The `ReinforcementLearningEstimator` is used to submit a job to Azure Machine Learning to start the Ray experiment run. We define the training script parameters here that will be passed to the estimator. 

We specify `episode_reward_mean` to 18 as we want to stop the training as soon as the trained agent reaches an average win margin of at least 18 point over opponent over all episodes in the training epoch.
Number of Ray worker processes are defined by parameter `num_workers`. We set it to 13 as we have 13 CPUs available in our compute targets. Multiple Ray worker processes parallelizes agent training and helps in achieving our goal faster. 

```
Number of CPUs in head_compute_target = 6 CPUs in 1 node = 6
Number of CPUs in worker_compute_target = 2 CPUs in each of 4 nodes = 8
Number of CPUs available = (Number of CPUs in head_compute_target) + (Number of CPUs in worker_compute_target) - (1 CPU for head node) = 6 + 8 - 1 = 13
```

In [None]:
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray

training_algorithm = "IMPALA"
rl_environment = "PongNoFrameskip-v4"

# Training script parameters
script_params = {
    
    # Training algorithm, IMPALA in this case
    "--run": training_algorithm,
    
    # Environment, Pong in this case
    "--env": rl_environment,
    
    # Add additional single quotes at the both ends of string values as we have spaces in the 
    # string parameters, outermost quotes are not passed to scripts as they are not actually part of string
    # Number of GPUs
    # Number of ray workers
    "--config": '\'{"num_gpus": 1, "num_workers": 13}\'',
    
    # Target episode reward mean to stop the training
    # Total training time in seconds
    "--stop": '\'{"episode_reward_mean": 18, "time_total_s": 3600}\'',
}

#  Reinforcement learning estimator
rl_estimator = ReinforcementLearningEstimator(
    
    # Location of source files
    source_directory='files',
    
    # Python script file
    entry_script="pong_rllib.py",
    
    # Parameters to pass to the script file
    # Defined above.
    script_params=script_params,
    
    # The Azure Machine Learning compute target set up for Ray head nodes
    compute_target=head_compute_target,
    
    # Pip packages
    pip_packages=pip_packages,
    
    # GPU usage
    use_gpu=True,
    
    # Reinforcement learning framework. Currently must be Ray.
    rl_framework=Ray(),
    
    # Ray worker configuration defined above.
    worker_configuration=worker_conf,
    
    # How long to wait for whole cluster to start
    cluster_coordination_timeout_seconds=3600,
    
    # Maximum time for the whole Ray job to run
    # This will cut off the run after an hour
    max_run_duration_seconds=3600,
    
    # Allow the docker container Ray runs in to make full use
    # of the shared memory available from the host OS.
    shm_size=24*1024*1024*1024
)

### Training script
As recommended in [RLlib](https://ray.readthedocs.io/en/latest/rllib.html) documentations, we use Ray [Tune](https://ray.readthedocs.io/en/latest/tune.html) API to run the training algorithm. All the RLlib built-in trainers are compatible with the Tune API. Here we use tune.run() to execute a built-in training algorithm. For convenience, down below you can see part of the entry script where we make this call.

```python
    tune.run(
        run_or_experiment=args.run,
        config={
            "env": args.env,
            "num_gpus": args.config["num_gpus"],
            "num_workers": args.config["num_workers"],
            "callbacks": {"on_train_result": callbacks.on_train_result},
            "sample_batch_size": 50,
            "train_batch_size": 1000,
            "num_sgd_iter": 2,
            "num_data_loader_buffers": 2,
            "model": {"dim": 42},
        },
        stop=args.stop,
        local_dir='./logs')
```

### Submit the estimator to start a run
Now we use the rl_estimator configured above to submit a run.

In [None]:
run = exp.submit(config=rl_estimator)

### Monitor the run

Azure Machine Learning provides a Jupyter widget to show the status of an experiment run. You could use this widget to monitor the status of the runs. The widget shows the list of two child runs, one for head compute target run and one for worker compute target run. You can click on the link under **Status** to see the details of the child run. It will also show the metrics being logged.

In [None]:
from azureml.widgets import RunDetails

RunDetails(run).show()

### Stop the run

To stop the run, call `run.cancel()`.

In [None]:
# Uncomment line below to cancel the run
# run.cancel()

### Wait for completion
Wait for the run to complete before proceeding. If you want to stop the run, you may skip this and move to next section below. 

**Note: The run may take anywhere from 30 minutes to 45 minutes to complete.**

In [None]:
run.wait_for_completion()

### Performance of the agent during training

Let's get the reward metrics for the training run agent and observe how the agent's rewards improved over the training iterations and how the agent learns to win the Pong game. 

Collect the episode reward metrics from the worker run's metrics. 

In [None]:
# Get all child runs
child_runs = list(run.get_children(_rehydrate_runs=False))

# Get the reward metrics from worker run
if child_runs[0].id.endswith("_worker"):
    episode_reward_mean = child_runs[0].get_metrics(name='episode_reward_mean')
else:
    episode_reward_mean = child_runs[1].get_metrics(name='episode_reward_mean')

Plot the reward metrics. 

In [None]:
import matplotlib.pyplot as plt

plt.plot(episode_reward_mean['episode_reward_mean'])
plt.xlabel('training_iteration')
plt.ylabel('episode_reward_mean')
plt.show()

We observe that during the training over multiple episodes, the agent learns to win the Pong game against opponent with our target of 18 points in each episode of 21 points.
**Congratulations!! You have trained your Pong agent to win a game.**

## Cleaning up
For your convenience, below you can find code snippets to clean up any resources created as part of this tutorial that you don't wish to retain.

In [None]:
# To archive the created experiment:
#experiment.archive()

# To delete the compute targets:
#head_compute_target.delete()
#worker_compute_target.delete()

## Next
In this example, you learned how to solve distributed reinforcement learning training problems using head and worker compute targets. This was an introductory tutorial on Reinforement Learning in Azure Machine Learning service offering. We would love to hear your feedback to build the features you need!