MachineLearningNotebooks/how-to-use-azureml/azure-synapse/spark_session_on_synapse_spark_pool.ipynb

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Interactive Spark Session on Synapse Spark Pool"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install package"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install -U \"azureml-synapse\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For JupyterLab, please additionally run:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!jupyter lab build --minimize=False"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Restart the kernel and refresh the web page before starting a Spark session."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 0. How to use Spark magic for an interactive Spark experience"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {
"iopub.execute_input": "2020-06-05T03:22:14.965395Z",
"iopub.status.busy": "2020-06-05T03:22:14.965395Z",
"iopub.status.idle": "2020-06-05T03:22:14.970398Z",
"shell.execute_reply": "2020-06-05T03:22:14.969397Z",
"shell.execute_reply.started": "2020-06-05T03:22:14.965395Z"
}
},
"outputs": [],
"source": [
"# show help\n",
"%synapse ?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Start Synapse Session"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"synapse_compute_name = os.getenv(\"SYNAPSE_COMPUTE_NAME\", \"<my-synapse-compute-name>\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use the Synapse compute linked to the Compute Instance's workspace, with an AML environment.\n",
"# Conda dependencies specified in the environment are installed before the Spark session starts.\n",
"\n",
"%synapse start -c $synapse_compute_name -e AzureML-Minimal"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use a Synapse compute from another workspace via its config file.\n",
"\n",
"# %synapse start -c <compute-name> -f config.json"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use a Synapse compute from another workspace via subscription_id, resource_group and workspace_name.\n",
"\n",
"# %synapse start -c <compute-name> -s <subscription-id> -r <resource-group> -w <workspace-name>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Start a Spark session from another workspace with an AML environment.\n",
"# %synapse start -c <compute-name> -s <subscription-id> -r <resource-group> -w <workspace-name> -e AzureML-Minimal"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Data preparation\n",
"\n",
"Synapse Spark supports three types of datastore, and there are two ways to load data: by HDFS path or through an AML Dataset.\n",
"\n",
"| Datastore Type | Data Access                  |\n",
"|----------------|------------------------------|\n",
"| Blob           | Credential                   |\n",
"| Adlsgen1       | Credential & Credential-less |\n",
"| Adlsgen2       | Credential & Credential-less |"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 1: Data loading by HDFS path"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Read data from Blob**\n",
"\n",
"```python\n",
"# Set up an access key or a SAS token\n",
"\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.key.<storage account name>.blob.core.windows.net\", \"<access key>\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net\", \"<sas token>\")\n",
"\n",
"df = spark.read.parquet(\"wasbs://<container name>@<storage account name>.blob.core.windows.net/<path>\")\n",
"```\n",
"\n",
"**Read data from Adlsgen1**\n",
"\n",
"```python\n",
"# Set up a service principal that has access to the data.\n",
"# If no data credential is set up, the user identity is used for access control.\n",
"\n",
"sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.access.token.provider.type\",\"ClientCredential\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.client.id\", \"<client id>\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.credential\", \"<client secret>\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.refresh.url\", \"https://login.microsoftonline.com/<tenant id>/oauth2/token\")\n",
"\n",
"df = spark.read.csv(\"adl://<storage account name>.azuredatalakestore.net/<path>\")\n",
"```\n",
"\n",
"**Read data from Adlsgen2**\n",
"\n",
"```python\n",
"# Set up a service principal that has access to the data.\n",
"# If no data credential is set up, the user identity is used for access control.\n",
"\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net\",\"OAuth\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net\", \"<client id>\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net\", \"<client secret>\")\n",
"sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net\", \"https://login.microsoftonline.com/<tenant id>/oauth2/token\")\n",
"\n",
"df = spark.read.csv(\"abfss://<container name>@<storage account name>.dfs.core.windows.net/<path>\")\n",
"```"
]
},
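{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Credential-less read from Adlsgen2 (sketch)**\n",
"\n",
"The comments above note that when no data credential is set up, the user identity is used for access control. A minimal sketch of that path follows. It assumes that none of the `fs.azure.account.*` settings shown above have been applied in the session and that the signed-in user has been granted read access to the storage account; the placeholders are the same ones used above, and the `header` option is only an assumption about the file layout.\n",
"\n",
"```python\n",
"# Assumption: no service principal settings were applied in this session,\n",
"# so access control falls back to the user identity.\n",
"\n",
"df = spark.read.option(\"header\", \"true\").csv(\"abfss://<container name>@<storage account name>.dfs.core.windows.net/<path>\")\n",
"df.show(10)\n",
"```"
]
},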
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {
"iopub.execute_input": "2020-06-04T08:11:18.812276Z",
"iopub.status.busy": "2020-06-04T08:11:18.812276Z",
"iopub.status.idle": "2020-06-04T08:11:23.854526Z",
"shell.execute_reply": "2020-06-04T08:11:23.853525Z",
"shell.execute_reply.started": "2020-06-04T08:11:18.812276Z"
}
},
"outputs": [],
"source": [
"%%synapse\n",
"\n",
"from pyspark.sql.functions import col, desc\n",
"\n",
"df = spark.read.option(\"header\", \"true\").csv(\"wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv\")\n",
"df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 2: Data loading by AML Dataset\n",
"\n",
"You can create a tabular dataset by following the [guidance](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-register-datasets) and then use `to_spark_dataframe()` to load the data.\n",
"\n",
"```text\n",
"%%synapse\n",
"\n",
"import azureml.core\n",
"print(azureml.core.VERSION)\n",
"\n",
"from azureml.core import Workspace, Dataset\n",
"ws = Workspace.get(name='<workspace name>', subscription_id='<subscription id>', resource_group='<resource group>')\n",
"ds = Dataset.get_by_name(ws, \"<tabular dataset name>\")\n",
"df = ds.to_spark_dataframe()\n",
"\n",
"# You can now apply further transformations to the Spark DataFrame\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Session Metadata\n",
"After the session has started, you can check its metadata and find links to the Synapse portal."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%synapse meta"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Stop Session\n",
"When the current session times out, dies, or fails in any other way, you must explicitly stop it before starting a new one."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%synapse stop"
]
}
],
"metadata": {
"authors": [
{
"name": "yunzhan"
}
],
"kernelspec": {
"display_name": "Python 3.8 - AzureML",
"language": "python",
"name": "python38-azureml"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
},
"nteract": {
"version": "0.28.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}