Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.png)

# Showcasing Dataset and PipelineParameter

This notebook demonstrates how a [**FileDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) or [**TabularDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) can be parametrized with [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in an AML [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py). By parametrizing datasets, you can dynamically run pipeline experiments with different datasets without any code change.

A common use case is building a training pipeline with a sample of your training data for quick iterative development. When you're ready to test and deploy your pipeline at scale, you can pass in your full training dataset to the pipeline experiment without making any changes to your training script. 
 
To see more about how parameters work between steps, please refer [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).

* [How to create a Pipeline with a Dataset PipelineParameter](#index1)
* [How to submit a Pipeline with a Dataset PipelineParameter](#index2)
* [How to submit a Pipeline and change the Dataset PipelineParameter value from the sdk](#index3)
* [How to submit a Pipeline and change the Dataset PipelineParameter value using a REST call](#index4)

## Azure Machine Learning and Pipeline SDK-specific imports

In [None]:
import azureml.core
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.widgets import RunDetails

from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline, PipelineRun
from azureml.pipeline.steps import PythonScriptStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Initialize Workspace

Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at .\config.json

If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.

This sets you up with a working config file that has information on your workspace, subscription id, etc.

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

## Create an Azure ML experiment

Let's create an experiment named "showcasing-dataset" and a folder to hold the training scripts. The script runs will be recorded under the experiment in Azure.

In [None]:
# Choose a name for the run history container in the workspace.
experiment_name = 'showcasing-dataset'
source_directory  = '.'

experiment = Experiment(ws, experiment_name)
experiment

## Create or Attach an AmlCompute cluster
You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your AutoML run. In this tutorial, you get the default `AmlCompute` as your training compute resource.

In [None]:
# Choose a name for your cluster.
amlcompute_cluster_name = "cpu-cluster"

found = False
# Check if this compute target already exists in the workspace.
cts = ws.compute_targets
if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':
    found = True
    print('Found existing compute target.')
    compute_target = cts[amlcompute_cluster_name]
    
if not found:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2", # for GPU, use "STANDARD_NC6"
                                                                #vm_priority = 'lowpriority', # optional
                                                                max_nodes = 4)

    # Create the cluster.
    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min_node_count is provided, it will use the scale settings for the cluster.
    compute_target.wait_for_completion(show_output = True, timeout_in_minutes = 10)
    
     # For a more detailed view of current AmlCompute status, use get_status().

## Dataset Configuration

The following steps detail how to create a [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) and [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) from an external CSV file, and configure them to be used by a [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py):

1. Create a dataset from a csv file
2. Create a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) object and set the `default_value` to the dataset. [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) objects enabled arguments to be passed into Pipelines when they are resubmitted after creation. The `name` is referenced later on when we submit additional pipeline runs with different input datasets. 
3. Create a [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object from the [PiepelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py). The [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object specifies how the dataset should be used by the remote compute where the pipeline is run. **NOTE** only [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects built on [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) can be set `as_mount()` or `as_download()` on the remote compute.

In [None]:
file_dataset = Dataset.File.from_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
file_pipeline_param = PipelineParameter(name="file_ds_param", default_value=file_dataset)
file_ds_consumption = DatasetConsumptionConfig("file_dataset", file_pipeline_param).as_mount()

tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)

We will setup a training script to ingest our passed-in datasets and print their contents. **NOTE** the names of the datasets referenced inside the training script correspond to the `name` of their respective [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects we defined above.

In [None]:
%%writefile train_with_dataset.py
from azureml.core import Run

input_file_ds_path = Run.get_context().input_datasets['file_dataset']
with open(input_file_ds_path, 'r') as f:
    content = f.read()
    print(content)

input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']
tabular_df = input_tabular_ds.to_pandas_dataframe()
print(tabular_df)

<a id='index1'></a>

## Create a Pipeline with a Dataset PipelineParameter

Note that the ```file_ds_consumption``` and ```tabular_ds_consumption``` are specified as both arguments and inputs to create a step.

In [None]:
train_step = PythonScriptStep(
    name="train_step",
    script_name="train_with_dataset.py",
    arguments=["--param1", file_ds_consumption, "--param2", tabular_ds_consumption],
    inputs=[file_ds_consumption, tabular_ds_consumption],
    compute_target=compute_target,
    source_directory=source_directory)

print("train_step created")

pipeline = Pipeline(workspace=ws, steps=[train_step])
print("pipeline with the train_step created")

<a id='index2'></a>

## Submit a Pipeline with a Dataset PipelineParameter

Pipelines can be submitted with default values of PipelineParameters by not specifying any parameters.

In [None]:
# Pipeline will run with default file_ds and tabular_ds
pipeline_run = experiment.submit(pipeline)
print("Pipeline is submitted for execution")

In [None]:
RunDetails(pipeline_run).show()

In [None]:
pipeline_run.wait_for_completion()

<a id='index3'></a>

## Submit a Pipeline with a different Dataset PipelineParameter value from the SDK

The training pipeline can be reused with different input datasets by passing them in as PipelineParameters

In [None]:
iris_file_ds = Dataset.File.from_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'
                                        '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'
                                        'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')

iris_tabular_ds = Dataset.Tabular.from_delimited_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'
                                                       '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'
                                                       'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')

In [None]:
pipeline_run_with_params = experiment.submit(pipeline, pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds}) 

In [None]:
RunDetails(pipeline_run_with_params).show()

In [None]:
pipeline_run_with_params.wait_for_completion()

<a id='index4'></a>

## Dynamically Set the Dataset PipelineParameter Values using a REST Call

Let's publish the pipeline we created previously, so we can generate a pipeline endpoint. We can then submit the iris datasets to the pipeline REST endpoint by passing in their IDs. 

In [None]:
published_pipeline = pipeline.publish(name="Dataset_Pipeline", description="Pipeline to test Dataset PipelineParameter", continue_on_step_failure=True)
published_pipeline

In [None]:
published_pipeline.submit(ws, experiment_name="publishedexperiment", pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds})

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

rest_endpoint = published_pipeline.endpoint

print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint))

In [None]:
# specify the param when running the pipeline
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "MyRestPipeline",
                               "RunSource": "SDK",
                               "DataSetDefinitionValueAssignments": {"file_ds_param": {"SavedDataSetReference": {"Id": iris_file_ds.id}},
                                                                     "tabular_ds_param": {"SavedDataSetReference": {"Id": iris_tabular_ds.id}}}
                              }
                        )

In [None]:
try:
    response.raise_for_status()
except Exception:    
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)

In [None]:
published_pipeline_run_via_rest = PipelineRun(ws.experiments["MyRestPipeline"], run_id)
RunDetails(published_pipeline_run_via_rest).show()

In [None]:
published_pipeline_run_via_rest.wait_for_completion()

<a id='index5'></a>