mirror of
https://github.com/Azure/MachineLearningNotebooks.git
synced 2025-12-19 17:17:04 -05:00
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright (c) Microsoft Corporation. All rights reserved. \n",
    "Licensed under the MIT License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Azure Machine Learning Pipelines with Data Dependency\n",
    "In this notebook, we will see how to build a pipeline with an implicit data dependency."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisites and Azure Machine Learning Basics\n",
    "If you haven't already, first go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks. This sets you up with a working config file that has information on your workspace, subscription id, etc.\n",
    "\n",
    "### Azure Machine Learning and Pipeline SDK-specific Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import azureml.core\n",
    "from azureml.core import Workspace, Run, Experiment, Datastore\n",
    "from azureml.core.compute import AmlCompute\n",
    "from azureml.core.compute import ComputeTarget\n",
    "from azureml.core.compute import DataFactoryCompute\n",
    "from azureml.widgets import RunDetails\n",
    "\n",
    "# Check core SDK version number\n",
    "print(\"SDK version:\", azureml.core.VERSION)\n",
    "\n",
    "from azureml.data.data_reference import DataReference\n",
    "from azureml.pipeline.core import Pipeline, PipelineData, StepSequence\n",
    "from azureml.pipeline.steps import PythonScriptStep\n",
    "from azureml.pipeline.steps import DataTransferStep\n",
    "from azureml.pipeline.core import PublishedPipeline\n",
    "from azureml.pipeline.core.graph import PipelineParameter\n",
    "\n",
    "print(\"Pipeline SDK-specific imports completed\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize Workspace\n",
    "\n",
    "Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "create workspace"
    ]
   },
   "outputs": [],
   "source": [
    "ws = Workspace.from_config()\n",
    "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')\n",
    "\n",
    "# Default datastore registered with the workspace\n",
    "def_file_store = ws.get_default_datastore()\n",
    "print(\"Default datastore's name: {}\".format(def_file_store.name))\n",
    "\n",
    "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
    "print(\"Blobstore's name: {}\".format(def_blob_store.name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# project folder\n",
    "project_folder = '.'\n",
    "\n",
    "print('Sample projects will be created in {}.'.format(project_folder))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Required data and script files for the tutorial\n",
    "Sample files required to finish this tutorial are already copied to the project folder specified above. The .py files provided in the samples contain little actual \"ML work\"; as a data scientist, you will spend most of your effort on such scripts in practice. To complete this tutorial, the contents of these files are not very important. The one-line files are for demonstration purposes only."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute Targets\n",
    "See the list of Compute Targets on the workspace."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cts = ws.compute_targets\n",
    "for ct in cts:\n",
    "    print(ct)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Retrieve or create an Aml compute\n",
    "Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new Aml Compute in the current workspace, if it doesn't already exist. We will then run the training script on this compute target."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from azureml.core.compute_target import ComputeTargetException\n",
    "\n",
    "aml_compute_target = \"aml-compute\"\n",
    "try:\n",
    "    # Look up an existing compute target with this name\n",
    "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
    "    print(\"found existing compute target.\")\n",
    "except ComputeTargetException:\n",
    "    # No such compute target yet, so provision a new cluster\n",
    "    print(\"creating new compute target\")\n",
    "\n",
    "    provisioning_config = AmlCompute.provisioning_configuration(\n",
    "        vm_size=\"STANDARD_D2_V2\",\n",
    "        min_nodes=1,\n",
    "        max_nodes=4)\n",
    "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
    "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
    "\n",
    "print(\"Aml Compute attached\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# For a more detailed view of current Azure Machine Learning Compute status, use get_status()\n",
    "# example: un-comment the following line.\n",
    "# print(aml_compute.get_status().serialize())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**\n",
    "\n",
    "Now that you have created the compute target, let's see what the workspace's `compute_targets` property returns. You should now see one entry named 'aml-compute' of type AmlCompute."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Building Pipeline Steps with Inputs and Outputs\n",
    "A step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline.\n",
    "\n",
    "### Datasources\n",
    "A datasource is represented by a **[DataReference](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py)** object and points to data that lives in, or is accessible from, a Datastore. A DataReference can point to a file or a directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reference the data uploaded to blob storage using DataReference\n",
    "# Assign the datasource to blob_input_data variable\n",
    "\n",
    "# DataReference(datastore,\n",
    "#               data_reference_name=None,\n",
    "#               path_on_datastore=None,\n",
    "#               mode='mount',\n",
    "#               path_on_compute=None,\n",
    "#               overwrite=False)\n",
    "\n",
    "blob_input_data = DataReference(\n",
    "    datastore=def_blob_store,\n",
    "    data_reference_name=\"test_data\",\n",
    "    path_on_datastore=\"20newsgroups/20news.pkl\")\n",
    "print(\"DataReference object created\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Intermediate/Output Data\n",
    "Intermediate data (or output of a Step) is represented by a **[PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py)** object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.\n",
    "\n",
    "#### Constructing PipelineData\n",
    "- **name:** [*Required*] Name of the data item within the pipeline graph\n",
    "- **datastore:** The Datastore to write this output to\n",
    "- **output_name:** Name of the output\n",
    "- **output_mode:** Specifies \"upload\" or \"mount\" modes for producing output (default: mount)\n",
    "- **output_path_on_compute:** For \"upload\" mode, the path to which the module writes this output during execution\n",
    "- **output_overwrite:** Flag to overwrite pre-existing data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define intermediate data using PipelineData\n",
    "# Syntax\n",
    "\n",
    "# PipelineData(name,\n",
    "#              datastore=None,\n",
    "#              output_name=None,\n",
    "#              output_mode='mount',\n",
    "#              output_path_on_compute=None,\n",
    "#              output_overwrite=None,\n",
    "#              data_type=None,\n",
    "#              is_directory=None)\n",
    "\n",
    "# Naming the intermediate data as processed_data1 and assigning it to the variable processed_data1.\n",
    "processed_data1 = PipelineData(\"processed_data1\", datastore=def_blob_store)\n",
    "print(\"PipelineData object created\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Pipeline steps using datasources and intermediate data\n",
    "Machine learning pipelines can have many steps and these steps could use or reuse datasources and intermediate data. Here's how we construct such a pipeline:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Define a Step that consumes a datasource and produces intermediate data\n",
    "Here we define a step that consumes a datasource and produces intermediate data.\n",
    "\n",
    "**Open `train.py` on the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# trainStep consumes the datasource (DataReference) created earlier\n",
    "# and produces processed_data1\n",
    "trainStep = PythonScriptStep(\n",
    "    script_name=\"train.py\",\n",
    "    arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n",
    "    inputs=[blob_input_data],\n",
    "    outputs=[processed_data1],\n",
    "    compute_target=aml_compute,\n",
    "    source_directory=project_folder\n",
    ")\n",
    "print(\"trainStep created\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Define a Step that consumes intermediate data and produces intermediate data\n",
    "Here we define a step that consumes intermediate data and produces new intermediate data.\n",
    "\n",
    "**Open `extract.py` on the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# extractStep uses the intermediate data produced by trainStep\n",
    "# This step also produces an output processed_data2\n",
    "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n",
    "\n",
    "extractStep = PythonScriptStep(\n",
    "    script_name=\"extract.py\",\n",
    "    arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n",
    "    inputs=[processed_data1],\n",
    "    outputs=[processed_data2],\n",
    "    compute_target=aml_compute,\n",
    "    source_directory=project_folder)\n",
    "print(\"extractStep created\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Define a Step that consumes multiple intermediate data and produces intermediate data\n",
    "Here we define a step that consumes multiple pieces of intermediate data and produces new intermediate data.\n",
    "\n",
    "**Open `compare.py` on the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Now define compareStep, which takes two inputs (both intermediate data) and produces an output\n",
    "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n",
    "\n",
    "compareStep = PythonScriptStep(\n",
    "    script_name=\"compare.py\",\n",
    "    arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3],\n",
    "    inputs=[processed_data1, processed_data2],\n",
    "    outputs=[processed_data3],\n",
    "    compute_target=aml_compute,\n",
    "    source_directory=project_folder)\n",
    "print(\"compareStep created\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Only compareStep is listed; trainStep and extractStep are pulled in\n",
    "# implicitly because compareStep consumes the data they produce\n",
    "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n",
    "print(\"Pipeline is built\")\n",
    "\n",
    "pipeline1.validate()\n",
    "print(\"Simple validation complete\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_run1 = Experiment(ws, 'Data_dependency').submit(pipeline1)\n",
    "print(\"Pipeline is submitted for execution\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "RunDetails(pipeline_run1).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Next: Publishing the Pipeline and calling it from the REST endpoint\n",
    "See this [notebook](./aml-pipelines-publish-and-run-using-rest-endpoint.ipynb) to learn how to publish the pipeline and call its REST endpoint to run it."
   ]
  }
 ],
 "metadata": {
  "authors": [
   {
    "name": "diray"
   }
  ],
  "kernelspec": {
   "display_name": "Python 3.6",
   "language": "python",
   "name": "python36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}