Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Azure Machine Learning Pipeline with AzureBatchStep

This notebook demonstrates how to use AzureBatchStep in an Azure Machine Learning Pipeline.
An AzureBatchStep submits a job to an Azure Batch compute target, which here runs a simple Windows executable.

## Azure Machine Learning and Pipeline SDK-specific Imports

In [None]:
import azureml.core
from azureml.core import Workspace, Experiment
from azureml.core.compute import ComputeTarget, BatchCompute
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import AzureBatchStep

import os
from os import path
from tempfile import mkdtemp


# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Initialize Workspace

Initialize a workspace object from persisted configuration. Make sure the config file is present at `.\config.json`.

If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, if you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) located [here](https://github.com/Azure/MachineLearningNotebooks).  

This sets you up with a working config file that contains your workspace name, subscription ID, and resource group.

In [None]:
ws = Workspace.from_config()

print('Workspace Name: ' + ws.name, 
      'Azure Region: ' + ws.location, 
      'Subscription Id: ' + ws.subscription_id, 
      'Resource Group: ' + ws.resource_group, sep = '\n')
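
As a rough illustration of what the persisted configuration holds, the sketch below writes a config.json with the three keys that `Workspace.from_config()` reads (`subscription_id`, `resource_group`, `workspace_name`) and checks that none are missing. The helper names `write_workspace_config` and `missing_config_keys` are hypothetical and not part of the SDK, and the values are placeholders that you would replace with your own.

```python
import json
import os
import tempfile

# Keys read from config.json when the workspace is loaded from configuration.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Hypothetical helper: write config.json into `directory` and return its path."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    config_path = os.path.join(directory, "config.json")
    with open(config_path, "w") as f:
        json.dump(config, f, indent=4)
    return config_path


def missing_config_keys(config_path):
    """Hypothetical helper: list required keys that are absent or empty."""
    with open(config_path) as f:
        config = json.load(f)
    return [key for key in REQUIRED_KEYS if not config.get(key)]


# Placeholder values only; a temporary directory keeps this from touching a real config.
demo_dir = tempfile.mkdtemp()
demo_config_path = write_workspace_config(
    demo_dir, "<subscription_id>", "<resource_group>", "<workspace_name>"
)
print("Missing keys:", missing_config_keys(demo_config_path))
```

Running a check like this before calling `Workspace.from_config()` can make a misconfigured file easier to spot than the error raised at load time.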

## Attach Batch Compute to Workspace

To submit jobs to Azure Batch service, you must attach your Azure Batch account to the workspace.

In [None]:
batch_compute_name = 'mybatchcompute' # Name to associate with new compute in workspace

# Batch account details needed to attach as compute to workspace
batch_account_name = "<batch_account_name>" # Name of the Batch account
batch_resource_group = "<batch_resource_group>" # Name of the resource group which contains this account

try:
    # check if already attached
    batch_compute = BatchCompute(ws, batch_compute_name)
except ComputeTargetException:
    print('Attaching Batch compute...')
    provisioning_config = BatchCompute.attach_configuration(resource_group=batch_resource_group, 
                                                            account_name=batch_account_name)
    batch_compute = ComputeTarget.attach(ws, batch_compute_name, provisioning_config)
    batch_compute.wait_for_completion()
    print("Provisioning state:{}".format(batch_compute.provisioning_state))
    print("Provisioning errors:{}".format(batch_compute.provisioning_errors))

print("Using Batch compute:{}".format(batch_compute.cluster_resource_id))

## Setup Datastore

This section sets up the Blob storage associated with the workspace.  
The following call retrieves the Azure Blob store associated with your workspace.  
Note that workspaceblobstore is **the name of this store and CANNOT BE CHANGED and must be used as is**.  
  
If you want to register another Datastore, follow the instructions here:
https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-access-data#register-a-datastore

In [None]:
datastore = Datastore(ws, "workspaceblobstore")

print('Datastore details:')
print('Datastore Account Name: ' + datastore.account_name)
print('Datastore Workspace Name: ' + datastore.workspace.name)
print('Datastore Container Name: ' + datastore.container_name)

## Setup Input and Output

For this example, we upload a file to the provided Datastore. The following helper methods handle the upload.

In [None]:
def create_local_file(content, file_name):
    # create a file in a local temporary directory
    temp_dir = mkdtemp()
    with open(path.join(temp_dir, file_name), 'w') as f:
        f.write(content)
    return temp_dir


def upload_file_to_datastore(datastore, file_name, content):
    src_dir = create_local_file(content=content, file_name=file_name)
    datastore.upload(src_dir=src_dir, overwrite=True, show_progress=True)

Here we associate the input DataReference with an existing file in the provided Datastore. Feel free to upload the file of your choice manually or use the *upload_file_to_datastore* method. 

In [None]:
file_name="input.txt"

upload_file_to_datastore(datastore=datastore, 
                         file_name=file_name, 
                         content="this is the content of the file")

testdata = DataReference(datastore=datastore, 
                         path_on_datastore=file_name, 
                         data_reference_name="input")

outputdata = PipelineData(name="output", datastore=datastore)

## Setup AzureBatch Job Binaries

Azure Batch runs a task within the job. Here we create a simple .cmd file for that task to execute. Feel free to put any binaries in the folder or modify the .cmd file as needed; the folder contents are uploaded when we create the AzureBatchStep.

In [None]:
binaries_folder = "azurebatch/job_binaries"
if not os.path.isdir(binaries_folder):
    os.makedirs(binaries_folder)

file_name="azurebatch.cmd"
with open(path.join(binaries_folder, file_name), 'w') as f:
    f.write("copy \"%1\" \"%2\"")
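
The .cmd file copies its first argument (`%1`) to its second (`%2`). For readers without a Windows machine, the sketch below emulates that behavior locally with `shutil.copy`, which, like `copy`, accepts either a file path or a directory as the destination. The function name `emulate_azurebatch_cmd` and the temporary paths are illustrative assumptions; this is not how Azure Batch invokes the script.

```python
import os
import shutil
import tempfile


def emulate_azurebatch_cmd(first_arg, second_arg):
    """Hypothetical local stand-in for `copy "%1" "%2"`.

    Copies the file at `first_arg` to `second_arg`. If `second_arg` is a
    directory, the file keeps its name inside it. Returns the new file's path.
    """
    return shutil.copy(first_arg, second_arg)


# Build a throwaway input file and an output directory to copy into.
work_dir = tempfile.mkdtemp()
input_path = os.path.join(work_dir, "input.txt")
output_dir = os.path.join(work_dir, "output")
os.makedirs(output_dir)

with open(input_path, "w") as f:
    f.write("this is the content of the file")

copied_path = emulate_azurebatch_cmd(input_path, output_dir)

with open(copied_path) as f:
    copied_content = f.read()
print(copied_path)
print(copied_content)
```

In the pipeline, the two arguments are supplied through the step's `arguments` list, so the input reference plays the role of `%1` and the output plays the role of `%2`.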

## Create an AzureBatchStep

AzureBatchStep is used to submit a job to the attached Azure Batch compute.
- **name:** Name of the step
- **pool_id:** Name of the pool; it can be an existing pool or one that is created when the job is submitted
- **inputs:** List of inputs (input port bindings) that the job processes
- **outputs:** List of outputs (output port bindings) that the job creates
- **executable:** Name of the command or executable that runs as part of the job
- **arguments:** Arguments for the command or executable. They can be plain strings, inputs, outputs, or parameters
- **compute_target:** The BatchCompute compute target where the job runs
- **source_directory:** The local folder that contains the module binaries, executable, assemblies, etc. to be executed by the job

Optional parameters:

- **create_pool:** Boolean flag to indicate whether to create the pool before running the jobs
- **delete_batch_job_after_finish:** Boolean flag to indicate whether to delete the job from the Batch account after it finishes
- **delete_batch_pool_after_finish:** Boolean flag to indicate whether to delete the pool after the job finishes
- **is_positive_exit_code_failure:** Boolean flag to indicate whether the job fails if the task exits with a positive code
- **vm_image_urn:** The VM image to use when create_pool is true and the VM uses VirtualMachineConfiguration.  
 Value format: 'urn:publisher:offer:sku'.  
 Example: urn:MicrosoftWindowsServer:WindowsServer:2012-R2-Datacenter  
 For more details:  
 https://docs.microsoft.com/en-us/azure/virtual-machines/windows/cli-ps-findimage#table-of-commonly-used-windows-images and  
 https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage#find-specific-images
- **run_task_as_admin:** Boolean flag to indicate whether the task should run with Admin privileges
- **target_compute_nodes:** If create_pool is true, the number of compute nodes to add to the pool
- **vm_size:** If create_pool is true, the virtual machine size of the compute nodes
- **allow_reuse:** Whether the module should reuse previous results when run with the same settings/inputs
- **version:** A version tag to denote a change in functionality for the module
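
To make the pool-creation options more concrete, here is a minimal sketch that validates a `vm_image_urn` against the 'urn:publisher:offer:sku' format and collects the related optional parameters into a dictionary. The helpers `split_vm_image_urn` and `pool_creation_kwargs` are hypothetical (not part of the SDK), and the VM size and node count shown are placeholder values, not recommendations.

```python
def split_vm_image_urn(vm_image_urn):
    """Hypothetical helper: split 'urn:publisher:offer:sku' into its parts."""
    parts = vm_image_urn.split(":")
    if len(parts) != 4 or parts[0] != "urn" or not all(parts[1:]):
        raise ValueError(
            "Expected format 'urn:publisher:offer:sku', got: {!r}".format(vm_image_urn)
        )
    publisher, offer, sku = parts[1:]
    return {"publisher": publisher, "offer": offer, "sku": sku}


def pool_creation_kwargs(vm_image_urn, vm_size, target_compute_nodes):
    """Hypothetical helper: gather the optional arguments used when create_pool is true."""
    split_vm_image_urn(vm_image_urn)  # raises ValueError if the URN is malformed
    if target_compute_nodes < 1:
        raise ValueError("target_compute_nodes must be at least 1")
    return {
        "create_pool": True,
        "vm_image_urn": vm_image_urn,
        "vm_size": vm_size,
        "target_compute_nodes": target_compute_nodes,
    }


# Example URN taken from the parameter description; the size and count are placeholders.
example_urn = "urn:MicrosoftWindowsServer:WindowsServer:2012-R2-Datacenter"
image_parts = split_vm_image_urn(example_urn)
pool_kwargs = pool_creation_kwargs(example_urn, vm_size="standard_d1_v2", target_compute_nodes=1)
print(image_parts)
print(pool_kwargs)
```

Because the dictionary keys match the optional parameter names listed above, a dictionary like this could be unpacked into the step constructor, for example `AzureBatchStep(..., **pool_kwargs)`, alongside the required arguments.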

In [None]:
step = AzureBatchStep(
            name="Azure Batch Job",
            pool_id="MyPoolName", # Replace this with the pool name of your choice
            inputs=[testdata],
            outputs=[outputdata],
            executable="azurebatch.cmd",
            arguments=[testdata, outputdata],
            compute_target=batch_compute,
            source_directory=binaries_folder,
)

## Build and Submit the Pipeline

In [None]:
pipeline = Pipeline(workspace=ws, steps=[step])
pipeline_run = Experiment(ws, 'azurebatch_sample').submit(pipeline)

## Visualize the Running Pipeline

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()