Files
MachineLearningNotebooks/how-to-use-azureml/azure-synapse/Synapse_Job_Scala_Support.ipynb

186 lines
6.2 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved. \n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/Synapse_Job_Scala_Support.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Get the AML workspace that has a Synapse Spark pool attached"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Workspace, Experiment, Dataset, Environment\n",
"\n",
"# Resolve the workspace via from_config(), then print its identifying fields one per line.\n",
"ws = Workspace.from_config()\n",
"workspace_details = (ws.name, ws.resource_group, ws.location, ws.subscription_id)\n",
"print(*workspace_details, sep='\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Leverage ScriptRunConfig to submit a Scala job to an attached Synapse Spark cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prepare data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.datastore import Datastore\n",
"# Look up the workspace's blob datastore by name.\n",
"def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
"\n",
"# Upload the sample text file from the local directory; it serves as the job's data source.\n",
"# NOTE(review): overwrite=False presumably leaves an already-uploaded copy untouched - confirm.\n",
"file_name = \"shakespeare.txt\"\n",
"local_path = \"./\" + file_name\n",
"def_blob_store.upload_files(files=[local_path], overwrite=False)\n",
"\n",
"# Build a file dataset that points at the uploaded file on the datastore.\n",
"uploaded_file = (def_blob_store, file_name)\n",
"file_dataset = Dataset.File.from_files(path=[uploaded_file])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import uuid\n",
"\n",
"from azureml.core import ScriptRunConfig\n",
"from azureml.core.runconfig import RunConfiguration\n",
"from azureml.data import HDFSOutputDatasetConfig\n",
"\n",
"# Spark settings for the run; the target is the attached Synapse Spark pool named \"link-pool\".\n",
"run_config = RunConfiguration(framework=\"pyspark\")\n",
"run_config.target = \"link-pool\"\n",
"run_config.spark.configuration[\"spark.driver.memory\"] = \"2g\"\n",
"run_config.spark.configuration[\"spark.driver.cores\"] = 2\n",
"run_config.spark.configuration[\"spark.executor.memory\"] = \"2g\"\n",
"run_config.spark.configuration[\"spark.executor.cores\"] = 1\n",
"run_config.spark.configuration[\"spark.executor.instances\"] = 1\n",
"# This can be removed if you are using local jars in the source folder\n",
"run_config.spark.configuration[\"spark.yarn.dist.jars\"] = \"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\"\n",
"\n",
"# Each execution of this cell writes to its own uuid-suffixed folder on the default datastore.\n",
"dir_name = \"wordcount-{}\".format(str(uuid.uuid4()))\n",
"\n",
"# Deliberately not named `input`: that would shadow Python's built-in input()\n",
"# for every cell executed afterwards in this kernel.\n",
"input_dataset = file_dataset.as_named_input(\"input\").as_hdfs()\n",
"output_config = HDFSOutputDatasetConfig(destination=(ws.get_default_datastore(), \"{}/result\".format(dir_name)))\n",
"\n",
"# `args` is also passed to the SynapseSparkStep built further down in this notebook.\n",
"args = ['--input', input_dataset, '--output', output_config]\n",
"script_run_config = ScriptRunConfig(source_directory='.',\n",
"                                    script='start_script.py',\n",
"                                    arguments=args,\n",
"                                    run_config=run_config)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"# Submit the ScriptRunConfig under the 'synapse-spark' experiment and display the resulting run.\n",
"exp = Experiment(name='synapse-spark', workspace=ws)\n",
"run = exp.submit(config=script_run_config)\n",
"run"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Leverage SynapseSparkStep in an AML pipeline to add a data-prep step on a Synapse Spark cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.pipeline.core import Pipeline\n",
"from azureml.pipeline.steps import SynapseSparkStep\n",
"\n",
"# No extra Spark conf entries are passed; the wordcount jar is supplied through `jars` below.\n",
"# NOTE(review): presumably the jar could instead be supplied via conf, e.g.\n",
"#   configs[\"spark.yarn.dist.jars\"] = \"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\"\n",
"configs = {}\n",
"\n",
"# Reuses `args` (the --input/--output pair) defined in the ScriptRunConfig cell above.\n",
"step_1 = SynapseSparkStep(\n",
"    name='synapse-spark',\n",
"    file='start_script.py',\n",
"    jars=\"wasbs://synapse@azuremlexamples.blob.core.windows.net/shared/wordcount.jar\",\n",
"    source_directory=\".\",\n",
"    arguments=args,\n",
"    compute_target='link-pool',\n",
"    driver_memory=\"2g\",\n",
"    driver_cores=2,\n",
"    executor_memory=\"2g\",\n",
"    executor_cores=1,\n",
"    num_executors=1,\n",
"    conf=configs,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build a single-step pipeline and submit it under the 'synapse-pipeline' experiment\n",
"# with regenerate_outputs=True.\n",
"pipeline = Pipeline(steps=[step_1], workspace=ws)\n",
"pipeline_run = pipeline.submit(experiment_name='synapse-pipeline', regenerate_outputs=True)"
]
}
],
"metadata": {
"authors": [
{
"name": "feli1"
}
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
},
"nteract": {
"version": "0.28.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}