{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved. \n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/Synapse_Job_Scala_Support.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get AML workspace which has synapse spark pool attached" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Workspace, Experiment, Dataset, Environment\n", "\n", "ws = Workspace.from_config()\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Leverage ScriptRunConfig to submit scala job to an attached synapse spark cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.datastore import Datastore\n", "# Use the default blob storage\n", "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", "\n", "# We are uploading a sample file in the local directory to be used as a datasource\n", "file_name = \"shakespeare.txt\"\n", "def_blob_store.upload_files(files=[\"./{}\".format(file_name)], overwrite=False)\n", "\n", "# Create file dataset\n", "file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.runconfig import RunConfiguration\n", "from azureml.data import HDFSOutputDatasetConfig\n", "import uuid\n", "\n", "run_config = RunConfiguration(framework=\"pyspark\")\n", "run_config.target = \"link-pool\"\n", "run_config.spark.configuration[\"spark.driver.memory\"] = \"2g\"\n", "run_config.spark.configuration[\"spark.driver.cores\"] = 2\n", "run_config.spark.configuration[\"spark.executor.memory\"] = \"2g\"\n", "run_config.spark.configuration[\"spark.executor.cores\"] = 1\n", "run_config.spark.configuration[\"spark.executor.instances\"] = 1\n", "# This can be removed if you are using local jars in source folder\n", "run_config.spark.configuration[\"spark.yarn.dist.jars\"]=\"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\"\n", "\n", "dir_name = \"wordcount-{}\".format(str(uuid.uuid4()))\n", "input = file_dataset.as_named_input(\"input\").as_hdfs()\n", "output = HDFSOutputDatasetConfig(destination=(ws.get_default_datastore(), \"{}/result\".format(dir_name)))\n", "\n", "from azureml.core import ScriptRunConfig\n", "args = ['--input', input, '--output', output]\n", "script_run_config = ScriptRunConfig(source_directory = '.',\n", " script= 'start_script.py',\n", " arguments= args,\n", " run_config = run_config)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Experiment\n", "exp = Experiment(workspace=ws, name='synapse-spark')\n", "run = exp.submit(config=script_run_config)\n", "run" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Leverage SynapseSparkStep in an AML pipeline to add dataprep step on synapse spark cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import Pipeline\n", "from 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Use SynapseSparkStep in an AML pipeline to add a data prep step on a Synapse Spark pool" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import Pipeline\n", "from azureml.pipeline.steps import SynapseSparkStep\n", "\n", "configs = {}\n", "# configs[\"spark.yarn.dist.jars\"] = \"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\"\n", "step_1 = SynapseSparkStep(name='synapse-spark',\n", "                          file='start_script.py',\n", "                          jars=\"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\",\n", "                          source_directory=\".\",\n", "                          arguments=args,\n", "                          compute_target='link-pool',\n", "                          driver_memory=\"2g\",\n", "                          driver_cores=2,\n", "                          executor_memory=\"2g\",\n", "                          executor_cores=1,\n", "                          num_executors=1,\n", "                          conf=configs)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(workspace=ws, steps=[step_1])\n", "pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)" ] } ],
"metadata": { "authors": [ { "name": "feli1" } ], "kernelspec": { "display_name": "Python 3.8 - AzureML", "language": "python", "name": "python38-azureml" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "nteract": { "version": "0.28.0" } }, "nbformat": 4, "nbformat_minor": 2 }