Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# AML Pipeline with AdlaStep

This notebook demonstrates how to use AdlaStep in AML Pipelines. [AdlaStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py) runs U-SQL scripts on the Azure Data Lake Analytics service.

## AML and Pipeline SDK-specific imports

In [None]:
import os
from msrest.exceptions import HttpOperationError

import azureml.core
from azureml.exceptions import ComputeTargetException
from azureml.core import Workspace, Experiment
from azureml.core.compute import ComputeTarget, AdlaCompute
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import AdlaStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Initialize Workspace

Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](https://aka.ms/pl-config) first if you haven't already.

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\n')

## Attach ADLA account to workspace

To submit jobs to the Azure Data Lake Analytics service, you must first attach your ADLA account to the workspace. You'll need the account name and resource group of the ADLA account to complete this part.

In [None]:
adla_compute_name = 'testadl' # Name to associate with new compute in workspace

# ADLA account details needed to attach as compute to workspace
adla_account_name = "<adla_account_name>" # Name of the Azure Data Lake Analytics account
adla_resource_group = "<adla_resource_group>" # Name of the resource group which contains this account

try:
    # check if already attached
    adla_compute = AdlaCompute(ws, adla_compute_name)
except ComputeTargetException:
    print('attaching adla compute...')
    attach_config = AdlaCompute.attach_configuration(resource_group=adla_resource_group, account_name=adla_account_name)
    adla_compute = ComputeTarget.attach(ws, adla_compute_name, attach_config)
    adla_compute.wait_for_completion()

print("Using ADLA compute:{}".format(adla_compute.cluster_resource_id))
print("Provisioning state:{}".format(adla_compute.provisioning_state))
print("Provisioning errors:{}".format(adla_compute.provisioning_errors))

## Register Data Lake Storage as Datastore

To register Data Lake Storage as a Datastore in the workspace, you'll need account information such as the account name, resource group, and subscription ID.

> AdlaStep can only work with data stored in the **default** Data Lake Storage of the Data Lake Analytics account provided above. If the data you need is in a non-default storage, you can use a DataTransferStep to copy it before running the step. To find the default storage, open your Data Lake Analytics account in the Azure portal and navigate to the 'Data sources' item under Settings in the left pane.

### Grant Azure AD application access to Data Lake Storage

You'll also need to provide an Active Directory application that can access Data Lake Storage. [This document](https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory) contains step-by-step instructions on how to create an AAD application and assign it to Data Lake Storage. A couple of important notes when assigning permissions to the AAD app:

- Access should be provided at the root folder level.
- In the 'Assign permissions' pane, select Read, Write, and Execute permissions for 'This folder and all children'. Add them as 'An access permission entry and a default permission entry' so that the application can also access any files created in the future.

In [None]:
datastore_name = 'MyAdlsDatastore' # Name to associate with data store in workspace

# ADLS storage account details needed to register as a Datastore
subscription_id = os.getenv("ADL_SUBSCRIPTION_62", "<my-subscription-id>") # subscription id of ADLS account
resource_group = os.getenv("ADL_RESOURCE_GROUP_62", "<my-resource-group>") # resource group of ADLS account
store_name = os.getenv("ADL_STORENAME_62", "<my-datastore-name>") # ADLS account name
tenant_id = os.getenv("ADL_TENANT_62", "<my-tenant-id>") # tenant id of service principal
client_id = os.getenv("ADL_CLIENTID_62", "<my-client-id>") # client id of service principal
client_secret = os.getenv("ADL_CLIENT_62_SECRET", "<my-client-secret>") # the secret of service principal

try:
    adls_datastore = Datastore.get(ws, datastore_name)
    print("found datastore with name: %s" % datastore_name)
except HttpOperationError:
    adls_datastore = Datastore.register_azure_data_lake(
        workspace=ws,
        datastore_name=datastore_name,
        subscription_id=subscription_id, # subscription id of ADLS account
        resource_group=resource_group, # resource group of ADLS account
        store_name=store_name, # ADLS account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
    print("registered datastore with name: %s" % datastore_name)
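
The cell above falls back to `<my-...>` placeholders whenever an environment variable is unset, so a registration attempt can go ahead with placeholder values. A small check that could be run before the registration call is sketched below; `unresolved_settings` is a hypothetical helper, not part of the SDK, and it assumes that any value wrapped in angle brackets is an unfilled placeholder.

```python
import os
import re

# Assumption of this sketch: a value of the form <...> was never filled in.
PLACEHOLDER = re.compile(r"^<.*>$")


def unresolved_settings(settings):
    """Return the names whose values are empty or still look like <placeholder>."""
    return sorted(
        name for name, value in settings.items()
        if not value or PLACEHOLDER.match(value)
    )


settings = {
    "subscription_id": os.getenv("ADL_SUBSCRIPTION_62", "<my-subscription-id>"),
    "resource_group": os.getenv("ADL_RESOURCE_GROUP_62", "<my-resource-group>"),
    "store_name": os.getenv("ADL_STORENAME_62", "<my-datastore-name>"),
    "tenant_id": os.getenv("ADL_TENANT_62", "<my-tenant-id>"),
    "client_id": os.getenv("ADL_CLIENTID_62", "<my-client-id>"),
    "client_secret": os.getenv("ADL_CLIENT_62_SECRET", "<my-client-secret>"),
}

missing = unresolved_settings(settings)
if missing:
    # Only names are printed, never values, so the secret is not echoed.
    print("Fill in before registering:", ", ".join(missing))
```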

## Setup inputs and outputs

For the purposes of this demo, we're going to execute a simple U-SQL script that reads a CSV file and writes a portion of its content to a new text file. First, let's create our sample input, which contains 3 columns: employee ID, name, and department ID.

In [None]:
# create a folder to store files for our job
sample_folder = "adla_sample"

if not os.path.isdir(sample_folder):
    os.mkdir(sample_folder)

In [None]:
%%writefile $sample_folder/sample_input.csv
1, Noah, 100
3, Liam, 100
4, Emma, 100
5, Jacob, 100
7, Jennie, 100

Upload this file to Data Lake Storage at location `adla_sample/sample_input.csv` and create a DataReference to refer to this file.
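
Before uploading, a quick local check of the file's shape can catch formatting slips such as a missing column or a non-numeric ID. The following is a minimal sketch using Python's `csv` module on an inline copy of the sample rows; `parse_employees` is a hypothetical helper, and `skipinitialspace=True` is a choice made here for the local check only. The U-SQL extractor may treat the space after each comma differently.

```python
import csv
import io

# Inline copy of the sample rows written to sample_input.csv above.
SAMPLE = """1, Noah, 100
3, Liam, 100
4, Emma, 100
5, Jacob, 100
7, Jennie, 100
"""


def parse_employees(text):
    """Parse rows of (employee id, name, department id) and validate their shape."""
    rows = []
    reader = csv.reader(io.StringIO(text), skipinitialspace=True)
    for line_number, fields in enumerate(reader, start=1):
        if len(fields) != 3:
            raise ValueError(
                "line %d: expected 3 columns, got %d" % (line_number, len(fields))
            )
        emp_id, name, dept_id = fields
        # int() raises ValueError if an ID column is not numeric
        rows.append((int(emp_id), name, int(dept_id)))
    return rows


employees = parse_employees(SAMPLE)
print([name for _, name, _ in employees])
# ['Noah', 'Liam', 'Emma', 'Jacob', 'Jennie']
```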

In [None]:
sample_input = DataReference(
    datastore=adls_datastore,
    data_reference_name="employee_data",
    path_on_datastore="adla_sample/sample_input.csv")

Create a PipelineData object to store the output produced by AdlaStep.

In [None]:
sample_output = PipelineData("sample_output", datastore=adls_datastore)

## Write your U-SQL script

Now let's write a U-SQL script that reads the CSV file above and writes the name column to a new file.

Instead of hard-coding paths in your script, you can use `@@name@@` syntax to refer to inputs, outputs, and parameters.

- If `name` is the name of an input or output port binding, every occurrence of `@@name@@` in the script is replaced with the actual data path of the corresponding port binding.
- If `name` matches a key in the `params` dictionary, every occurrence of `@@name@@` is replaced with the corresponding value in the dictionary.

Note the use of the `@@` syntax in the script below. Before the job is submitted to the Data Lake Analytics service, `@@employee_data@@` will be replaced with the actual path of `sample_input.csv` in Data Lake Storage. Similarly, `@@sample_output@@` will be replaced with a path in Data Lake Storage that will be used to store the intermediate output produced by the step.
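
To make the `@@name@@` substitution concrete, here is a minimal local sketch of the idea. It is not the SDK's implementation: `render_usql` is a hypothetical helper, the paths are invented for illustration, and both the precedence of bindings over parameters and the error on an unknown name are assumptions of this sketch.

```python
import re


def render_usql(script, bindings, params=None):
    """Return script with every @@name@@ token replaced by its value."""
    values = dict(params or {})
    values.update(bindings)  # assumption of this sketch: bindings win on a name clash

    def substitute(match):
        name = match.group(1)
        if name not in values:
            # assumption of this sketch: an unknown name is treated as an error
            raise KeyError("no input, output, or parameter named " + name)
        return str(values[name])

    return re.sub(r"@@(\w+)@@", substitute, script)


# Hypothetical paths, for illustration only.
bindings = {
    "employee_data": "/adla_sample/sample_input.csv",
    "sample_output": "/hypothetical/run/sample_output",
}
script = 'FROM "@@employee_data@@" TO "@@sample_output@@"'
rendered = render_usql(script, bindings)
print(rendered)
```

Raising on an unknown name means that a misspelled token fails locally instead of reaching the job as literal text.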

In [None]:
%%writefile $sample_folder/sample_script.usql

// Read employee information from csv file
@employees = 
    EXTRACT EmpId int, EmpName string, DeptId int
    FROM "@@employee_data@@"
    USING Extractors.Csv();

// Export employee names to text file
OUTPUT
(
    SELECT EmpName
    FROM @employees
)
TO "@@sample_output@@"
USING Outputters.Text();

## Create an AdlaStep

**[AdlaStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py)** is used to run a U-SQL script using Azure Data Lake Analytics. It accepts the following arguments:

- **name:** Name of module
- **script_name:** Name of U-SQL script file
- **inputs:** List of input port bindings
- **outputs:** List of output port bindings
- **compute_target:** The ADLA compute to use for this job
- **params:** Dictionary of name-value pairs to pass to U-SQL job *(optional)*
- **degree_of_parallelism:** The degree of parallelism to use for this job *(optional)*
- **priority:** The priority value to use for the current job *(optional)*
- **runtime_version:** The runtime version of the Data Lake Analytics engine *(optional)*
- **source_directory:** Folder that contains the script, assemblies etc. *(optional)*
- **hash_paths:** List of paths to hash to detect a change (script file is always hashed) *(optional)*

The best practice is to keep each step's script and its dependent files in a separate folder and to specify that folder as the `source_directory` for the step. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Because a change to any file in the `source_directory` triggers a re-upload of the snapshot, a dedicated folder also helps preserve step reuse when nothing in that step's `source_directory` has changed.
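
One way to see why a dedicated folder helps: if change detection covers the whole folder, any unrelated file dropped into it looks like a change. The sketch below illustrates that with a simple content hash. It is not the SDK's snapshot logic; `directory_fingerprint` is a hypothetical helper, and SHA-256 over relative paths and file contents is an assumption made purely for illustration.

```python
import hashlib
import os
import tempfile


def directory_fingerprint(folder):
    """Hash the relative path and contents of every file under folder."""
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(folder):
        dirs.sort()  # visit subfolders in a deterministic order
        for file_name in sorted(files):
            path = os.path.join(root, file_name)
            digest.update(os.path.relpath(path, folder).encode("utf-8"))
            with open(path, "rb") as handle:
                digest.update(handle.read())
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as step_folder:
    with open(os.path.join(step_folder, "sample_script.usql"), "w") as handle:
        handle.write("// version 1\n")
    before = directory_fingerprint(step_folder)
    unchanged = directory_fingerprint(step_folder)

    # An unrelated file in the same folder changes the fingerprint
    with open(os.path.join(step_folder, "notes.txt"), "w") as handle:
        handle.write("scratch\n")
    after = directory_fingerprint(step_folder)

print(before == unchanged)  # True
print(before == after)      # False
```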

In [None]:
adla_step = AdlaStep(
    name='extract_employee_names',
    script_name='sample_script.usql',
    source_directory=sample_folder,
    inputs=[sample_input],
    outputs=[sample_output],
    compute_target=adla_compute)

## Build and Submit the Experiment

In [None]:
pipeline = Pipeline(workspace=ws, steps=[adla_step])

pipeline_run = Experiment(ws, 'adla_sample').submit(pipeline)
pipeline_run.wait_for_completion()

### View Run Details

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()