{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved. \n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-versioned-pipeline-endpoints.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# How to Setup a PipelineEndpoint and Submit a Pipeline Using the PipelineEndpoint.\n", "In this notebook, we will see how to setup a PipelineEndpoint and run a specific pipeline version.\n", "\n", "PipelineEndpoint can be used to update a published pipeline while maintaining the same endpoint.\n", "PipelineEndpoint provides a way to keep track of [PublishedPipelines](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.publishedpipeline) using versions. PipelineEndpoint uses endpoint with version information to trigger an underlying published pipeline. Pipeline endpoints are uniquely named within a workspace. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prerequisites and AML Basics\n", "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Workspace\n", "\n", "ws = Workspace.from_config()\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Notebook Overview\n", "In this notebook, we provide an introduction to Azure machine learning PipelineEndpoints. It covers:\n", "* [Create PipelineEndpoint](#Create-PipelineEndpoint), How to create PipelineEndpoint.\n", "* [Retrieving PipelineEndpoint](#Retrieving-PipelineEndpoint), How to get specific PipelineEndpoint from worskpace by name/Id and get all [PipelineEndpoints](#Get-all-PipelineEndpoints-in-workspace) within workspace.\n", "* [PipelineEndpoint Properties](#PipelineEndpoint-properties). How to get and set PipelineEndpoint properties, such as default version of PipelineEndpoint.\n", "* [PipelineEndpoint Submission](#PipelineEndpoint-Submission). How to run a Pipeline using PipelineEndpoint." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create PipelineEndpoint\n", "Following are required input parameters to create PipelineEndpoint:\n", "\n", "* *workspace*: AML workspace.\n", "* *name*: name of PipelineEndpoint, it is unique within workspace.\n", "* *description*: description details for PipelineEndpoint.\n", "* *pipeline*: A [Pipeline](#Steps-to-create-simple-Pipeline) or [PublishedPipeline](#Publish-Pipeline), to set default version of PipelineEndpoint. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Initialization, Steps to create a Pipeline\n", "\n", "The best practice is to use separate folders for scripts and its dependent files for each step and specify that folder as the `source_directory` for the step. This helps reduce the size of the snapshot created for the step (only the specific folder is snapshotted). 
Because any change to a file in the `source_directory` triggers a re-upload of the snapshot, keeping this folder small also helps the step be reused when nothing in its `source_directory` has changed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.compute import AmlCompute, ComputeTarget\n", "from azureml.core.compute_target import ComputeTargetException\n", "from azureml.pipeline.steps import PythonScriptStep\n", "from azureml.pipeline.core import Pipeline\n", "\n", "# Retrieve an already attached Azure Machine Learning Compute, or create a new one\n", "aml_compute_target = \"cpu-cluster\"\n", "try:\n", "    aml_compute = AmlCompute(ws, aml_compute_target)\n", "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n", "except ComputeTargetException:\n", "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n", "\n", "    provisioning_config = AmlCompute.provisioning_configuration(vm_size=\"STANDARD_D2_V2\",\n", "                                                                min_nodes=1,\n", "                                                                max_nodes=4)\n", "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", "\n", "# folder containing the step's script and its dependent files\n", "source_directory = 'publish_run_train'\n", "\n", "# define a single-step pipeline for demonstration purposes\n", "trainStep = PythonScriptStep(\n", "    name=\"Training_Step\",\n", "    script_name=\"train.py\",\n", "    compute_target=aml_compute_target,\n", "    source_directory=source_directory\n", ")\n", "print(\"TrainStep created\")\n", "\n", "# build and validate the Pipeline\n", "pipeline = Pipeline(workspace=ws, steps=[trainStep])\n", "print(\"Pipeline is built\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Publish Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "\n", "timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')\n", "\n", "pipeline_name = timenow + \"-Pipeline\"\n", "print(pipeline_name)\n", "\n", "published_pipeline = pipeline.publish(\n", "    name=pipeline_name,\n", "    description=pipeline_name)\n", "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Publishing PipelineEndpoint\n", "Create a PipelineEndpoint with the required parameters: workspace, name, description, and pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import PipelineEndpoint\n", "\n", "pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name=\"PipelineEndpointTest\",\n", "                                             pipeline=pipeline, description=\"Test description Notebook\")\n", "pipeline_endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Retrieving PipelineEndpoint\n", "\n", "A PipelineEndpoint is uniquely defined by name and id within a workspace, and can be retrieved either by id or by name."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get PipelineEndpoint by Name\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n", "pipeline_endpoint_by_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get PipelineEndpoint by Id\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#get the PipelineEndpoint Id\n", "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n", "endpoint_id = pipeline_endpoint_by_name.id\n", "\n", "pipeline_endpoint_by_id = PipelineEndpoint.get(workspace=ws, id=endpoint_id)\n", "pipeline_endpoint_by_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all PipelineEndpoints in workspace\n", "Returns all PipelineEndpoints within workspace" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_list = PipelineEndpoint.list(workspace=ws, active_only=True)\n", "endpoint_list" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### PipelineEndpoint properties" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Default Version of PipelineEndpoint\n", "Default version of PipelineEndpoint starts from \"0\" and increments on addition of pipelines.\n", "\n", "##### Get the Default Version" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "default_version = pipeline_endpoint_by_name.get_default_version()\n", "default_version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Set default version \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.set_default_version(\"0\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get the Published Pipeline corresponds to specific version of PipelineEndpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = pipeline_endpoint_by_name.get_pipeline(\"0\")\n", "pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get default version Published Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = pipeline_endpoint_by_name.get_pipeline()\n", "pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Add Published Pipeline to PipelineEndpoint, \n", "Adds a published pipeline (if its not present) using add() and if you want to add and set to default use add_default()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.add(published_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Add Published pipeline to PipelineEndpoint and set it to default version\n", "Adding published pipeline to PipelineEndpoint if not present and set it to default" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set Published Pipeline to PipelineEndpoint, if exists\n", "pipeline_endpoint_by_name.set_default(published_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all Versions in PipelineEndpoint\n", "Returns list of published pipelines and its versions" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": {}, "outputs": [], "source": [ "versions = pipeline_endpoint_by_name.list_versions()\n", "\n", "for ve in versions:\n", " print(ve.version)\n", " print(ve.pipeline.id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all Published Pipelines in PipelineEndpoint\n", "Returns all active pipelines in PipelineEnpoint, if active_only flag is set to True." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipelines = pipeline_endpoint_by_name.list_pipelines(active_only=True)\n", "pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Name property of PipelineEndpoint\n", "PipelineEndpoint is uniquely identified by name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Set Name PipelineEndpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.set_name(name=\"NewName\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### PipelineEndpoint Submission\n", "PipelineEndpoint triggers specific versioned pipeline or default pipeline by:\n", "* Rest Endpoint \n", "* Submit call." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Pipeline by endpoint property of PipelineEndpoint\n", "Run specific pipeline using endpoint property of PipelineEndpoint and executing http post." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"NewName\")\n", "\n", "# endpoint with id \n", "rest_endpoint_id = pipeline_endpoint_by_name.endpoint\n", "\n", "# for default version pipeline\n", "rest_endpoint_id_without_version_with_id = rest_endpoint_id\n", "\n", "# for specific version pipeline just append version info\n", "version=\"0\"\n", "rest_endpoint_id_with_version = rest_endpoint_id_without_version_with_id+\"/\"+ version\n", "print(rest_endpoint_id_with_version)\n", "pipeline_endpoint_by_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# endpoint with name\n", "rest_endpoint_name = rest_endpoint_id.split(\"Id\", 1)[0] + \"Name?name=\" + pipeline_endpoint_by_name.name\n", "\n", "# for default version pipeline\n", "rest_endpoint_name_without_version = rest_endpoint_name\n", "\n", "# for specific version pipeline just append version info\n", "version=\"0\"\n", "rest_endpoint_name_with_version = rest_endpoint_name_without_version+\"&pipelineVersion=\"+ version\n", "print(rest_endpoint_name_with_version)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.authentication import InteractiveLoginAuthentication\n", "import requests\n", "\n", "auth = InteractiveLoginAuthentication()\n", "aad_token = auth.get_authentication_header()\n", "\n", "#endpoint = pipeline_endpoint_by_name.url\n", "\n", "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint_name_with_version))\n", "\n", "# specify the param when running the pipeline\n", "response = requests.post(rest_endpoint_name_with_version, \n", " headers=aad_token, \n", " json={\"ExperimentName\": \"default_pipeline\",\n", " \"RunSource\": \"SDK\",\n", " \"ParameterAssignments\": {\"1\": \"united\", \"2\":\"city\"}})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " response.raise_for_status()\n", "except Exception: \n", " raise Exception('Received bad response from the endpoint: {}\\n'\n", " 'Response Code: {}\\n'\n", " 'Headers: {}\\n'\n", " 'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n", "\n", "run_id = response.json().get('Id')\n", "print('Submitted pipeline run: ', run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Pipeline by Submit call of PipelineEndpoint \n", "Run specific pipeline using Submit api of PipelineEndpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# submit pipeline with specific version\n", "run_id = pipeline_endpoint_by_name.submit(\"NewName\", pipeline_version=\"0\")\n", "print(run_id)\n", "\n", "# submit pipeline with default version\n", "run_id = pipeline_endpoint_by_name.submit(\"NewName\")\n", "print(run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Use Experiment.Submit() to Submit Pipeline\n", "Run specific pipeline using Experiment submit api" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Experiment\n", "pipeline_run = Experiment(ws, name=\"submit_endpoint_sample\").submit(pipeline_endpoint_by_name, tags={'endpoint_tag': \"1\"}, pipeline_version=\"0\")" ] } ], "metadata": { "authors": [ { "name": "sanpil" } ], "category": "tutorial", "compute": [ "AML Compute" ], "datasets": [ "Custom" ], "deployment": [ "None" ], "exclude_from_index": false, "framework": [ "Azure ML" ], "friendly_name": "How to setup a versioned Pipeline Endpoint", "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" }, "order_index": 12, "tags": [ "None" ], "task": "Demonstrates the use of PipelineEndpoint to run a specific version of the Published Pipeline" }, "nbformat": 4, "nbformat_minor": 2 }