{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright (c) Microsoft Corporation. All rights reserved. \n",
    "Licensed under the MIT License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Showcasing DataPath and PipelineParameter\n",
    "\n",
    "This notebook demonstrates the use of [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) and [**PipelineParameter**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in an AML Pipeline. You will learn how strings and [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) can be parameterized and submitted to AML Pipelines via [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py).\n",
    "To see more about how parameters work between steps, please refer to [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).\n",
    "\n",
    "* [How to create a Pipeline with a DataPath PipelineParameter](#index1)\n",
    "* [How to submit a Pipeline with a DataPath PipelineParameter](#index2)\n",
    "* [How to submit a Pipeline and change the DataPath PipelineParameter value from the SDK](#index3)\n",
    "* [How to submit a Pipeline and change the DataPath PipelineParameter value using a REST call](#index4)\n",
    "* [How to create a datastore trigger schedule and use the data_path_parameter_name to get the path of the changed blob in the Pipeline](#index5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Azure Machine Learning and Pipeline SDK-specific imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import azureml.core\n",
    "from azureml.core import Workspace, Experiment\n",
    "from azureml.core.compute import ComputeTarget, AmlCompute\n",
    "from azureml.data.datapath import DataPath, DataPathComputeBinding\n",
    "from azureml.widgets import RunDetails\n",
    "\n",
    "from azureml.pipeline.core import PipelineParameter\n",
    "from azureml.pipeline.core import Pipeline, PipelineRun\n",
    "from azureml.pipeline.steps import PythonScriptStep\n",
    "\n",
    "# Check core SDK version number\n",
    "print(\"SDK version:\", azureml.core.VERSION)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Initialize Workspace\n",
    "\n",
    "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at .\\config.json\n",
    "\n",
    "If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.\n",
    "\n",
    "This sets you up with a working config file that has information on your workspace, subscription id, etc."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ws = Workspace.from_config()\n",
    "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create an Azure ML experiment\n",
    "\n",
    "Let's create an experiment named \"showcasing-datapath\" and a folder to hold the training scripts. The script runs will be recorded under the experiment in Azure."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Choose a name for the run history container in the workspace.\n",
    "experiment_name = 'showcasing-datapath'\n",
    "source_directory = '.'\n",
    "\n",
    "experiment = Experiment(ws, experiment_name)\n",
    "experiment"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create or Attach an AmlCompute cluster\n",
    "You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your pipeline run. In this tutorial, you get the default `AmlCompute` as your training compute resource."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Choose a name for your cluster.\n",
    "amlcompute_cluster_name = \"cpu-cluster\"\n",
    "\n",
    "found = False\n",
    "# Check if this compute target already exists in the workspace.\n",
    "cts = ws.compute_targets\n",
    "if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':\n",
    "    found = True\n",
    "    print('Found existing compute target.')\n",
    "    compute_target = cts[amlcompute_cluster_name]\n",
    "\n",
    "if not found:\n",
    "    print('Creating a new compute target...')\n",
    "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\", # for GPU, use \"STANDARD_NC6\"\n",
    "                                                                #vm_priority = 'lowpriority', # optional\n",
    "                                                                max_nodes = 4)\n",
    "\n",
    "    # Create the cluster.\n",
    "    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)\n",
    "\n",
    "    # Can poll for a minimum number of nodes and for a specific timeout.\n",
    "    # If no min_node_count is provided, it will use the scale settings for the cluster.\n",
    "    compute_target.wait_for_completion(show_output = True, timeout_in_minutes = 10)\n",
    "\n",
    "    # For a more detailed view of current AmlCompute status, use get_status()."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Data and arguments setup\n",
    "\n",
    "We will set up a training script to run and the arguments to pass to it. The sample training script below simply prints the two arguments to show what has been passed to the pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile train_with_datapath.py\n",
    "import argparse\n",
    "\n",
    "parser = argparse.ArgumentParser(\"train\")\n",
    "parser.add_argument(\"--arg1\", type=str, help=\"sample string argument\")\n",
    "parser.add_argument(\"--arg2\", type=str, help=\"sample datapath argument\")\n",
    "args = parser.parse_args()\n",
    "\n",
    "print(\"Sample string argument : %s\" % args.arg1)\n",
    "print(\"Sample datapath argument: %s\" % args.arg2)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's set up the string and DataPath arguments using PipelineParameter.\n",
    "\n",
    "Note that Pipeline accepts a tuple of the form ([**PipelineParameter**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py), [**DataPathComputeBinding**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapathcomputebinding?view=azure-ml-py)) as an input. DataPath defines the location of input data. DataPathComputeBinding defines how the data is consumed during step execution. The DataPath can be modified at pipeline submission time with a DataPath parameter, while the compute binding does not change. For static data inputs, we use [**DataReference**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py), which defines both the data location and the compute binding."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "datapath-remarks-sample"
    ]
   },
   "outputs": [],
   "source": [
    "def_blob_store = ws.get_default_datastore()\n",
    "print(\"Default datastore's name: {}\".format(def_blob_store.name))\n",
    "\n",
    "data_path = DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath1')\n",
    "datapath1_pipeline_param = PipelineParameter(name=\"input_datapath\", default_value=data_path)\n",
    "datapath_input = (datapath1_pipeline_param, DataPathComputeBinding(mode='mount'))\n",
    "\n",
    "string_pipeline_param = PipelineParameter(name=\"input_string\", default_value='sample_string1')"
   ]
  },
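  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For comparison, here is a minimal sketch of a static input expressed as a ```DataReference```, which fixes both the data location and the compute binding at pipeline creation time; the ```data_reference_name``` used below is an illustrative placeholder, not something the rest of the notebook relies on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from azureml.data.data_reference import DataReference\n",
    "\n",
    "# A static input: both the location and the compute binding are fixed here,\n",
    "# so unlike a DataPath PipelineParameter it cannot be changed at submission time.\n",
    "static_input = DataReference(datastore=def_blob_store,\n",
    "                             data_reference_name=\"static_sample_data\",  # illustrative name\n",
    "                             path_on_datastore='sample_datapath1',\n",
    "                             mode='mount')\n",
    "print(static_input)"
   ]
  },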
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id='index1'></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Pipeline with a DataPath PipelineParameter\n",
    "\n",
    "Note that ```datapath_input``` is specified in both ```arguments``` and ```inputs``` when creating the step."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_step = PythonScriptStep(\n",
    "    name='train_step',\n",
    "    script_name=\"train_with_datapath.py\",\n",
    "    arguments=[\"--arg1\", string_pipeline_param, \"--arg2\", datapath_input],\n",
    "    inputs=[datapath_input],\n",
    "    compute_target=compute_target,\n",
    "    source_directory=source_directory)\n",
    "print(\"train_step created\")\n",
    "\n",
    "pipeline = Pipeline(workspace=ws, steps=[train_step])\n",
    "print(\"pipeline with the train_step created\")"
   ]
  },
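  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, you can validate the pipeline before submitting it; ```Pipeline.validate()``` returns a list of any errors found. This is just a sanity check and is not required for the rest of this notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Validate the pipeline graph; an empty list means no errors were found.\n",
    "errors = pipeline.validate()\n",
    "print(\"Pipeline validation complete. {} error(s) found.\".format(len(errors)))"
   ]
  },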
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id='index2'></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit a Pipeline with a DataPath PipelineParameter\n",
    "\n",
    "If no parameters are specified at submission, the pipeline runs with the default values of its PipelineParameters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_run = experiment.submit(pipeline)\n",
    "print(\"Pipeline is submitted for execution\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "RunDetails(pipeline_run).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_run.wait_for_completion()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id='index3'></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit a Pipeline and change the DataPath PipelineParameter value from the SDK\n",
    "\n",
    "Alternatively, pipelines can be submitted with values other than the defaults by passing ```pipeline_parameters```."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_run_with_params = experiment.submit(pipeline,\n",
    "                                             pipeline_parameters={'input_datapath': DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath2'),\n",
    "                                                                  'input_string': 'sample_string2'})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "RunDetails(pipeline_run_with_params).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_run_with_params.wait_for_completion()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id='index4'></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit a Pipeline and change the DataPath PipelineParameter value using a REST call\n",
    "\n",
    "Let's publish the pipeline so that we can use the REST endpoint of the published pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "published_pipeline = pipeline.publish(name=\"DataPath_Pipeline\", description=\"Pipeline to test Datapath\", continue_on_step_failure=True)\n",
    "published_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from azureml.core.authentication import InteractiveLoginAuthentication\n",
    "import requests\n",
    "\n",
    "auth = InteractiveLoginAuthentication()\n",
    "aad_token = auth.get_authentication_header()\n",
    "\n",
    "rest_endpoint = published_pipeline.endpoint\n",
    "\n",
    "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint))"
   ]
  },
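  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```InteractiveLoginAuthentication``` requires a signed-in user. For unattended scenarios, a service principal can be used to obtain the authentication header instead. The sketch below (commented out) assumes you have already created a service principal; the tenant id, application id, and the ```AZUREML_SP_PASSWORD``` environment variable are illustrative placeholders."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from azureml.core.authentication import ServicePrincipalAuthentication\n",
    "\n",
    "# Non-interactive alternative for automation; replace the placeholders\n",
    "# with your own service principal details before uncommenting.\n",
    "# sp_auth = ServicePrincipalAuthentication(\n",
    "#     tenant_id=\"<my-tenant-id>\",\n",
    "#     service_principal_id=\"<my-application-id>\",\n",
    "#     service_principal_password=os.environ[\"AZUREML_SP_PASSWORD\"])\n",
    "# aad_token = sp_auth.get_authentication_header()"
   ]
  },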
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# specify the param when running the pipeline\n",
    "response = requests.post(rest_endpoint,\n",
    "                         headers=aad_token,\n",
    "                         json={\"ExperimentName\": \"MyRestPipeline\",\n",
    "                               \"RunSource\": \"SDK\",\n",
    "                               \"DataPathAssignments\": {\n",
    "                                   \"input_datapath\": {\n",
    "                                       \"DataStoreName\": def_blob_store.name,\n",
    "                                       \"RelativePath\": 'sample_datapath3'\n",
    "                                   }\n",
    "                               },\n",
    "                               \"ParameterAssignments\": {\"input_string\": \"sample_string3\"}\n",
    "                              })"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    response.raise_for_status()\n",
    "except Exception:\n",
    "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
    "                    'Response Code: {}\\n'\n",
    "                    'Headers: {}\\n'\n",
    "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
    "\n",
    "run_id = response.json().get('Id')\n",
    "print('Submitted pipeline run: ', run_id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "published_pipeline_run_via_rest = PipelineRun(ws.experiments[\"MyRestPipeline\"], run_id)\n",
    "RunDetails(published_pipeline_run_via_rest).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "published_pipeline_run_via_rest.wait_for_completion()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id='index5'></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Datastore trigger schedule and use the data path parameter\n",
    "\n",
    "When the pipeline is scheduled with a DataPath parameter, it is triggered whenever data under the DataPath is modified or added. ```path_on_datastore``` should be a folder, and the value of the DataPath parameter will be replaced by the path of the changed blob."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from azureml.pipeline.core import Schedule\n",
    "\n",
    "schedule = Schedule.create(workspace=ws,\n",
    "                           name=\"Datastore_trigger_schedule\",\n",
    "                           pipeline_id=published_pipeline.id,\n",
    "                           experiment_name='Scheduled_Pipeline',\n",
    "                           datastore=def_blob_store,\n",
    "                           wait_for_provisioning=True,\n",
    "                           description=\"Datastore trigger schedule demo\",\n",
    "                           path_on_datastore=\"sample_datapath_for_folder\",\n",
    "                           data_path_parameter_name=\"input_datapath\")  # Same name as used above to create the PipelineParameter\n",
    "\n",
    "print(\"Created schedule with id: {}\".format(schedule.id))"
   ]
  },
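  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick sanity check, you can list the active schedules in the workspace to confirm the new schedule was created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the active schedules in the workspace; the one created above should appear.\n",
    "for s in Schedule.list(ws):\n",
    "    print(s.id, s.name, s.status)"
   ]
  },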
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schedule.disable()\n",
    "schedule"
   ]
  }
 ],
 "metadata": {
  "authors": [
   {
    "name": "sanpil"
   }
  ],
  "category": "tutorial",
  "compute": [
   "AML Compute"
  ],
  "datasets": [
   "Custom"
  ],
  "deployment": [
   "None"
  ],
  "exclude_from_index": false,
  "framework": [
   "Azure ML"
  ],
  "friendly_name": "How to use DataPath as a PipelineParameter",
  "kernelspec": {
   "display_name": "Python 3.6",
   "language": "python",
   "name": "python36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  },
  "order_index": 13,
  "star_tag": [
   "featured"
  ],
  "tags": [
   "None"
  ],
  "task": "Demonstrates the use of DataPath as a PipelineParameter"
 },
 "nbformat": 4,
 "nbformat_minor": 2
}