diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.ipynb b/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.ipynb index 7455053c..729da267 100644 --- a/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.ipynb @@ -2,132 +2,132 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved. \n", "Licensed under the MIT License." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.png)" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ - "# Using Databricks as a Compute Target from Azure Machine Learning Pipeline\n", - "To use Databricks as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [DatabricksStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py) is used. This notebook demonstrates the use of DatabricksStep in Azure Machine Learning Pipeline.\n", - "\n", - "The notebook will show:\n", - "1. Running an arbitrary Databricks notebook that the customer has in Databricks workspace\n", - "2. Running an arbitrary Python script that the customer has in DBFS\n", - "3. 
Running an arbitrary Python script that is available on local computer (will upload to DBFS, and then run in Databricks) \n", - "4. Running a JAR job that the customer has in DBFS.\n", - "\n", - "## Before you begin:\n", - "\n", - "1. **Create an Azure Databricks workspace** in the same subscription where you have your Azure Machine Learning workspace. You will need details of this workspace later on to define DatabricksStep. [Click here](https://ms.portal.azure.com/#blade/HubsExtension/Resources/resourceType/Microsoft.Databricks%2Fworkspaces) for more information.\n", - "2. **Create PAT (access token)**: Manually create a Databricks access token at the Azure Databricks portal. See [this](https://docs.databricks.com/api/latest/authentication.html#generate-a-token) for more information.\n", - "3. **Add demo notebook to ADB**: This notebook has a sample you can use as is. Launch Azure Databricks attached to your Azure Machine Learning workspace and add a new notebook. \n", + "# Using Databricks as a Compute Target from Azure Machine Learning Pipeline\r\n", + "To use Databricks as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [DatabricksStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py) is used. This notebook demonstrates the use of DatabricksStep in Azure Machine Learning Pipeline.\r\n", + "\r\n", + "The notebook will show:\r\n", + "1. Running an arbitrary Databricks notebook that the customer has in Databricks workspace\r\n", + "2. Running an arbitrary Python script that the customer has in DBFS\r\n", + "3. Running an arbitrary Python script that is available on local computer (will upload to DBFS, and then run in Databricks) \r\n", + "4. Running a JAR job that the customer has in DBFS.\r\n", + "5. How to get run context in a Databricks interactive cluster\r\n", + "\r\n", + "## Before you begin:\r\n", + "\r\n", + "1. 
**Create an Azure Databricks workspace** in the same subscription where you have your Azure Machine Learning workspace. You will need details of this workspace later on to define DatabricksStep. [Click here](https://ms.portal.azure.com/#blade/HubsExtension/Resources/resourceType/Microsoft.Databricks%2Fworkspaces) for more information.\r\n", + "2. **Create PAT (access token)**: Manually create a Databricks access token at the Azure Databricks portal. See [this](https://docs.databricks.com/api/latest/authentication.html#generate-a-token) for more information.\r\n", + "3. **Add demo notebook to ADB**: This notebook has a sample you can use as is. Launch Azure Databricks attached to your Azure Machine Learning workspace and add a new notebook. \r\n", "4. **Create/attach a Blob storage** for use from ADB" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Add demo notebook to ADB Workspace\n", "Copy and paste the below code to create a new notebook in your ADB workspace." 
- ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ - "```python\n", - "# direct access\n", - "dbutils.widgets.get(\"myparam\")\n", - "p = getArgument(\"myparam\")\n", - "print (\"Param -\\'myparam':\")\n", - "print (p)\n", - "\n", - "dbutils.widgets.get(\"input\")\n", - "i = getArgument(\"input\")\n", - "print (\"Param -\\'input':\")\n", - "print (i)\n", - "\n", - "dbutils.widgets.get(\"output\")\n", - "o = getArgument(\"output\")\n", - "print (\"Param -\\'output':\")\n", - "print (o)\n", - "\n", - "n = i + \"/testdata.txt\"\n", - "df = spark.read.csv(n)\n", - "\n", - "display (df)\n", - "\n", - "data = [('value1', 'value2')]\n", - "df2 = spark.createDataFrame(data)\n", - "\n", - "z = o + \"/output.txt\"\n", - "df2.write.csv(z)\n", + "```python\r\n", + "# direct access\r\n", + "dbutils.widgets.get(\"myparam\")\r\n", + "p = getArgument(\"myparam\")\r\n", + "print (\"Param -\\'myparam':\")\r\n", + "print (p)\r\n", + "\r\n", + "dbutils.widgets.get(\"input\")\r\n", + "i = getArgument(\"input\")\r\n", + "print (\"Param -\\'input':\")\r\n", + "print (i)\r\n", + "\r\n", + "dbutils.widgets.get(\"output\")\r\n", + "o = getArgument(\"output\")\r\n", + "print (\"Param -\\'output':\")\r\n", + "print (o)\r\n", + "\r\n", + "n = i + \"/testdata.txt\"\r\n", + "df = spark.read.csv(n)\r\n", + "\r\n", + "display (df)\r\n", + "\r\n", + "data = [('value1', 'value2')]\r\n", + "df2 = spark.createDataFrame(data)\r\n", + "\r\n", + "z = o + \"/output.txt\"\r\n", + "df2.write.csv(z)\r\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Azure Machine Learning and Pipeline SDK-specific imports" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "import os\n", - "import azureml.core\n", - "from azureml.core.runconfig import JarLibrary\n", - "from azureml.core.compute import ComputeTarget, DatabricksCompute\n", - "from azureml.exceptions 
import ComputeTargetException\n", - "from azureml.core import Workspace, Experiment\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.pipeline.steps import DatabricksStep\n", - "from azureml.core.datastore import Datastore\n", - "from azureml.data.data_reference import DataReference\n", - "\n", - "# Check core SDK version number\n", + "import os\r\n", + "import azureml.core\r\n", + "from azureml.core.runconfig import JarLibrary\r\n", + "from azureml.core.compute import ComputeTarget, DatabricksCompute\r\n", + "from azureml.exceptions import ComputeTargetException\r\n", + "from azureml.core import Workspace, Experiment\r\n", + "from azureml.pipeline.core import Pipeline, PipelineData\r\n", + "from azureml.pipeline.steps import DatabricksStep\r\n", + "from azureml.core.datastore import Datastore\r\n", + "from azureml.data.data_reference import DataReference\r\n", + "\r\n", + "# Check core SDK version number\r\n", "print(\"SDK version:\", azureml.core.VERSION)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Initialize Workspace\n", "\n", "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "ws = Workspace.from_config()\n", + "ws = Workspace.from_config()\r\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Attach Databricks compute target\n", "Next, you need to add your Databricks workspace to Azure Machine Learning as a compute target and give it a name. 
You will use this name to refer to your Databricks workspace compute target inside Azure Machine Learning.\n", @@ -137,46 +137,46 @@ "- **Databricks Access Token** - The access token you created in ADB\n", "\n", "**The Databricks workspace needs to be present in the same subscription as your AML workspace**" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, + "source": [ + "# Replace with your account info before running.\r\n", + " \r\n", + "db_compute_name=os.getenv(\"DATABRICKS_COMPUTE_NAME\", \"\") # Databricks compute name\r\n", + "db_resource_group=os.getenv(\"DATABRICKS_RESOURCE_GROUP\", \"\") # Databricks resource group\r\n", + "db_workspace_name=os.getenv(\"DATABRICKS_WORKSPACE_NAME\", \"\") # Databricks workspace name\r\n", + "db_access_token=os.getenv(\"DATABRICKS_ACCESS_TOKEN\", \"\") # Databricks access token\r\n", + " \r\n", + "try:\r\n", + " databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)\r\n", + " print('Compute target {} already exists'.format(db_compute_name))\r\n", + "except ComputeTargetException:\r\n", + " print('Compute not found, will use below parameters to attach new one')\r\n", + " print('db_compute_name {}'.format(db_compute_name))\r\n", + " print('db_resource_group {}'.format(db_resource_group))\r\n", + " print('db_workspace_name {}'.format(db_workspace_name))\r\n", + " print('db_access_token {}'.format(db_access_token))\r\n", + " \r\n", + " config = DatabricksCompute.attach_configuration(\r\n", + " resource_group = db_resource_group,\r\n", + " workspace_name = db_workspace_name,\r\n", + " access_token= db_access_token)\r\n", + " databricks_compute=ComputeTarget.attach(ws, db_compute_name, config)\r\n", + " databricks_compute.wait_for_completion(True)\r\n" ], + "outputs": [], + "metadata": { "tags": [ "sample-databrickscompute-attach" ] - }, - "outputs": [], - "source": [ - "# Replace with your account info before running.\n", - " \n", - 
"db_compute_name=os.getenv(\"DATABRICKS_COMPUTE_NAME\", \"\") # Databricks compute name\n", - "db_resource_group=os.getenv(\"DATABRICKS_RESOURCE_GROUP\", \"\") # Databricks resource group\n", - "db_workspace_name=os.getenv(\"DATABRICKS_WORKSPACE_NAME\", \"\") # Databricks workspace name\n", - "db_access_token=os.getenv(\"DATABRICKS_ACCESS_TOKEN\", \"\") # Databricks access token\n", - " \n", - "try:\n", - " databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)\n", - " print('Compute target {} already exists'.format(db_compute_name))\n", - "except ComputeTargetException:\n", - " print('Compute not found, will use below parameters to attach new one')\n", - " print('db_compute_name {}'.format(db_compute_name))\n", - " print('db_resource_group {}'.format(db_resource_group))\n", - " print('db_workspace_name {}'.format(db_workspace_name))\n", - " print('db_access_token {}'.format(db_access_token))\n", - " \n", - " config = DatabricksCompute.attach_configuration(\n", - " resource_group = db_resource_group,\n", - " workspace_name = db_workspace_name,\n", - " access_token= db_access_token)\n", - " databricks_compute=ComputeTarget.attach(ws, db_compute_name, config)\n", - " databricks_compute.wait_for_completion(True)\n" - ] + } }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Data Connections with Inputs and Outputs\n", "The DatabricksStep supports DBFS, Azure Blob and ADLS for inputs and outputs. You will also need to define a [Secrets](https://docs.azuredatabricks.net/user-guide/secrets/index.html) scope to enable authentication to external data sources such as Blob and ADLS from Databricks.\n", @@ -188,38 +188,38 @@ "Databricks allows you to interact with Azure Blob and ADLS in two ways.\n", "- **Direct Access**: Databricks allows you to interact with Azure Blob or ADLS URIs directly. 
The input or output URIs will be mapped to a Databricks widget param in the Databricks notebook.\n", "- **Mounting**: You will be supplied with additional parameters and secrets that will enable you to mount your ADLS or Azure Blob input or output location in your Databricks notebook." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Direct Access: Python sample code\n", "If you have a data reference named \"input\" it will represent the URI of the input and you can access it directly in the Databricks python notebook like so:" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "```python\n", "dbutils.widgets.get(\"input\")\n", "y = getArgument(\"input\")\n", "df = spark.read.csv(y)\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Mounting: Python sample code for Azure Blob\n", "Given an Azure Blob data reference named \"input\" the following widget params will be made available in the Databricks notebook:" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "```python\n", "# This contains the input URI\n", @@ -243,19 +243,19 @@ " mount_point = \"/mnt/input\",\n", " extra_configs = {myinput_blob_config:dbutils.secrets.get(scope = \"amlscope\", key = myinput_blob_secretname)})\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Mounting: Python sample code for ADLS\n", "Given an ADLS data reference named \"input\" the following widget params will be made available in the Databricks notebook:" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "```python\n", "# This contains the input URI\n", @@ -287,42 +287,42 @@ " mount_point = \"/mnt/output\",\n", " extra_configs = configs)\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Use Databricks from Azure Machine Learning 
Pipeline\n", "To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. Let's define a datasource (via DataReference), intermediate data (via PipelineData) and a pipeline parameter (via PipelineParameter) to be used in DatabricksStep." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.pipeline.core import PipelineParameter\n", - "\n", - "# Use the default blob storage\n", - "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", - "print('Datastore {} will be used'.format(def_blob_store.name))\n", - "\n", - "pipeline_param = PipelineParameter(name=\"my_pipeline_param\", default_value=\"pipeline_param1\")\n", - "\n", - "# We are uploading a sample file in the local directory to be used as a datasource\n", - "def_blob_store.upload_files(files=[\"./testdata.txt\"], target_path=\"dbtest\", overwrite=False)\n", - "\n", - "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\n", - " data_reference_name=\"input\")\n", - "\n", + "from azureml.pipeline.core import PipelineParameter\r\n", + "\r\n", + "# Use the default blob storage\r\n", + "def_blob_store = Datastore(ws, \"workspaceblobstore\")\r\n", + "print('Datastore {} will be used'.format(def_blob_store.name))\r\n", + "\r\n", + "pipeline_param = PipelineParameter(name=\"my_pipeline_param\", default_value=\"pipeline_param1\")\r\n", + "\r\n", + "# We are uploading a sample file in the local directory to be used as a datasource\r\n", + "def_blob_store.upload_files(files=[\"./testdata.txt\"], target_path=\"dbtest\", overwrite=False)\r\n", + "\r\n", + "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\r\n", + " data_reference_name=\"input\")\r\n", + "\r\n", "step_1_output = PipelineData(\"output\", datastore=def_blob_store)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": 
[ "### Add a DatabricksStep\n", "Adds a Databricks notebook as a step in a Pipeline.\n", @@ -405,83 +405,83 @@ "runconfig = RunConfiguration()\n", "runconfig.load(path='', name='')\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "### 1. Running the demo notebook already added to the Databricks workspace\n", - "Create a notebook in the Azure Databricks workspace, and provide the path to that notebook as the value associated with the environment variable \"DATABRICKS_NOTEBOOK_PATH\". This will then set the variable\u00c2\u00a0notebook_path\u00c2\u00a0when you run the code cell below:\n", + "Create a notebook in the Azure Databricks workspace, and provide the path to that notebook as the value associated with the environment variable \"DATABRICKS_NOTEBOOK_PATH\". This will then set the variable notebook_path when you run the code cell below:\n", "\n", "You can find your notebook's path in the Azure Databricks UI by hovering over the notebook's title. A typical notebook path looks like this: `/Users/example@databricks.com/example`. See [Databricks Workspace](https://docs.azuredatabricks.net/user-guide/workspace.html) to learn about the folder structure.\n", "\n", "Note: a DataPath `PipelineParameter` should be provided in the list of inputs. Such parameters can be accessed by the datapath `name`." 
- ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "notebook_path=os.getenv(\"DATABRICKS_NOTEBOOK_PATH\", \"\") # Databricks notebook path\n", - "\n", - "dbNbStep = DatabricksStep(\n", - " name=\"DBNotebookInWS\",\n", - " inputs=[step_1_input],\n", - " outputs=[step_1_output],\n", - " num_workers=1,\n", - " notebook_path=notebook_path,\n", - " notebook_params={'myparam': 'testparam', \n", - " 'myparam2': pipeline_param},\n", - " run_name='DB_Notebook_demo',\n", - " compute_target=databricks_compute,\n", - " allow_reuse=True\n", + "notebook_path=os.getenv(\"DATABRICKS_NOTEBOOK_PATH\", \"\") # Databricks notebook path\r\n", + "\r\n", + "dbNbStep = DatabricksStep(\r\n", + " name=\"DBNotebookInWS\",\r\n", + " inputs=[step_1_input],\r\n", + " outputs=[step_1_output],\r\n", + " num_workers=1,\r\n", + " notebook_path=notebook_path,\r\n", + " notebook_params={'myparam': 'testparam', \r\n", + " 'myparam2': pipeline_param},\r\n", + " run_name='DB_Notebook_demo',\r\n", + " compute_target=databricks_compute,\r\n", + " allow_reuse=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Build and submit the Experiment\n", "\n", "Note: Default value of `pipeline_param` will be used if different value is not specified in pipeline parameters during submission" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbNbStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DB_Notebook_demo').submit(pipeline)\n", + "steps = [dbNbStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DB_Notebook_demo').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### View Run 
Details" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "### 2. Running a Python script from DBFS\n", "This shows how to run a Python script in DBFS. \n", @@ -495,67 +495,67 @@ "The code in the cell below assumes that you have completed the previous step of uploading the script `train-db-dbfs.py` to the root folder in DBFS.\n", "\n", "Note: `pipeline_param` will add two values to `python_script_params`: a name followed by its value. The name will be in the format `--MY_PIPELINE_PARAM`. For example, in this case, `python_script_params` will be `[\"arg1\", \"--MY_PIPELINE_PARAM\", \"pipeline_param1\", \"arg2\"]`" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "python_script_path = os.getenv(\"DATABRICKS_PYTHON_SCRIPT_PATH\", \"\") # Databricks python script path\n", - "\n", - "dbPythonInDbfsStep = DatabricksStep(\n", - " name=\"DBPythonInDBFS\",\n", - " inputs=[step_1_input],\n", - " num_workers=1,\n", - " python_script_path=python_script_path,\n", - " python_script_params={'arg1', pipeline_param, 'arg2'},\n", - " run_name='DB_Python_demo',\n", - " compute_target=databricks_compute,\n", - " allow_reuse=True\n", + "python_script_path = os.getenv(\"DATABRICKS_PYTHON_SCRIPT_PATH\", \"\") # Databricks python script path\r\n", + "\r\n", + "dbPythonInDbfsStep = DatabricksStep(\r\n", + " name=\"DBPythonInDBFS\",\r\n", + " inputs=[step_1_input],\r\n", + " num_workers=1,\r\n", + " python_script_path=python_script_path,\r\n", + " python_script_params={'arg1', pipeline_param, 'arg2'},\r\n", + " run_name='DB_Python_demo',\r\n", + " compute_target=databricks_compute,\r\n", + " 
allow_reuse=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Build and submit the Experiment" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbPythonInDbfsStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DB_Python_demo').submit(pipeline)\n", + "steps = [dbPythonInDbfsStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DB_Python_demo').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### View Run Details" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "### 3. Running a Python script in Databricks that is currently on your local computer\n", "To run a Python script that is currently on your local computer, follow the instructions below. \n", @@ -565,68 +565,68 @@ "The best practice is to use separate folders for scripts and their dependent files for each step and specify that folder as the `source_directory` for the step. This helps reduce the size of the snapshot created for the step (only the specific folder is snapshotted). Since changes in any files in the `source_directory` would trigger a re-upload of the snapshot, this helps preserve reuse of the step when there are no changes in the `source_directory` of the step.\n", "\n", "In this case, the Python script will be uploaded first to DBFS, and then the script will be run in Databricks." 
- ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "python_script_name = \"train-db-local.py\"\n", - "source_directory = \"./databricks_train\"\n", - "\n", - "dbPythonInLocalMachineStep = DatabricksStep(\n", - " name=\"DBPythonInLocalMachine\",\n", - " inputs=[step_1_input],\n", - " num_workers=1,\n", - " python_script_name=python_script_name,\n", - " source_directory=source_directory,\n", - " run_name='DB_Python_Local_demo',\n", - " compute_target=databricks_compute,\n", - " allow_reuse=True\n", + "python_script_name = \"train-db-local.py\"\r\n", + "source_directory = \"./databricks_train\"\r\n", + "\r\n", + "dbPythonInLocalMachineStep = DatabricksStep(\r\n", + " name=\"DBPythonInLocalMachine\",\r\n", + " inputs=[step_1_input],\r\n", + " num_workers=1,\r\n", + " python_script_name=python_script_name,\r\n", + " source_directory=source_directory,\r\n", + " run_name='DB_Python_Local_demo',\r\n", + " compute_target=databricks_compute,\r\n", + " allow_reuse=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Build and submit the Experiment" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbPythonInLocalMachineStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DB_Python_Local_demo').submit(pipeline)\n", + "steps = [dbPythonInLocalMachineStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DB_Python_Local_demo').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### View Run Details" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets 
import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "### 4. Running a JAR job that is already added to DBFS\n", "To run a JAR job that is already uploaded to DBFS, follow the instructions below. You will first upload the JAR file to DBFS using the [CLI](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html).\n", @@ -638,222 +638,275 @@ "```\n", "\n", "Note: `pipeline_param` will add two values to `jar_params`: a name followed by its value. The name will be in the format `--MY_PIPELINE_PARAM`. For example, in this case, `jar_params` will be `[\"arg1\", \"--MY_PIPELINE_PARAM\", \"pipeline_param1\", \"arg2\"]`" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "main_jar_class_name = \"com.microsoft.aeva.Main\"\n", - "jar_library_dbfs_path = os.getenv(\"DATABRICKS_JAR_LIB_PATH\", \"\") # Databricks jar library path\n", - "\n", - "dbJarInDbfsStep = DatabricksStep(\n", - " name=\"DBJarInDBFS\",\n", - " inputs=[step_1_input],\n", - " num_workers=1,\n", - " main_class_name=main_jar_class_name,\n", - " jar_params={'arg1', pipeline_param, 'arg2'},\n", - " run_name='DB_JAR_demo',\n", - " jar_libraries=[JarLibrary(jar_library_dbfs_path)],\n", - " compute_target=databricks_compute,\n", - " allow_reuse=True\n", + "main_jar_class_name = \"com.microsoft.aeva.Main\"\r\n", + "jar_library_dbfs_path = os.getenv(\"DATABRICKS_JAR_LIB_PATH\", \"\") # Databricks jar library path\r\n", + "\r\n", + "dbJarInDbfsStep = DatabricksStep(\r\n", + " name=\"DBJarInDBFS\",\r\n", + " inputs=[step_1_input],\r\n", + " num_workers=1,\r\n", + " main_class_name=main_jar_class_name,\r\n", + " jar_params={'arg1', pipeline_param, 'arg2'},\r\n", + " run_name='DB_JAR_demo',\r\n", + " jar_libraries=[JarLibrary(jar_library_dbfs_path)],\r\n", 
+ " compute_target=databricks_compute,\r\n", + " allow_reuse=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### Build and submit the Experiment" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbJarInDbfsStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DB_JAR_demo').submit(pipeline)\n", + "steps = [dbJarInDbfsStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DB_JAR_demo').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "#### View Run Details" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "### 5. Running the demo notebook already added to the Databricks workspace using an existing cluster\n", "First, you need to register a DBFS datastore and make sure `path_on_datastore` exists in the Databricks file system; you can browse the files by referring to [this](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html).\n", "\n", "Find `existing_cluster_id` by opening the Clusters page in the Azure Databricks UI; in the URL you will find a string containing '-' right after \"clusters/\"." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "try:\n", - " dbfs_ds = Datastore.get(workspace=ws, datastore_name='dbfs_datastore')\n", - " print('DBFS Datastore already exists')\n", - "except Exception as ex:\n", - " dbfs_ds = Datastore.register_dbfs(ws, datastore_name='dbfs_datastore')\n", - "\n", - "step_1_input = DataReference(datastore=dbfs_ds, path_on_datastore=\"FileStore\", data_reference_name=\"input\")\n", + "try:\r\n", + " dbfs_ds = Datastore.get(workspace=ws, datastore_name='dbfs_datastore')\r\n", + " print('DBFS Datastore already exists')\r\n", + "except Exception as ex:\r\n", + " dbfs_ds = Datastore.register_dbfs(ws, datastore_name='dbfs_datastore')\r\n", + "\r\n", + "step_1_input = DataReference(datastore=dbfs_ds, path_on_datastore=\"FileStore\", data_reference_name=\"input\")\r\n", "step_1_output = PipelineData(\"output\", datastore=dbfs_ds)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "dbNbWithExistingClusterStep = DatabricksStep(\n", - " name=\"DBFSReferenceWithExisting\",\n", - " inputs=[step_1_input],\n", - " outputs=[step_1_output],\n", - " notebook_path=notebook_path,\n", - " notebook_params={'myparam': 'testparam', \n", - " 'myparam2': pipeline_param},\n", - " run_name='DBFS_Reference_With_Existing',\n", - " compute_target=databricks_compute,\n", - " existing_cluster_id=\"your existing cluster id\",\n", - " allow_reuse=True\n", + "dbNbWithExistingClusterStep = DatabricksStep(\r\n", + " name=\"DBFSReferenceWithExisting\",\r\n", + " inputs=[step_1_input],\r\n", + " outputs=[step_1_output],\r\n", + " notebook_path=notebook_path,\r\n", + " notebook_params={'myparam': 'testparam', \r\n", + " 'myparam2': pipeline_param},\r\n", + " run_name='DBFS_Reference_With_Existing',\r\n", + " compute_target=databricks_compute,\r\n", + " existing_cluster_id=\"your 
existing cluster id\",\r\n", + " allow_reuse=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Build and submit the Experiment" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbNbWithExistingClusterStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DBFS_Reference_With_Existing').submit(pipeline)\n", + "steps = [dbNbWithExistingClusterStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DBFS_Reference_With_Existing').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### View Run Details" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ - "### 6. Running a Python script in Databricks that currenlty is in local computer with existing cluster\n", - "When you access azure blob or data lake storage from an existing (interactive) cluster, you need to ensure the Spark configuration is set up correctly to access this storage and this set up may require the cluster to be restarted.\n", - "\n", + "### 6. 
Running a Python script in Databricks that is currently on the local computer with an existing cluster\r\n", + "When you access Azure Blob storage or Azure Data Lake Storage from an existing (interactive) cluster, you need to ensure the Spark configuration is set up correctly to access this storage, and this setup may require the cluster to be restarted.\r\n", + "\r\n", "If you set permit_cluster_restart to True, AML will check if the spark configuration needs to be updated and restart the cluster for you if required. This will ensure that the storage can be correctly accessed from the Databricks cluster." ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\n", - "                             data_reference_name=\"input\")\n", - "\n", - "dbPythonInLocalWithExistingStep = DatabricksStep(\n", - "    name=\"DBPythonInLocalMachineWithExisting\",\n", - "    inputs=[step_1_input],\n", - "    python_script_name=python_script_name,\n", - "    source_directory=source_directory,\n", - "    run_name='DB_Python_Local_existing_demo',\n", - "    compute_target=databricks_compute,\n", - "    existing_cluster_id=\"your existing cluster id\",\n", - "    allow_reuse=False,\n", - "    permit_cluster_restart=True\n", + "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\r\n", + "                             data_reference_name=\"input\")\r\n", + "\r\n", + "dbPythonInLocalWithExistingStep = DatabricksStep(\r\n", + "    name=\"DBPythonInLocalMachineWithExisting\",\r\n", + "    inputs=[step_1_input],\r\n", + "    python_script_name=python_script_name,\r\n", + "    source_directory=source_directory,\r\n", + "    run_name='DB_Python_Local_existing_demo',\r\n", + "    compute_target=databricks_compute,\r\n", + "    existing_cluster_id=\"your existing cluster id\",\r\n", + "    allow_reuse=False,\r\n", + "    permit_cluster_restart=True\r\n", ")" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": 
"markdown", "source": [ "#### Build and submit the Experiment" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "steps = [dbPythonInLocalWithExistingStep]\n", - "pipeline = Pipeline(workspace=ws, steps=steps)\n", - "pipeline_run = Experiment(ws, 'DB_Python_Local_existing_demo').submit(pipeline)\n", + "steps = [dbPythonInLocalWithExistingStep]\r\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\r\n", + "pipeline_run = Experiment(ws, 'DB_Python_Local_existing_demo').submit(pipeline)\r\n", "pipeline_run.wait_for_completion()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### View Run Details" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from azureml.widgets import RunDetails\n", + "from azureml.widgets import RunDetails\r\n", "RunDetails(pipeline_run).show()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", + "source": [ + "### How to get run context in a Databricks interactive cluster\r\n", + "\r\n", + "Users are used to being able to use Run.get_context() to retrieve the parent_run_id for a given run_id. 
In DatabricksStep, however, a little more work is required to achieve this.\r\n", + "\r\n", + "The solution is to parse the script arguments and set corresponding environment variables to access the run context from within Databricks.\r\n", + "\r\n", + "Here is a code sample:" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "```python\r\n", + "from azureml.core import Run\r\n", + "import argparse\r\n", + "import os\r\n", + "\r\n", + "\r\n", + "def populate_environ():\r\n", + " parser = argparse.ArgumentParser(description='Process arguments passed to script')\r\n", + " parser.add_argument('--AZUREML_SCRIPT_DIRECTORY_NAME')\r\n", + " parser.add_argument('--AZUREML_RUN_TOKEN')\r\n", + " parser.add_argument('--AZUREML_RUN_TOKEN_EXPIRY')\r\n", + " parser.add_argument('--AZUREML_RUN_ID')\r\n", + " parser.add_argument('--AZUREML_ARM_SUBSCRIPTION')\r\n", + " parser.add_argument('--AZUREML_ARM_RESOURCEGROUP')\r\n", + " parser.add_argument('--AZUREML_ARM_WORKSPACE_NAME')\r\n", + " parser.add_argument('--AZUREML_ARM_PROJECT_NAME')\r\n", + " parser.add_argument('--AZUREML_SERVICE_ENDPOINT')\r\n", + "\r\n", + " args = parser.parse_args()\r\n", + " os.environ['AZUREML_SCRIPT_DIRECTORY_NAME'] = args.AZUREML_SCRIPT_DIRECTORY_NAME\r\n", + " os.environ['AZUREML_RUN_TOKEN'] = args.AZUREML_RUN_TOKEN\r\n", + " os.environ['AZUREML_RUN_TOKEN_EXPIRY'] = args.AZUREML_RUN_TOKEN_EXPIRY\r\n", + " os.environ['AZUREML_RUN_ID'] = args.AZUREML_RUN_ID\r\n", + " os.environ['AZUREML_ARM_SUBSCRIPTION'] = args.AZUREML_ARM_SUBSCRIPTION\r\n", + " os.environ['AZUREML_ARM_RESOURCEGROUP'] = args.AZUREML_ARM_RESOURCEGROUP\r\n", + " os.environ['AZUREML_ARM_WORKSPACE_NAME'] = args.AZUREML_ARM_WORKSPACE_NAME\r\n", + " os.environ['AZUREML_ARM_PROJECT_NAME'] = args.AZUREML_ARM_PROJECT_NAME\r\n", + " os.environ['AZUREML_SERVICE_ENDPOINT'] = args.AZUREML_SERVICE_ENDPOINT\r\n", + "\r\n", + "populate_environ()\r\n", + "run = Run.get_context(allow_offline=False)\r\n", + 
"print(run._run_dto[\"parent_run_id\"])\r\n", + "```" + ], + "metadata": {} + }, + { + "cell_type": "markdown", "source": [ "# Next: ADLA as a Compute Target\n", "To use ADLA as a compute target from Azure Machine Learning Pipeline, a AdlaStep is used. This [notebook](https://aka.ms/pl-adla) demonstrates the use of AdlaStep in Azure Machine Learning Pipeline." ], - "cell_type": "markdown", "metadata": {} } ],