Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.


# How to Set Up a Schedule for a Published Pipeline or Pipeline Endpoint
In this notebook, we show how to run an already published pipeline or a pipeline endpoint on a schedule.

## Prerequisites and AML Basics
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't already. It sets you up with a working config file that holds information about your workspace, such as its name and subscription ID.
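
As a rough illustration of the config file mentioned above, the sketch below writes and reads back a `config.json` with the three keys that `Workspace.from_config()` expects. The values are placeholders, and the temporary folder is only there so the sketch runs anywhere; in practice the file sits in your working directory or in an `.azureml` subfolder.

```python
import json
import tempfile
from pathlib import Path

# Placeholder values: replace them with the details of your own workspace.
config = {
    "subscription_id": "<your-subscription-id>",
    "resource_group": "<your-resource-group>",
    "workspace_name": "<your-workspace-name>",
}

with tempfile.TemporaryDirectory() as tmp:
    config_path = Path(tmp) / "config.json"
    config_path.write_text(json.dumps(config, indent=4))
    # Read the file back to confirm it is valid JSON with the expected keys.
    loaded = json.loads(config_path.read_text())

print(sorted(loaded))
```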

### Initialization Steps

In [None]:
import azureml.core
from azureml.core import Workspace

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

### Compute Targets
#### Retrieve an already attached Azure Machine Learning Compute

> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("Found existing compute target: {}".format(aml_compute_target))
except ComputeTargetException:
    print("Creating new compute target: {}".format(aml_compute_target))
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = 1, 
                                                                max_nodes = 4)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

## Build and Publish Pipeline
Build a simple pipeline, publish it and add a schedule to run it.

### Define a pipeline step
Define a single-step pipeline for demonstration purposes. The best practice is to give each step its own folder containing its script and dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, keeping the folders separate also lets a step be reused when nothing in its own `source_directory` has changed.
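
To make the folder-per-step advice concrete, here is a small standard-library sketch. It is not how Azure ML computes snapshots; `directory_fingerprint` is a hypothetical helper that hashes a folder's contents, used only to show that an edit in another step's folder leaves this step's folder untouched, while an edit inside it does not.

```python
import hashlib
import tempfile
from pathlib import Path


def directory_fingerprint(folder):
    """Return a SHA-256 digest over the relative paths and bytes of every file in folder."""
    digest = hashlib.sha256()
    root = Path(folder)
    for path in sorted(p for p in root.rglob("*") if p.is_file()):
        digest.update(path.relative_to(root).as_posix().encode("utf-8"))
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    step_dir = Path(tmp) / "publish_run_train"
    other_dir = Path(tmp) / "other_step"  # hypothetical second step folder
    step_dir.mkdir()
    other_dir.mkdir()
    (step_dir / "train.py").write_text("print('train')\n")
    (other_dir / "prep.py").write_text("print('prep')\n")

    before = directory_fingerprint(step_dir)
    # Editing a file that belongs to another step leaves this step's folder unchanged.
    (other_dir / "prep.py").write_text("print('prep v2')\n")
    unchanged = directory_fingerprint(step_dir) == before
    # Editing a file inside the step's own folder changes its fingerprint.
    (step_dir / "train.py").write_text("print('train v2')\n")
    changed = directory_fingerprint(step_dir) != before

print(unchanged, changed)
```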

In [None]:
from azureml.pipeline.steps import PythonScriptStep

source_directory = "publish_run_train"

trainStep = PythonScriptStep(
    name="Training_Step",
    script_name="train.py", 
    compute_target=aml_compute_target, 
    source_directory=source_directory
)
print("TrainStep created")

### Build the pipeline

In [None]:
from azureml.pipeline.core import Pipeline

pipeline1 = Pipeline(workspace=ws, steps=[trainStep])
print("Pipeline is built")

### Publish the pipeline

In [None]:
from datetime import datetime

timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')

pipeline_name = timenow + "-Pipeline"
print(pipeline_name)

published_pipeline1 = pipeline1.publish(
    name=pipeline_name, 
    description=pipeline_name)
print("Newly published pipeline id: {}".format(published_pipeline1.id))

### Create a Pipeline Endpoint
Alternatively, you can schedule a pipeline endpoint instead of a published pipeline. The endpoint created here is used to create a schedule in the last section of this notebook.

In [None]:
from azureml.pipeline.core import PipelineEndpoint

pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="ScheduledPipelineEndpoint",
                                            pipeline=pipeline1, description="Publish pipeline endpoint for schedule test")
pipeline_endpoint

## Schedule Operations
Schedule operations require the ID of a published pipeline. You can list all published pipelines and run schedule operations on them, or, if you already know the ID of the published pipeline, use it directly.
### Get published pipeline ID

In [None]:
from azureml.pipeline.core import PublishedPipeline

# You could retrieve all pipelines that are published, or 
# just get the published pipeline object that you have the ID for.

# Get all published pipeline objects in the workspace
all_pub_pipelines = PublishedPipeline.list(ws)

# We will iterate through the list of published pipelines and 
# use the last ID in the list for Schedule operations: 
print("Published pipelines found in the workspace:")
for pub_pipeline in all_pub_pipelines:
    print(pub_pipeline.id)
    pub_pipeline_id = pub_pipeline.id

print("Published pipeline id to be used for Schedule operations: {}".format(pub_pipeline_id))
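
The loop above keeps whichever ID happens to come last in the list. If you would rather pick a pipeline by name, a helper along the following lines could be used. This is a minimal sketch: `find_published_pipeline_id` is a hypothetical function, and the `SimpleNamespace` objects merely stand in for published pipeline objects (which expose `id` and `name`) so that the sketch runs without a workspace.

```python
from types import SimpleNamespace


def find_published_pipeline_id(pipelines, name):
    """Return the id of the first pipeline whose name matches, or None if there is none."""
    for pipeline in pipelines:
        if pipeline.name == name:
            return pipeline.id
    return None


# Stand-in objects; in the notebook you would pass all_pub_pipelines instead.
fake_pipelines = [
    SimpleNamespace(id="id-1", name="01-01-2020-10-00-Pipeline"),
    SimpleNamespace(id="id-2", name="01-02-2020-11-30-Pipeline"),
]
selected_id = find_published_pipeline_id(fake_pipelines, "01-02-2020-11-30-Pipeline")
missing_id = find_published_pipeline_id(fake_pipelines, "no-such-pipeline")
print(selected_id, missing_id)
```

Returning `None` for a missing name makes it easy to fail early, before any schedule is created against the wrong pipeline.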

### Create a schedule for the published pipeline using a recurrence
This schedule will run on a specified recurrence interval.

In [None]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

recurrence = ScheduleRecurrence(frequency="Day", interval=2, hours=[22], minutes=[30]) # Runs every other day at 10:30pm

schedule = Schedule.create(workspace=ws, name="My_Schedule",
                           pipeline_id=pub_pipeline_id, 
                           experiment_name='Schedule-run-sample',
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

Note: Set the `wait_for_provisioning` flag to False if you do not want to wait for the call to provision the schedule in the backend.
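
To see what "every other day at 10:30pm" means in terms of concrete run times, the sketch below lists the first few firings using only the standard library. It is an approximation for illustration: `upcoming_runs` is a hypothetical helper, and it ignores time zones and any alignment rules the scheduling service itself applies.

```python
from datetime import datetime, timedelta


def upcoming_runs(start, interval_days, hour, minute, count):
    """List `count` run times at hour:minute, spaced interval_days apart, at or after start."""
    first = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if first < start:
        # Today's slot has already passed, so begin tomorrow.
        first += timedelta(days=1)
    return [first + timedelta(days=interval_days * i) for i in range(count)]


# Assumed starting point, chosen only for the example.
runs = upcoming_runs(datetime(2024, 1, 1, 9, 0), interval_days=2, hour=22, minute=30, count=3)
for run in runs:
    print(run.isoformat())
```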

### Get all schedules for a given pipeline
Once you have the published pipeline ID, you can get all schedules for that pipeline.

In [None]:
schedules = Schedule.list(ws, pipeline_id=pub_pipeline_id)

# We will iterate through the list of schedules and 
# use the last recurrence schedule in the list for further operations: 
print("Found these schedules for the pipeline id {}:".format(pub_pipeline_id))
for schedule in schedules: 
    print(schedule.id)
    if schedule.recurrence is not None:
        schedule_id = schedule.id

print("Schedule id to be used for schedule operations: {}".format(schedule_id))

### Get all schedules in your workspace
You can also iterate through all schedules in your workspace if needed.

In [None]:
# Use active_only=False to get all schedules including disabled schedules
schedules = Schedule.list(ws, active_only=True) 
print("Your workspace has the following schedules set up:")
for schedule in schedules:
    print("{} (Published pipeline: {})".format(schedule.id, schedule.pipeline_id))

### Get the schedule

In [None]:
fetched_schedule = Schedule.get(ws, schedule_id)
print("Using schedule with id: {}".format(fetched_schedule.id))

### Disable the schedule
As a best practice, disable schedules when they are not in use.
The number of schedule triggers allowed per month per region per subscription is 100,000.
This is calculated using the projected trigger counts for all active schedules.

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
fetched_schedule.disable(wait_for_provisioning=True)
fetched_schedule = Schedule.get(ws, schedule_id)
print("Disabled schedule {}. New status is: {}".format(fetched_schedule.id, fetched_schedule.status))
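
To get a feel for the trigger limit mentioned in this section, the following back-of-the-envelope sketch estimates how many times a set of recurrence schedules would fire in a month. It assumes a 30-day month and one firing per recurrence interval; the schedule list and the helper `projected_triggers` are hypothetical, and the service's own accounting may differ.

```python
MONTHLY_TRIGGER_LIMIT = 100_000  # figure quoted in this notebook
MINUTES_PER_UNIT = {"Minute": 1, "Hour": 60, "Day": 24 * 60, "Week": 7 * 24 * 60}


def projected_triggers(frequency, interval, days=30):
    """Estimate firings in `days` days for a recurrence with one firing per interval."""
    return (days * 24 * 60) // (MINUTES_PER_UNIT[frequency] * interval)


# Hypothetical set of active schedules, given as (frequency, interval).
active_schedules = [("Day", 2), ("Hour", 2), ("Minute", 5)]
per_schedule = [projected_triggers(freq, every) for freq, every in active_schedules]
total = sum(per_schedule)
print(per_schedule, total, total <= MONTHLY_TRIGGER_LIMIT)
```

Short-interval schedules dominate the total, which is why disabling the ones you no longer need matters most for them.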

### Re-enable the schedule

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
fetched_schedule.enable(wait_for_provisioning=True)
fetched_schedule = Schedule.get(ws, schedule_id)
print("Enabled schedule {}. New status is: {}".format(fetched_schedule.id, fetched_schedule.status))

### Change recurrence of the schedule

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
recurrence = ScheduleRecurrence(frequency="Hour", interval=2) # Runs every two hours

fetched_schedule = Schedule.get(ws, schedule_id)

fetched_schedule.update(name="My_Updated_Schedule", 
                        description="Updated_Schedule_Run", 
                        status='Active', 
                        wait_for_provisioning=True,
                        recurrence=recurrence)

fetched_schedule = Schedule.get(ws, fetched_schedule.id)

print("Updated schedule:", fetched_schedule.id, 
      "\nNew name:", fetched_schedule.name,
      "\nNew frequency:", fetched_schedule.recurrence.frequency,
      "\nNew status:", fetched_schedule.status)

### Create a schedule for the pipeline using a Datastore
This schedule runs when Blobs in the Datastore are added or modified.
By default, the Datastore container is monitored for changes. Use the `path_on_datastore` parameter to monitor a specific path on the Datastore instead.

Note: `path_on_datastore` is relative to the Datastore's container, so the path actually monitored is `container/path_on_datastore`. Changes made to subfolders of the container/path do not trigger the schedule.

Note: Only Blob Datastores are supported.

Note: Not supported for CMK workspaces. Review these [instructions](https://docs.microsoft.com/azure/machine-learning/how-to-trigger-published-pipeline) to set up a blob trigger submission schedule with CMK enabled. The same instructions describe how to bring your own LogicApp to avoid the monthly limit on schedule triggers.
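
The path rule described above can be illustrated with a few lines of standard-library Python. This is only a sketch of the rule as stated in this notebook, not the service's matching logic; `is_monitored`, the container name, and the blob paths are all hypothetical.

```python
import posixpath


def is_monitored(blob_path, container, path_on_datastore):
    """Return True if blob_path sits directly in container/path_on_datastore (not in a subfolder)."""
    monitored = posixpath.join(container, path_on_datastore.strip("/"))
    return posixpath.dirname(blob_path) == monitored


container = "mycontainer"  # hypothetical container name
checks = {
    "mycontainer/file/path/data.csv": is_monitored("mycontainer/file/path/data.csv", container, "file/path"),
    "mycontainer/file/path/sub/data.csv": is_monitored("mycontainer/file/path/sub/data.csv", container, "file/path"),
    "mycontainer/other/data.csv": is_monitored("mycontainer/other/data.csv", container, "file/path"),
}
for path, triggers in checks.items():
    print(path, triggers)
```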

In [None]:
from azureml.core.datastore import Datastore

datastore = Datastore(workspace=ws, name="workspaceblobstore")

schedule = Schedule.create(workspace=ws, name="My_Schedule",
                           pipeline_id=pub_pipeline_id, 
                           experiment_name='Schedule-run-sample',
                           datastore=datastore,
                           wait_for_provisioning=True,
                           description="Schedule Run")
# Optional arguments to Schedule.create:
#   polling_interval=5             how often, in minutes, to poll for blob additions/modifications (default: 5)
#   path_on_datastore="file/path"  a specific folder to monitor for changes

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
schedule.disable(wait_for_provisioning=True)
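# Re-fetch the Datastore schedule that was just disabled (schedule_id refers to the earlier recurrence schedule)
schedule = Schedule.get(ws, schedule.id)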
print("Disabled schedule {}. New status is: {}".format(schedule.id, schedule.status))

### Create a schedule for a pipeline endpoint
As an alternative to creating schedules for a published pipeline, you can create schedules that run pipeline endpoints.
Retrieve the pipeline endpoint ID to create a schedule.

In [None]:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="ScheduledPipelineEndpoint")
published_pipeline_endpoint_id = pipeline_endpoint_by_name.id

recurrence = ScheduleRecurrence(frequency="Day", interval=2, hours=[22], minutes=[30]) # Runs every other day at 10:30pm

schedule = Schedule.create_for_pipeline_endpoint(workspace=ws, name="My_Endpoint_Schedule",
                                                 pipeline_endpoint_id=published_pipeline_endpoint_id,
                                                 experiment_name='Schedule-run-sample',
                                                 recurrence=recurrence, description="Schedule_Run",
                                                 wait_for_provisioning=True)

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

### Get all schedules for a given pipeline endpoint
Once you have the pipeline endpoint ID, you can get all schedules for that pipeline endpoint.

In [None]:
schedules_for_pipeline_endpoints = Schedule.get_schedules_for_pipeline_endpoint_id(
    ws, pipeline_endpoint_id=published_pipeline_endpoint_id)
print('Got all schedules for pipeline endpoint:', published_pipeline_endpoint_id, 'Count:',
      len(schedules_for_pipeline_endpoints))

print('done')

### Disable the schedule created for running the pipeline endpoint
Recall the best practice of disabling schedules when not in use.
The number of schedule triggers allowed per month per region per subscription is 100,000.
This is calculated using the projected trigger counts for all active schedules.

In [None]:
# Fetch the schedule created for the pipeline endpoint above
# (schedule_id still refers to the earlier published-pipeline schedule).
fetched_schedule = Schedule.get(ws, schedule.id)
print("Using schedule with id: {}".format(fetched_schedule.id))

# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
fetched_schedule.disable(wait_for_provisioning=True)
fetched_schedule = Schedule.get(ws, fetched_schedule.id)
print("Disabled schedule {}. New status is: {}".format(fetched_schedule.id, fetched_schedule.status))