{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved. \n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-versioned-pipeline-endpoints.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# How to Set Up a PipelineEndpoint and Submit a Pipeline Using the PipelineEndpoint\n", "In this notebook, we will see how to set up a PipelineEndpoint and run a specific pipeline version.\n", "\n", "A PipelineEndpoint lets you update a published pipeline while keeping the same endpoint.\n", "It keeps track of [PublishedPipelines](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.publishedpipeline) using versions, and it uses the endpoint together with version information to trigger the underlying published pipeline. Pipeline endpoints are uniquely named within a workspace. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prerequisites and AML Basics\n", "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://github.com/Azure/MachineLearningNotebooks) first if you haven't. 
This sets you up with a working config file that has information on your workspace, subscription id, etc.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Workspace\n", "\n", "ws = Workspace.from_config()\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Notebook Overview\n", "In this notebook, we provide an introduction to Azure Machine Learning PipelineEndpoints. It covers:\n", "* [Create PipelineEndpoint](#Create-PipelineEndpoint). How to create a PipelineEndpoint.\n", "* [Retrieving PipelineEndpoint](#Retrieving-PipelineEndpoint). How to get a specific PipelineEndpoint from the workspace by name or Id, and how to get all [PipelineEndpoints](#Get-all-PipelineEndpoints-in-workspace) within the workspace.\n", "* [PipelineEndpoint Properties](#PipelineEndpoint-properties). How to get and set PipelineEndpoint properties, such as the default version of the PipelineEndpoint.\n", "* [PipelineEndpoint Submission](#PipelineEndpoint-Submission). How to run a Pipeline using a PipelineEndpoint." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create PipelineEndpoint\n", "The following input parameters are required to create a PipelineEndpoint:\n", "\n", "* *workspace*: the AML workspace.\n", "* *name*: the name of the PipelineEndpoint; it must be unique within the workspace.\n", "* *description*: a description of the PipelineEndpoint.\n", "* *pipeline*: a [Pipeline](#Initialization,-Steps-to-create-a-Pipeline) or [PublishedPipeline](#Publish-Pipeline) that becomes the default version of the PipelineEndpoint. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Initialization, Steps to create a Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Run, Experiment, Datastore\n", "from azureml.core.compute import AmlCompute, ComputeTarget\n", "from azureml.core.compute_target import ComputeTargetException\n", "from azureml.pipeline.steps import PythonScriptStep\n", "from azureml.pipeline.core import Pipeline\n", "\n", "# Retrieve an already attached Azure Machine Learning Compute, or create it if it is not found\n", "aml_compute_target = \"cpu-cluster\"\n", "try:\n", "    aml_compute = AmlCompute(ws, aml_compute_target)\n", "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n", "except ComputeTargetException:\n", "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n", "\n", "    provisioning_config = AmlCompute.provisioning_configuration(vm_size=\"STANDARD_D2_V2\",\n", "                                                                min_nodes=1,\n", "                                                                max_nodes=4)\n", "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", "\n", "# directory that contains the step script (train.py)\n", "source_directory = '.'\n", "# define a single-step pipeline for demonstration purposes\n", "trainStep = PythonScriptStep(\n", "    name=\"Training_Step\",\n", "    script_name=\"train.py\",\n", "    compute_target=aml_compute_target,\n", "    source_directory=source_directory\n", ")\n", "print(\"TrainStep created\")\n", "# build the Pipeline\n", "pipeline = Pipeline(workspace=ws, steps=[trainStep])\n", "print(\"Pipeline is built\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Publish Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "\n", "# use a timestamp so that each published pipeline gets a distinct name\n", "timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')\n", "\n", "pipeline_name = timenow + \"-Pipeline\"\n", "print(pipeline_name)\n", "\n", "published_pipeline = pipeline.publish(\n", "    
name=pipeline_name,\n", "    description=pipeline_name)\n", "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Publishing PipelineEndpoint\n", "Create a PipelineEndpoint with the required parameters: workspace, name, description and pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import PipelineEndpoint\n", "\n", "pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name=\"PipelineEndpointTest\",\n", "                                             pipeline=pipeline, description=\"Test description Notebook\")\n", "pipeline_endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Retrieving PipelineEndpoint\n", "\n", "A PipelineEndpoint is uniquely identified within a workspace by its name and by its Id, so it can be retrieved by either one." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get PipelineEndpoint by Name\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n", "pipeline_endpoint_by_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get PipelineEndpoint by Id\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get the PipelineEndpoint Id\n", "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n", "endpoint_id = pipeline_endpoint_by_name.id\n", "\n", "pipeline_endpoint_by_id = PipelineEndpoint.get(workspace=ws, id=endpoint_id)\n", "pipeline_endpoint_by_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all PipelineEndpoints in workspace\n", "Returns all PipelineEndpoints within the workspace. With active_only set to True, only active PipelineEndpoints are returned." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_list = 
PipelineEndpoint.get_all(workspace=ws, active_only=True)\n", "endpoint_list" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### PipelineEndpoint properties" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Default Version of PipelineEndpoint\n", "Versions in a PipelineEndpoint start at \"0\" and increment as pipelines are added. The default version is the one that runs when no version is specified.\n", "\n", "##### Get the Default Version" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "default_version = pipeline_endpoint_by_name.get_default_version()\n", "default_version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Set default version \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.set_default_version(\"0\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get the Published Pipeline corresponding to a specific version of PipelineEndpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# use a separate variable so the Pipeline object built earlier is not overwritten\n", "published_pipeline_v0 = pipeline_endpoint_by_name.get_pipeline(\"0\")\n", "published_pipeline_v0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get default version Published Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# without a version argument, the default version is returned\n", "default_published_pipeline = pipeline_endpoint_by_name.get_pipeline()\n", "default_published_pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Set Published Pipeline to default version" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set the published pipeline as the default version; this applies only if it already exists in the PipelineEndpoint\n", "pipeline_endpoint_by_name.set_default(published_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all Versions in PipelineEndpoint\n", "Returns the list of published pipelines and their versions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "versions = pipeline_endpoint_by_name.get_all_versions()\n", "\n", "# print each version together with the Id of its published pipeline\n", "for ve in versions:\n", "    print(ve.version)\n", "    print(ve.pipeline.id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get all Published Pipelines in PipelineEndpoint\n", "Returns only the active pipelines in the PipelineEndpoint if the active_only flag is set to True." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipelines = pipeline_endpoint_by_name.get_all_pipelines(active_only=True)\n", "pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Name property of PipelineEndpoint\n", "A PipelineEndpoint is uniquely identified by its name." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Set Name of PipelineEndpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.set_name(name=\"NewName\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Add Published Pipeline to PipelineEndpoint\n", "Adds the published pipeline if it is not already present in the PipelineEndpoint." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.add(published_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Add Published pipeline to PipelineEndpoint and set it to default version\n", "Adds the published pipeline to the PipelineEndpoint if it is not already present, and sets it as the default version." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_endpoint_by_name.add_default(published_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### PipelineEndpoint Submission\n", "A PipelineEndpoint can trigger a specific pipeline version or the default pipeline through:\n", "* The REST endpoint\n", "* The submit call" 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Pipeline by endpoint property of PipelineEndpoint\n", "Run a specific pipeline by sending an HTTP POST request to the URL given by the endpoint property of the PipelineEndpoint." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Retrieve the PipelineEndpoint by Id, because it was renamed above\n", "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, id=endpoint_id)\n", "\n", "# endpoint with id\n", "rest_endpoint_id = pipeline_endpoint_by_name.endpoint\n", "\n", "# for the default version pipeline, use the endpoint as is\n", "rest_endpoint_id_without_version_with_id = rest_endpoint_id\n", "\n", "# for a specific version pipeline, append the version\n", "version = \"0\"\n", "rest_endpoint_id_with_version = rest_endpoint_id_without_version_with_id + \"/\" + version\n", "print(rest_endpoint_id_with_version)\n", "pipeline_endpoint_by_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# endpoint with name\n", "rest_endpoint_name = rest_endpoint_id.split(\"Id\", 1)[0] + \"Name?name=\" + pipeline_endpoint_by_name.name\n", "\n", "# for the default version pipeline, use the endpoint as is\n", "rest_endpoint_name_without_version = rest_endpoint_name\n", "\n", "# for a specific version pipeline, append the version as a query parameter\n", "version = \"0\"\n", "rest_endpoint_name_with_version = rest_endpoint_name_without_version + \"&pipelineVersion=\" + version\n", "print(rest_endpoint_name_with_version)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to an AML workspace." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.authentication import InteractiveLoginAuthentication\n", "import requests\n", "\n", "auth = InteractiveLoginAuthentication()\n", "aad_token = auth.get_authentication_header()\n", "\n", "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint_name_with_version))\n", "\n", "# specify the experiment name and parameters when running the pipeline\n", "response = requests.post(rest_endpoint_name_with_version,\n", "                         headers=aad_token,\n", "                         json={\"ExperimentName\": \"default_pipeline\",\n", "                               \"RunSource\": \"SDK\",\n", "                               \"ParameterAssignments\": {\"1\": \"united\", \"2\": \"city\"}})\n", "\n", "# fail early with a clear error if the request was rejected\n", "response.raise_for_status()\n", "\n", "run_id = response.json()[\"Id\"]\n", "print(run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Pipeline by Submit call of PipelineEndpoint \n", "Run a specific pipeline using the submit API of the PipelineEndpoint. The first argument is the experiment name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# submit pipeline with specific version; submit returns the pipeline run\n", "pipeline_run = pipeline_endpoint_by_name.submit(\"TestPipelineEndpoint\", pipeline_version=\"0\")\n", "print(pipeline_run.id)\n", "\n", "# submit pipeline with default version\n", "pipeline_run = pipeline_endpoint_by_name.submit(\"TestPipelineEndpoint\")\n", "print(pipeline_run.id)" ] } ], "metadata": { "authors": [ { "name": "mameghwa" } ], "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.2" } }, "nbformat": 4, "nbformat_minor": 2 }