Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.


# Showcasing DataPath and PipelineParameter

This notebook demonstrates the use of [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) and [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in AML Pipelines. You will learn how strings and [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) can be parameterized and submitted to AML Pipelines via [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py).
To learn more about how parameters work between steps, please refer to [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).

* [How to create a Pipeline with a DataPath PipelineParameter](#index1)
* [How to submit a Pipeline with a DataPath PipelineParameter](#index2)
* [How to submit a Pipeline and change the DataPath PipelineParameter value from the SDK](#index3)
* [How to submit a Pipeline and change the DataPath PipelineParameter value using a REST call](#index4)
* [How to create a datastore trigger schedule and use the data_path_parameter_name to get the path of the changed blob in the Pipeline](#index5)

## Azure Machine Learning and Pipeline SDK-specific imports

In [None]:
import azureml.core
from azureml.core import Workspace, Experiment
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.widgets import RunDetails

from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline, PipelineRun
from azureml.pipeline.steps import PythonScriptStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Initialize Workspace

Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at `.\config.json`.

If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.

This sets you up with a working config file that has information on your workspace, subscription id, etc.
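
Before calling `Workspace.from_config()`, it can help to confirm that a config file is where you expect it. The sketch below uses only the standard library. The helper name `find_workspace_config` is hypothetical, and the sketch assumes the usual three keys (`subscription_id`, `resource_group`, `workspace_name`). It checks only the given folder and an `.azureml` subfolder, whereas the SDK may search more locations.

```python
import json
from pathlib import Path

# Keys assumed to be present in a workspace config file.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def find_workspace_config(start_dir="."):
    """Return (path, missing_keys) for the first config.json found, or (None, all keys)."""
    base = Path(start_dir)
    for candidate in (base / "config.json", base / ".azureml" / "config.json"):
        if candidate.is_file():
            with candidate.open() as f:
                config = json.load(f)
            missing = [key for key in REQUIRED_KEYS if key not in config]
            return candidate, missing
    return None, list(REQUIRED_KEYS)


config_path, missing_keys = find_workspace_config()
if config_path is None:
    print("No config.json found; run the configuration notebook first.")
elif missing_keys:
    print("{} is missing keys: {}".format(config_path, missing_keys))
else:
    print("Found a complete config at {}".format(config_path))
```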

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

## Create an Azure ML experiment

Let's create an experiment named "showcasing-datapath" and set the source directory that holds the training script. The script runs will be recorded under the experiment in Azure.

In [None]:
# Choose a name for the run history container in the workspace.
experiment_name = 'showcasing-datapath'
source_directory  = '.'

experiment = Experiment(ws, experiment_name)
experiment

## Create or Attach an AmlCompute cluster
You will need a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your pipeline run. This tutorial uses an `AmlCompute` cluster as the training compute resource: an existing cluster with the chosen name is reused, and a new one is created otherwise.

In [None]:
# Choose a name for your cluster.
amlcompute_cluster_name = "cpu-cluster"

found = False
# Check if this compute target already exists in the workspace.
cts = ws.compute_targets
if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':
    found = True
    print('Found existing compute target.')
    compute_target = cts[amlcompute_cluster_name]
    
if not found:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2", # for GPU, use "STANDARD_NC6"
                                                                #vm_priority = 'lowpriority', # optional
                                                                max_nodes = 4)

    # Create the cluster.
    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min_node_count is provided, it will use the scale settings for the cluster.
    compute_target.wait_for_completion(show_output = True, timeout_in_minutes = 10)
    
    # For a more detailed view of current AmlCompute status, use get_status().

## Data and arguments setup 

We will set up a training script to run, along with the arguments it uses. The sample training script below prints its two arguments to show what has been passed to the pipeline.

In [None]:
%%writefile train_with_datapath.py
import argparse
import os

parser = argparse.ArgumentParser("train")
parser.add_argument("--arg1", type=str, help="sample string argument")
parser.add_argument("--arg2", type=str, help="sample datapath argument")
args = parser.parse_args()

print("Sample string argument  : %s" % args.arg1)
print("Sample datapath argument: %s" % args.arg2)


Let's set up string and DataPath arguments using PipelineParameter.

Note that a pipeline step accepts a tuple of the form ([**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py), [**DataPathComputeBinding**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapathcomputebinding?view=azure-ml-py)) as an input. DataPath defines the location of the input data. DataPathComputeBinding defines how the data is consumed during step execution. The DataPath can be modified at pipeline submission time with a DataPath parameter, while the compute binding does not change. For static data inputs, we use [**DataReference**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py), which defines both the data location and the compute binding.
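
At run time, both parameters are expected to reach the script as ordinary command-line strings: the string parameter as its value, and the DataPath as a path on the compute target. Here is a minimal local sketch of that hand-off, using the same argument names as `train_with_datapath.py`. The argument values, including the mount path, are made up for illustration and are not what Azure ML will actually produce.

```python
import argparse


def parse_train_args(argv):
    """Parse the same two arguments that train_with_datapath.py expects."""
    parser = argparse.ArgumentParser("train")
    parser.add_argument("--arg1", type=str, help="sample string argument")
    parser.add_argument("--arg2", type=str, help="sample datapath argument")
    return parser.parse_args(argv)


# Hypothetical values standing in for what the step would pass to the script.
example_argv = ["--arg1", "sample_string1", "--arg2", "/mnt/hypothetical/sample_datapath1"]
args = parse_train_args(example_argv)

print("Sample string argument  : %s" % args.arg1)
print("Sample datapath argument: %s" % args.arg2)
```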

In [None]:
def_blob_store = ws.get_default_datastore()
print("Default datastore's name: {}".format(def_blob_store.name))

data_path = DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath1')
datapath1_pipeline_param = PipelineParameter(name="input_datapath", default_value=data_path)
datapath_input = (datapath1_pipeline_param, DataPathComputeBinding(mode='mount'))

string_pipeline_param = PipelineParameter(name="input_string", default_value='sample_string1')

<a id='index1'></a>

## Create a Pipeline with a DataPath PipelineParameter

Note that ```datapath_input``` is specified in both ```arguments``` and ```inputs``` when creating the step.

In [None]:
train_step = PythonScriptStep(
    name='train_step',
    script_name="train_with_datapath.py",
    arguments=["--arg1", string_pipeline_param, "--arg2", datapath_input],
    inputs=[datapath_input],
    compute_target=compute_target, 
    source_directory=source_directory)
print("train_step created")

pipeline = Pipeline(workspace=ws, steps=[train_step])
print("pipeline with the train_step created")

<a id='index2'></a>

## Submit a Pipeline with a DataPath PipelineParameter

Pipelines can be submitted with default values of PipelineParameters by not specifying any parameters.

In [None]:
pipeline_run = experiment.submit(pipeline)
print("Pipeline is submitted for execution")

In [None]:
RunDetails(pipeline_run).show()

In [None]:
pipeline_run.wait_for_completion()

<a id='index3'></a>

## Submit a Pipeline and change the DataPath PipelineParameter value from the SDK

Alternatively, a Pipeline can be submitted with values other than the defaults by passing `pipeline_parameters`.
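
The effect of `pipeline_parameters` can be pictured as overlaying a dictionary of overrides on the defaults. The following is a plain-Python model of that behavior, not SDK code: `resolve_parameters` is a hypothetical helper, and the data paths are represented as simple strings.

```python
def resolve_parameters(defaults, overrides=None):
    """Return the effective values: defaults, replaced wherever an override is given."""
    effective = dict(defaults)
    effective.update(overrides or {})
    return effective


# Defaults mirror the PipelineParameter defaults defined earlier in this notebook.
defaults = {"input_string": "sample_string1", "input_datapath": "sample_datapath1"}

# No overrides: every parameter keeps its default.
print(resolve_parameters(defaults))

# Override only the data path: the string parameter keeps its default.
print(resolve_parameters(defaults, {"input_datapath": "sample_datapath2"}))
```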

In [None]:
pipeline_run_with_params = experiment.submit(
    pipeline,
    pipeline_parameters={
        'input_datapath': DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath2'),
        'input_string': 'sample_string2'})

In [None]:
RunDetails(pipeline_run_with_params).show()

In [None]:
pipeline_run_with_params.wait_for_completion()

<a id='index4'></a>

## Submit a Pipeline and change the DataPath PipelineParameter value using a REST call

Let's publish the pipeline so that it can be triggered through the REST endpoint of the published pipeline.
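
The request cell later in this section posts a JSON body to that endpoint. As a sketch of its shape, the hypothetical helper `build_rest_payload` below assembles the same fields used in that request; the datastore name `my_datastore` is a placeholder for `def_blob_store.name`.

```python
import json


def build_rest_payload(experiment_name, datastore_name, relative_path, string_value):
    """Build the JSON body for triggering the published pipeline over REST."""
    return {
        "ExperimentName": experiment_name,
        "RunSource": "SDK",
        # DataPath parameters are keyed by PipelineParameter name.
        "DataPathAssignments": {
            "input_datapath": {
                "DataStoreName": datastore_name,
                "RelativePath": relative_path,
            }
        },
        # Non-DataPath parameters go here, also keyed by parameter name.
        "ParameterAssignments": {"input_string": string_value},
    }


# "my_datastore" is a placeholder; the notebook uses def_blob_store.name.
payload = build_rest_payload("MyRestPipeline", "my_datastore", "sample_datapath3", "sample_string3")
print(json.dumps(payload, indent=2))
```

Keeping the payload in a small function makes it easy to reuse the same structure for different paths without retyping the nested keys.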

In [None]:
published_pipeline = pipeline.publish(name="DataPath_Pipeline", description="Pipeline to test Datapath", continue_on_step_failure=True)
published_pipeline

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

rest_endpoint = published_pipeline.endpoint

print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint))

In [None]:
# specify the param when running the pipeline
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "MyRestPipeline",
                               "RunSource": "SDK",
                               "DataPathAssignments": {
                                   "input_datapath": { 
                                       "DataStoreName": def_blob_store.name,
                                       "RelativePath": 'sample_datapath3'
                                   }
                               },
                               "ParameterAssignments": {"input_string": "sample_string3"}
                              }
                        )

In [None]:
try:
    response.raise_for_status()
except Exception:    
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)

In [None]:
published_pipeline_run_via_rest = PipelineRun(ws.experiments["MyRestPipeline"], run_id)
RunDetails(published_pipeline_run_via_rest).show()

In [None]:
published_pipeline_run_via_rest.wait_for_completion()

<a id='index5'></a>

## Create a Datastore trigger schedule and use data path parameter

When a Pipeline is scheduled with a DataPath parameter, it is triggered whenever data is added or modified under the monitored path. ```path_on_datastore``` should be a folder; on each trigger, the value of the DataPath parameter is replaced by the path of the added or modified data.

In [None]:
from azureml.pipeline.core import Schedule

schedule = Schedule.create(workspace=ws, 
                           name="Datastore_trigger_schedule",
                           pipeline_id=published_pipeline.id, 
                           experiment_name='Scheduled_Pipeline',
                           datastore=def_blob_store,
                           wait_for_provisioning=True,
                           description="Datastore trigger schedule demo",
                           path_on_datastore="sample_datapath_for_folder",
                           data_path_parameter_name="input_datapath") #Same name as used above to create PipelineParameter

print("Created schedule with id: {}".format(schedule.id))

In [None]:
schedule.disable()
schedule