{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved. \n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# How to Publish a Pipeline and Invoke the REST endpoint\n",
"In this notebook, we will see how to publish a pipeline and then invoke its REST endpoint."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites and Azure Machine Learning Basics\n",
"If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration notebook at https://github.com/Azure/MachineLearningNotebooks first if you haven't already. It sets you up with a working config file that contains information about your workspace, subscription ID, and so on.\n",
"\n",
"### Initialization Steps"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"from azureml.core import Workspace, Datastore, Experiment\n",
"from azureml.core.compute import AmlCompute\n",
"from azureml.core.compute import ComputeTarget\n",
"\n",
"# Check core SDK version number\n",
"print(\"SDK version:\", azureml.core.VERSION)\n",
"\n",
"from azureml.data.data_reference import DataReference\n",
"from azureml.pipeline.core import Pipeline, PipelineData\n",
"from azureml.pipeline.steps import PythonScriptStep\n",
"from azureml.pipeline.core.graph import PipelineParameter\n",
"\n",
"print(\"Pipeline SDK-specific imports completed\")\n",
"\n",
"ws = Workspace.from_config()\n",
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')\n",
"\n",
"# Default datastore (Azure file storage)\n",
"def_file_store = ws.get_default_datastore()\n",
"print(\"Default datastore's name: {}\".format(def_file_store.name))\n",
"\n",
"def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
"print(\"Blobstore's name: {}\".format(def_blob_store.name))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute Targets\n",
"#### Retrieve an already attached Azure Machine Learning Compute"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"aml_compute = ws.get_default_compute_target(\"CPU\")"
]
},
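{
"cell_type": "markdown",
"metadata": {},
"source": [
"If your workspace does not have a default compute target attached yet, the hypothetical sketch below shows one way to create an AmlCompute cluster and use it instead. The compute name and VM size are illustrative assumptions, not values required by this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: create an AmlCompute cluster if no compute target is available.\n",
"# The name \"cpu-cluster\" and vm_size are assumptions for illustration only.\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"aml_compute_name = \"cpu-cluster\"\n",
"try:\n",
"    aml_compute = ComputeTarget(workspace=ws, name=aml_compute_name)\n",
"    print(\"Found existing compute target:\", aml_compute_name)\n",
"except ComputeTargetException:\n",
"    provisioning_config = AmlCompute.provisioning_configuration(vm_size=\"STANDARD_D2_V2\", max_nodes=4)\n",
"    aml_compute = ComputeTarget.create(ws, aml_compute_name, provisioning_config)\n",
"    aml_compute.wait_for_completion(show_output=True)"
]
},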
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# For a more detailed view of current Azure Machine Learning Compute status, use get_status()\n",
"# example: un-comment the following line.\n",
"# print(aml_compute.get_status().serialize())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building Pipeline Steps with Inputs and Outputs\n",
"A step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reference the data uploaded to blob storage using DataReference\n",
"# Assign the datasource to the blob_input_data variable\n",
"blob_input_data = DataReference(\n",
"    datastore=def_blob_store,\n",
"    data_reference_name=\"test_data\",\n",
"    path_on_datastore=\"20newsgroups/20news.pkl\")\n",
"print(\"DataReference object created\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define intermediate data using PipelineData\n",
"processed_data1 = PipelineData(\"processed_data1\", datastore=def_blob_store)\n",
"print(\"PipelineData object created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define a Step that consumes a datasource and produces intermediate data\n",
"Here we define a step that consumes a datasource and produces intermediate data.\n",
"\n",
"**Open `train.py` on your local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# trainStep consumes the datasource (DataReference) defined above\n",
"# and produces processed_data1\n",
"trainStep = PythonScriptStep(\n",
"    script_name=\"train.py\",\n",
"    arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n",
"    inputs=[blob_input_data],\n",
"    outputs=[processed_data1],\n",
"    compute_target=aml_compute,\n",
"    source_directory='.'\n",
")\n",
"print(\"trainStep created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define a Step that consumes intermediate data and produces intermediate data\n",
"Here we define a step that consumes intermediate data and produces new intermediate data.\n",
"\n",
"**Open `extract.py` on your local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# extractStep consumes the intermediate data produced by trainStep\n",
"# This step also produces an output processed_data2\n",
"processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n",
"\n",
"extractStep = PythonScriptStep(\n",
"    script_name=\"extract.py\",\n",
"    arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n",
"    inputs=[processed_data1],\n",
"    outputs=[processed_data2],\n",
"    compute_target=aml_compute,\n",
"    source_directory='.')\n",
"print(\"extractStep created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define a Step that consumes multiple intermediate data and produces intermediate data\n",
"Here we define a step that consumes multiple intermediate datasets and produces intermediate data."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### PipelineParameter"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This step also has a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py) argument that helps with calling the REST endpoint of the published pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# We will use this later when publishing the pipeline\n",
"pipeline_param = PipelineParameter(name=\"pipeline_arg\", default_value=10)\n",
"print(\"pipeline parameter created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Open `compare.py` on your local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Now define compareStep, which takes two inputs (both intermediate data) and produces an output\n",
"processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n",
"\n",
"compareStep = PythonScriptStep(\n",
"    script_name=\"compare.py\",\n",
"    arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3, \"--pipeline_param\", pipeline_param],\n",
"    inputs=[processed_data1, processed_data2],\n",
"    outputs=[processed_data3],\n",
"    compute_target=aml_compute,\n",
"    source_directory='.')\n",
"print(\"compareStep created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Build the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n",
"print(\"Pipeline is built\")"
]
},
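{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can check the pipeline graph before publishing or submitting it. This is a minimal sketch assuming `Pipeline.validate()` is available in your installed SDK version; it looks for issues such as disconnected inputs and returns the errors it finds."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: validate the pipeline graph before publishing.\n",
"# validate() returns a list of detected errors (empty if none were found).\n",
"validation_errors = pipeline1.validate()\n",
"print(\"Validation completed, errors found:\", len(validation_errors))"
]
},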
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run published pipeline\n",
"### Publish the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"published_pipeline1 = pipeline1.publish(name=\"My_New_Pipeline\", description=\"My Published Pipeline Description\", continue_on_step_failure=True)\n",
"published_pipeline1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: the `continue_on_step_failure` parameter specifies whether the execution of steps in the Pipeline will continue if one step fails. The default value is False, meaning that when one step fails, the Pipeline execution stops and any running steps are canceled."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Publish the pipeline from a submitted PipelineRun\n",
"It is also possible to publish a pipeline from a submitted PipelineRun."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# submit a pipeline run\n",
"pipeline_run1 = Experiment(ws, 'Pipeline_experiment').submit(pipeline1)\n",
"# publish a pipeline from the submitted pipeline run\n",
"published_pipeline2 = pipeline_run1.publish_pipeline(name=\"My_New_Pipeline2\", description=\"My Published Pipeline Description\", version=\"0.1\", continue_on_step_failure=True)\n",
"published_pipeline2"
]
},
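{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can monitor the submitted run. This is a minimal sketch assuming the `azureml-widgets` package is installed; otherwise `pipeline_run1.wait_for_completion(show_output=True)` is a text-based alternative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: visualize the submitted pipeline run (assumes azureml-widgets is installed).\n",
"from azureml.widgets import RunDetails\n",
"\n",
"RunDetails(pipeline_run1).show()"
]
},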
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get published pipeline\n",
"\n",
"You can get a published pipeline using its **pipeline id**.\n",
"\n",
"To get all the published pipelines for a given workspace (`ws`):\n",
"```python\n",
"all_pub_pipelines = PublishedPipeline.get_all(ws)\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import PublishedPipeline\n",
"\n",
"pipeline_id = published_pipeline1.id  # use your published pipeline id\n",
"published_pipeline = PublishedPipeline.get(ws, pipeline_id)\n",
"published_pipeline"
]
},
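{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small hedged sketch of the listing snippet above: enumerate the published pipelines in the workspace and print their names and ids. It assumes `PublishedPipeline.get_all(ws)` is available in your SDK version (newer versions expose `PublishedPipeline.list(ws)` instead)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: list all published pipelines in the workspace.\n",
"# get_all() matches the snippet above; newer SDK versions use PublishedPipeline.list(ws).\n",
"all_pub_pipelines = PublishedPipeline.get_all(ws)\n",
"for pub_pipeline in all_pub_pipelines:\n",
"    print(pub_pipeline.name, pub_pipeline.id)"
]
},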
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run published pipeline using its REST endpoint\n",
"[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to the AML workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.authentication import InteractiveLoginAuthentication\n",
"import requests\n",
"\n",
"auth = InteractiveLoginAuthentication()\n",
"aad_token = auth.get_authentication_header()\n",
"\n",
"rest_endpoint1 = published_pipeline.endpoint\n",
"\n",
"print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint1))\n",
"\n",
"# specify the param when running the pipeline\n",
"response = requests.post(rest_endpoint1,\n",
"                         headers=aad_token,\n",
"                         json={\"ExperimentName\": \"My_Pipeline1\",\n",
"                               \"RunSource\": \"SDK\",\n",
"                               \"ParameterAssignments\": {\"pipeline_arg\": 45}})\n",
"run_id = response.json()[\"Id\"]\n",
"\n",
"print(run_id)"
]
},
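{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a hedged follow-up sketch, you can fetch the run that the REST call started and monitor it from the SDK. This assumes the POST above succeeded and that the experiment name matches the `ExperimentName` sent in the request body."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: monitor the run started through the REST endpoint.\n",
"# Assumes run_id was returned by the POST above and the experiment name matches ExperimentName.\n",
"from azureml.pipeline.core.run import PipelineRun\n",
"\n",
"rest_experiment = Experiment(ws, \"My_Pipeline1\")\n",
"rest_run = PipelineRun(rest_experiment, run_id)\n",
"rest_run.wait_for_completion(show_output=True)"
]
},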
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Next: Data Transfer\n",
"The next [notebook](./aml-pipelines-data-transfer.ipynb) will showcase data transfer steps between different types of data stores."
]
}
],
"metadata": {
"authors": [
{
"name": "diray"
}
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}