{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved.\n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/spark_session_on_synapse_spark_pool.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Interactive Spark Session on Synapse Spark Pool" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install package" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -U \"azureml-synapse\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For JupyterLab, please additionally run:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!jupyter lab build --minimize=False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PLEASE restart kernel and then refresh web page before starting spark session." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 0. How to leverage Spark Magic for interactive Spark experience" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": { "iopub.execute_input": "2020-06-05T03:22:14.965395Z", "iopub.status.busy": "2020-06-05T03:22:14.965395Z", "iopub.status.idle": "2020-06-05T03:22:14.970398Z", "shell.execute_reply": "2020-06-05T03:22:14.969397Z", "shell.execute_reply.started": "2020-06-05T03:22:14.965395Z" } }, "outputs": [], "source": [ "# show help\n", "%synapse ?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Start Synapse Session" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "synapse_compute_name=os.getenv(\"SYNAPSE_COMPUTE_NAME\", \"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# use Synapse compute linked to the Compute Instance's workspace with an aml envrionment.\n", "# conda dependencies specified in the environment will be installed before the spark session started.\n", "\n", "%synapse start -c $synapse_compute_name -e AzureML-Minimal" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# use Synapse compute from anther workspace via its config file\n", "\n", "# %synapse start -c -f config.json" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# use Synapse compute from anther workspace via subscription_id, resource_group and workspace_name\n", "\n", "# %synapse start -c -s -r -w " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# start a spark session with an AML environment, \n", "# %synapse start -c -s -r -w -e AzureML-Minimal" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 
Data prepration\n", "\n", "Three types of datastore are supported in synapse spark, and you have two ways to load the data.\n", "\n", "\n", "| Datastore Type | Data Acess |\n", "|--------------------|-------------------------------|\n", "| Blob | Credential |\n", "| Adlsgen1 | Credential & Credential-less |\n", "| Adlsgen2 | Credential & Credential-less |" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 1: Data loading by HDFS path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Read data from Blob**\n", "\n", "```python\n", "# setup access key or sas token\n", "\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.key..blob.core.windows.net\", \"\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.sas...blob.core.windows.net\", \"sas token\")\n", "\n", "df = spark.read.parquet(\"wasbs://@.blob.core.windows.net/\")\n", "```\n", "\n", "**Read data from Adlsgen1**\n", "\n", "```python\n", "# setup service pricinpal which has access of the data\n", "# If no data Credential is setup, the user identity will be used to do access control\n", "\n", "sc._jsc.hadoopConfiguration().set(\"fs.adl.account..oauth2.access.token.provider.type\",\"ClientCredential\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.adl.account..oauth2.client.id\", \"\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.adl.account..oauth2.credential\", \"\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.adl.account..oauth2.refresh.url\", \"https://login.microsoftonline.com//oauth2/token\")\n", "\n", "df = spark.read.csv(\"adl://.azuredatalakestore.net/\")\n", "```\n", "\n", "**Read data from Adlsgen2**\n", "\n", "```python\n", "# setup service pricinpal which has access of the data\n", "# If no data Credential is setup, the user identity will be used to do access control\n", "\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.auth.type..dfs.core.windows.net\",\"OAuth\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth.provider.type..dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.id..dfs.core.windows.net\", \"\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.secret..dfs.core.windows.net\", \"\")\n", "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net\", \"https://login.microsoftonline.com//oauth2/token\")\n", "\n", "df = spark.read.csv(\"abfss://@.dfs.core.windows.net/\")\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "execution": { "iopub.execute_input": "2020-06-04T08:11:18.812276Z", "iopub.status.busy": "2020-06-04T08:11:18.812276Z", "iopub.status.idle": "2020-06-04T08:11:23.854526Z", "shell.execute_reply": "2020-06-04T08:11:23.853525Z", "shell.execute_reply.started": "2020-06-04T08:11:18.812276Z" } }, "outputs": [], "source": [ "%%synapse\n", "\n", "from pyspark.sql.functions import col, desc\n", "\n", "df = spark.read.option(\"header\", \"true\").csv(\"wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv\")\n", "df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 2: Data loading by AML Dataset\n", "\n", "You can create tabular data by following the [guidance](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-register-datasets) and use to_spark_dataframe() to load the data.\n", "\n", "```text\n", "%%synapse\n", "\n", 
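{ "cell_type": "markdown", "metadata": {}, "source": [ "**Write results back to storage (sketch)**\n", "\n", "Transformed dataframes can be persisted the same way, by pointing `df.write` at an HDFS path inside a `%%synapse` cell. This is a minimal sketch, assuming the access key for `<storage account>` has already been set as shown above; `<container>`, `<storage account>` and `<output path>` are placeholders for your own values.\n", "\n", "```python\n", "# aggregate the Titanic dataframe loaded in the cell above\n", "summary = df.groupBy('Survived').count()\n", "\n", "# write the result back as parquet; mode('overwrite') replaces previous output\n", "summary.write.mode('overwrite').parquet(\n", "    'wasbs://<container>@<storage account>.blob.core.windows.net/<output path>')\n", "```" ] },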
"import azureml.core\n", "print(azureml.core.VERSION)\n", "\n", "from azureml.core import Workspace, Dataset\n", "ws = Workspace.get(name='', subscription_id='', resource_group='')\n", "ds = Dataset.get_by_name(ws, \"\")\n", "df = ds.to_spark_dataframe()\n", "\n", "# You can do more data transformation on spark dataframe\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Session Metadata\n", "After session started, you can check the session's metadata, find the links to Synapse portal." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%synapse meta" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Stop Session\n", "When current session reach the status timeout, dead or any failure, you must explicitly stop it before start new one. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%synapse stop" ] } ], "metadata": { "authors": [ { "name": "yunzhan" } ], "kernelspec": { "display_name": "Python 3.8 - AzureML", "language": "python", "name": "python38-azureml" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" }, "nteract": { "version": "0.28.0" } }, "nbformat": 4, "nbformat_minor": 4 }