{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved. \n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# How to Publish a Pipeline and Invoke the REST endpoint\n", "In this notebook, we will see how we can publish a pipeline and then invoke the REST endpoint." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites and Azure Machine Learning Basics\n", "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. \n", "\n", "### Initialization Steps" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import azureml.core\n", "from azureml.core import Workspace, Run, Experiment, Datastore\n", "from azureml.core.compute import AmlCompute\n", "from azureml.core.compute import ComputeTarget\n", "from azureml.core.compute import DataFactoryCompute\n", "from azureml.widgets import RunDetails\n", "\n", "# Check core SDK version number\n", "print(\"SDK version:\", azureml.core.VERSION)\n", "\n", "from azureml.data.data_reference import DataReference\n", "from azureml.pipeline.core import Pipeline, PipelineData, StepSequence\n", "from azureml.pipeline.steps import PythonScriptStep\n", "from azureml.pipeline.steps import DataTransferStep\n", "from azureml.pipeline.core import PublishedPipeline\n", "from azureml.pipeline.core.graph import PipelineParameter\n", "\n", "print(\"Pipeline SDK-specific imports completed\")\n", "\n", "ws = Workspace.from_config()\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n", "\n", "# Default datastore (Azure file storage)\n", "def_file_store = ws.get_default_datastore() \n", "print(\"Default datastore's name: {}\".format(def_file_store.name))\n", "\n", "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", "print(\"Blobstore's name: {}\".format(def_blob_store.name))\n", "\n", "# project folder\n", "project_folder = '.'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Compute Targets\n", "#### Retrieve an already attached Azure Machine Learning Compute" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "aml_compute_target = \"aml-compute\"\n", "try:\n", " aml_compute = AmlCompute(ws, aml_compute_target)\n", " print(\"found existing compute target.\")\n", "except:\n", " print(\"creating new compute target\")\n", " \n", " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n", " min_nodes = 1, \n", " max_nodes = 4) \n", " aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", " aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", " \n", "print(aml_compute.status.serialize())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Building Pipeline Steps with Inputs and Outputs\n", "As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Reference the data uploaded to blob storage using DataReference\n", "# Assign the datasource to blob_input_data variable\n", "blob_input_data = DataReference(\n", " datastore=def_blob_store,\n", " data_reference_name=\"test_data\",\n", " path_on_datastore=\"20newsgroups/20news.pkl\")\n", "print(\"DataReference object created\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define intermediate data using PipelineData\n", "processed_data1 = PipelineData(\"processed_data1\",datastore=def_blob_store)\n", "print(\"PipelineData object created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Define a Step that consumes a datasource and produces intermediate data.\n", "In this step, we define a step that consumes a datasource and produces intermediate data.\n", "\n", "**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# trainStep consumes the datasource (Datareference) in the previous step\n", "# and produces processed_data1\n", "trainStep = PythonScriptStep(\n", " script_name=\"train.py\", \n", " arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n", " inputs=[blob_input_data],\n", " outputs=[processed_data1],\n", " compute_target=aml_compute, \n", " source_directory=project_folder\n", ")\n", "print(\"trainStep created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Define a Step that consumes intermediate data and produces intermediate data\n", "In this step, we define a step that consumes an intermediate data and produces intermediate data.\n", "\n", "**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# extractStep to use the intermediate data produced by step4\n", "# This step also produces an output processed_data2\n", "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n", "\n", "extractStep = PythonScriptStep(\n", " script_name=\"extract.py\",\n", " arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n", " inputs=[processed_data1],\n", " outputs=[processed_data2],\n", " compute_target=aml_compute, \n", " source_directory=project_folder)\n", "print(\"extractStep created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Define a Step that consumes multiple intermediate data and produces intermediate data\n", "In this step, we define a step that consumes multiple intermediate data and produces intermediate data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### PipelineParameter" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This step also has a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py) argument that help with calling the REST endpoint of the published pipeline." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We will use this later in publishing pipeline\n", "pipeline_param = PipelineParameter(name=\"pipeline_arg\", default_value=10)\n", "print(\"pipeline parameter created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Now define step6 that takes two inputs (both intermediate data), and produce an output\n", "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n", "\n", "\n", "\n", "compareStep = PythonScriptStep(\n", " script_name=\"compare.py\",\n", " arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3, \"--pipeline_param\", pipeline_param],\n", " inputs=[processed_data1, processed_data2],\n", " outputs=[processed_data3], \n", " compute_target=aml_compute, \n", " source_directory=project_folder)\n", "print(\"compareStep created\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Build the pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n", "print (\"Pipeline is built\")\n", "\n", "pipeline1.validate()\n", "print(\"Simple validation complete\") " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Publish the pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "published_pipeline1 = pipeline1.publish(name=\"My_New_Pipeline\", description=\"My Published Pipeline Description\")\n", "print(published_pipeline1.id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run published pipeline using its REST endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.authentication import AzureCliAuthentication\n", "import requests\n", "\n", "cli_auth = AzureCliAuthentication()\n", "aad_token = cli_auth.get_authentication_header()\n", "\n", "rest_endpoint1 = published_pipeline1.endpoint\n", "\n", "print(rest_endpoint1)\n", "\n", "# specify the param when running the pipeline\n", "response = requests.post(rest_endpoint1, \n", " headers=aad_token, \n", " json={\"ExperimentName\": \"My_Pipeline1\",\n", " \"RunSource\": \"SDK\",\n", " \"ParameterAssignments\": {\"pipeline_arg\": 45}})\n", "run_id = response.json()[\"Id\"]\n", "\n", "print(run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Next: Data Transfer\n", "The next [notebook](./aml-pipelines-data-transfer.ipynb) will showcase data transfer steps between different types of data stores." ] } ], "metadata": { "authors": [ { "name": "diray" } ], "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 2 }