Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.


# How to Set Up a PipelineEndpoint and Submit a Pipeline Using the PipelineEndpoint
In this notebook, we will see how to set up a PipelineEndpoint and run a specific pipeline version.

A PipelineEndpoint lets you update a published pipeline while keeping the same endpoint.
It keeps track of [PublishedPipelines](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.publishedpipeline) by version: a call to the endpoint, optionally carrying version information, triggers the corresponding published pipeline. Pipeline endpoints are uniquely named within a workspace.


### Prerequisites and AML Basics
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](https://aka.ms/pl-config) first if you haven't already. It sets you up with a working config file that contains your workspace information, subscription ID, and so on.


In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

### Notebook Overview
This notebook provides an introduction to Azure Machine Learning PipelineEndpoints. It covers:
* [Create PipelineEndpoint](#Create-PipelineEndpoint): how to create a PipelineEndpoint.
* [Retrieving PipelineEndpoint](#Retrieving-PipelineEndpoint): how to get a specific PipelineEndpoint from the workspace by name or Id, and how to get all [PipelineEndpoints](#Get-all-PipelineEndpoints-in-workspace) within the workspace.
* [PipelineEndpoint Properties](#PipelineEndpoint-properties): how to get and set PipelineEndpoint properties, such as the default version.
* [PipelineEndpoint Submission](#PipelineEndpoint-Submission): how to run a Pipeline using a PipelineEndpoint.

###  Create PipelineEndpoint
The following input parameters are required to create a PipelineEndpoint:

* *workspace*: the AML workspace.
* *name*: the name of the PipelineEndpoint, which must be unique within the workspace.
* *description*: a description of the PipelineEndpoint.
* *pipeline*: a [Pipeline](#Steps-to-create-simple-Pipeline) or [PublishedPipeline](#Publish-Pipeline) that becomes the default version of the PipelineEndpoint.

####  Initialization, Steps to create a Pipeline

The best practice is to give each step its own folder containing the script and its dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Any change to a file in the `source_directory` triggers a re-upload of the snapshot, so keeping the folders separate also allows a step to be reused when nothing in its own `source_directory` has changed.
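
To make the reuse argument concrete, here is a small sketch that uses only the Python standard library. It is a toy illustration, not how Azure Machine Learning computes snapshots: the `folder_fingerprint` helper and the `train_step` / `score_step` folder names are hypothetical. It hashes each step folder before and after one script is edited and reports which folders changed.

```python
import hashlib
import tempfile
from pathlib import Path


def folder_fingerprint(folder: Path) -> str:
    """Hash the relative path and bytes of every file under `folder` (toy stand-in for a snapshot)."""
    digest = hashlib.sha256()
    for path in sorted(p for p in folder.rglob("*") if p.is_file()):
        digest.update(path.relative_to(folder).as_posix().encode("utf-8"))
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()


def demo_isolated_step_folders() -> dict:
    """Edit one step's script and report, per folder, whether its fingerprint changed."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        # Hypothetical layout: one folder per pipeline step.
        train_dir = root / "train_step"
        score_dir = root / "score_step"
        train_dir.mkdir()
        score_dir.mkdir()
        (train_dir / "train.py").write_text("print('train v1')\n")
        (score_dir / "score.py").write_text("print('score v1')\n")

        before = {d.name: folder_fingerprint(d) for d in (train_dir, score_dir)}
        # Only the training script is modified.
        (train_dir / "train.py").write_text("print('train v2')\n")
        after = {d.name: folder_fingerprint(d) for d in (train_dir, score_dir)}

        return {name: before[name] != after[name] for name in before}


changed = demo_isolated_step_folders()
print(changed)  # {'train_step': True, 'score_step': False}
```

Only the folder whose script was edited gets a new fingerprint. If both scripts lived in one shared folder, the edit would change the fingerprint seen by both steps.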

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline

# Retrieve the compute target if it is already attached; otherwise create it
aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("Found existing compute target: {}".format(aml_compute_target))
except ComputeTargetException:
    print("Creating new compute target: {}".format(aml_compute_target))
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = 1, 
                                                                max_nodes = 4)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

# source_directory
source_directory = 'publish_run_train'
# define a single step pipeline for demonstration purpose.
trainStep = PythonScriptStep(
    name="Training_Step",
    script_name="train.py", 
    compute_target=aml_compute_target, 
    source_directory=source_directory
)
print("TrainStep created")
# build and validate Pipeline
pipeline = Pipeline(workspace=ws, steps=[trainStep])
print("Pipeline is built")

#### Publish Pipeline

In [None]:
from datetime import datetime

timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')

pipeline_name = timenow + "-Pipeline"
print(pipeline_name)

published_pipeline = pipeline.publish(
    name=pipeline_name, 
    description=pipeline_name)
print("Newly published pipeline id: {}".format(published_pipeline.id))

#### Publishing PipelineEndpoint
Create the PipelineEndpoint with the required parameters: workspace, name, description, and pipeline.

In [None]:
from azureml.pipeline.core import PipelineEndpoint

pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
                                            pipeline=pipeline, description="Test description Notebook")
pipeline_endpoint

### Retrieving PipelineEndpoint

A PipelineEndpoint is uniquely defined by its name and Id within a workspace. It can be retrieved from the workspace by Id or by name.

#### Get PipelineEndpoint by Name



In [None]:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
pipeline_endpoint_by_name

#### Get PipelineEndpoint by Id


In [None]:
#get the PipelineEndpoint Id
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
endpoint_id = pipeline_endpoint_by_name.id

pipeline_endpoint_by_id = PipelineEndpoint.get(workspace=ws, id=endpoint_id)
pipeline_endpoint_by_id

#### Get all PipelineEndpoints in workspace
Returns the PipelineEndpoints within the workspace. With `active_only=True`, only active PipelineEndpoints are returned.

In [None]:
endpoint_list = PipelineEndpoint.list(workspace=ws, active_only=True)
endpoint_list

### PipelineEndpoint properties

#### Default Version of PipelineEndpoint
Versions of a PipelineEndpoint start at "0" and increment as pipelines are added. The default version is the one used when no version is specified.

##### Get the Default Version

In [None]:
default_version = pipeline_endpoint_by_name.get_default_version()
default_version

#####  Set default version 


In [None]:
pipeline_endpoint_by_name.set_default_version("0")

#### Get the Published Pipeline that corresponds to a specific version of PipelineEndpoint

In [None]:
pipeline = pipeline_endpoint_by_name.get_pipeline("0")
pipeline

#### Get default version Published Pipeline

In [None]:
pipeline = pipeline_endpoint_by_name.get_pipeline()
pipeline

#### Add Published Pipeline to PipelineEndpoint
Use `add()` to add a published pipeline if it is not already present. To add a pipeline and make it the default version in one call, use `add_default()`.

In [None]:
pipeline_endpoint_by_name.add(published_pipeline)

#### Set an added Published Pipeline as the default version
`set_default()` makes a published pipeline that is already part of the PipelineEndpoint the default version. The pipeline used here was added with `add()` in the previous cell.

In [None]:
# Make the published pipeline the default version; it must already exist in the PipelineEndpoint
pipeline_endpoint_by_name.set_default(published_pipeline)
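
The interplay between adding pipelines, version numbers, and the default version can be sketched with a small in-memory model. This is a toy stand-in, not the `azureml` class: `ToyEndpoint` and its internals are hypothetical. Two details are assumptions rather than facts taken from this notebook: that re-adding a pipeline that is already present leaves its version unchanged, and that setting an unknown pipeline as the default raises an error.

```python
class ToyEndpoint:
    """In-memory model of endpoint versioning rules; NOT the azureml PipelineEndpoint."""

    def __init__(self, first_pipeline_id):
        # The list index doubles as the version number, so the first pipeline is version "0".
        self._pipelines = [first_pipeline_id]
        self._default = 0

    def add(self, pipeline_id):
        """Register a pipeline if it is not present and return its version string."""
        if pipeline_id not in self._pipelines:
            self._pipelines.append(pipeline_id)
        return str(self._pipelines.index(pipeline_id))

    def set_default(self, pipeline_id):
        """Make an already registered pipeline the default version."""
        if pipeline_id not in self._pipelines:
            raise ValueError("pipeline is not part of this endpoint")
        self._default = self._pipelines.index(pipeline_id)

    def add_default(self, pipeline_id):
        """Register a pipeline (if needed) and make it the default in one call."""
        self.add(pipeline_id)
        self.set_default(pipeline_id)

    def get_default_version(self):
        return str(self._default)

    def get_pipeline(self, version=None):
        """Return the pipeline for a version, or the default one when no version is given."""
        index = self._default if version is None else int(version)
        return self._pipelines[index]


endpoint = ToyEndpoint("pipeline-a")
print(endpoint.add("pipeline-b"))       # 1
print(endpoint.get_default_version())   # 0  (adding does not change the default)
endpoint.set_default("pipeline-b")
print(endpoint.get_default_version())   # 1
print(endpoint.get_pipeline("0"))       # pipeline-a
```

In this model, adding a pipeline only assigns it the next version number. The default version moves only when it is set explicitly.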

#### Get all Versions in PipelineEndpoint
Returns the list of published pipelines and their versions.

In [None]:
versions = pipeline_endpoint_by_name.list_versions()

for ve in versions:
    print(ve.version)
    print(ve.pipeline.id)

#### Get all Published Pipelines in PipelineEndpoint
Returns all active pipelines in the PipelineEndpoint if the `active_only` flag is set to True.

In [None]:
pipelines = pipeline_endpoint_by_name.list_pipelines(active_only=True)
pipelines

#### Name property of PipelineEndpoint
A PipelineEndpoint is uniquely identified by its name within the workspace.

##### Set the name of the PipelineEndpoint

In [None]:
pipeline_endpoint_by_name.set_name(name="NewName")

### PipelineEndpoint Submission
A PipelineEndpoint triggers a specific pipeline version, or the default version, in one of two ways:
* REST endpoint
* Submit call

#### Run Pipeline by endpoint property of PipelineEndpoint
Run a specific pipeline by sending an HTTP POST request to the URL in the `endpoint` property of the PipelineEndpoint.

In [None]:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="NewName")

# endpoint with id 
rest_endpoint_id =  pipeline_endpoint_by_name.endpoint

# for default version pipeline
rest_endpoint_id_without_version_with_id = rest_endpoint_id

# for specific version pipeline just append version info
version="0"
rest_endpoint_id_with_version = rest_endpoint_id_without_version_with_id+"/"+ version
print(rest_endpoint_id_with_version)
pipeline_endpoint_by_name

In [None]:
# endpoint with name
rest_endpoint_name = rest_endpoint_id.split("Id", 1)[0] + "Name?name=" + pipeline_endpoint_by_name.name

# for default version pipeline
rest_endpoint_name_without_version = rest_endpoint_name

# for specific version pipeline just append version info
version="0"
rest_endpoint_name_with_version = rest_endpoint_name_without_version+"&pipelineVersion="+ version
print(rest_endpoint_name_with_version)
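
The string handling in the two cells above can be wrapped in small helpers. The following is a sketch that needs no workspace. The function names are hypothetical, the sample URL is a placeholder rather than a real service address, and it assumes the `endpoint` property ends in `/Id/<endpoint id>`, which is what the `split("Id", 1)` call above relies on. Unlike the cell above, it also percent-encodes the name in the query string.

```python
from urllib.parse import quote


def endpoint_url_by_id(rest_endpoint_id, version=None):
    """Return the by-Id URL, optionally pinned to one pipeline version."""
    base = rest_endpoint_id.rstrip("/")
    return base if version is None else "{}/{}".format(base, version)


def endpoint_url_by_name(rest_endpoint_id, name, version=None):
    """Derive the by-name URL from the by-Id URL, optionally pinned to one version."""
    # Split on the last "/Id/" segment so an "Id" earlier in the URL cannot confuse the split.
    prefix, separator, _endpoint_id = rest_endpoint_id.rpartition("/Id/")
    if not separator:
        raise ValueError("expected a URL of the form .../Id/<endpoint id>")
    url = "{}/Name?name={}".format(prefix, quote(name, safe=""))
    if version is not None:
        url += "&pipelineVersion={}".format(quote(str(version), safe=""))
    return url


# Placeholder URL for illustration only.
sample = "https://example.invalid/api/PipelineSubmit/Id/1234"
print(endpoint_url_by_id(sample, "0"))
print(endpoint_url_by_name(sample, "NewName", "0"))
```

Omitting `version` yields the URL for the default version in both helpers.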

[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

#endpoint = pipeline_endpoint_by_name.url

print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint_name_with_version))

# specify the param when running the pipeline
response = requests.post(rest_endpoint_name_with_version, 
                         headers=aad_token, 
                         json={"ExperimentName": "default_pipeline",
                               "RunSource": "SDK",
                               "ParameterAssignments": {"1": "united", "2":"city"}})

In [None]:
try:
    response.raise_for_status()
except Exception:    
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint_name_with_version, response.status_code,
                                         response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)
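
The JSON body posted above carries three keys: `ExperimentName`, `RunSource`, and `ParameterAssignments`. As a minimal sketch, a helper can assemble that body so the experiment name and parameters are supplied in one place. `build_submit_payload` is a hypothetical name, and sending an empty `ParameterAssignments` dictionary when there are no parameters is an assumption, not something this notebook confirms.

```python
import json


def build_submit_payload(experiment_name, parameter_assignments=None):
    """Assemble the request body using the same three keys as the POST above."""
    return {
        "ExperimentName": experiment_name,
        "RunSource": "SDK",
        # Copy the mapping so later changes by the caller do not alter the payload.
        "ParameterAssignments": dict(parameter_assignments or {}),
    }


payload = build_submit_payload("default_pipeline", {"1": "united", "2": "city"})
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be passed as the `json=` argument of `requests.post`, as in the cell above.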

#### Run Pipeline by Submit call of PipelineEndpoint
Run a specific pipeline version using the `submit()` method of PipelineEndpoint.

In [None]:
# The first argument is the experiment name; submit() returns the submitted pipeline run

# submit pipeline with specific version
pipeline_run = pipeline_endpoint_by_name.submit("NewName", pipeline_version="0")
print(pipeline_run)

# submit pipeline with default version
pipeline_run = pipeline_endpoint_by_name.submit("NewName")
print(pipeline_run)

#### Use Experiment.submit() to Submit Pipeline
Run a specific pipeline version using the `submit()` method of Experiment.

In [None]:
from azureml.core import Experiment

experiment = Experiment(ws, name="submit_from_endpoint")
pipeline_run = experiment.submit(pipeline_endpoint_by_name,
                                 tags={'endpoint_tag': "1"},
                                 pipeline_version="0")