Copyright (c) Microsoft Corporation. All rights reserved. 

Licensed under the MIT License.

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/spark_job_on_synapse_spark_pool.png)

# Using Synapse Spark Pool as a Compute Target from Azure Machine Learning Remote Run
1. To use Synapse Spark Pool as a compute target from Experiment Run, [ScriptRunConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.script_run_config.scriptrunconfig?view=azure-ml-py) is used, the same as other Experiment Runs. This notebook demonstrates how to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster.
2. To use Synapse Spark Pool as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [SynapseSparkStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.synapse_spark_step.synapsesparkstep?view=azure-ml-py) is used. This notebook demonstrates how to leverage SynapseSparkStep in Azure Machine Learning Pipeline.

## Before you begin:
1. **Create an Azure Synapse workspace**: see [this quickstart](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-workspace) for more information.
2. **Create a Spark pool in the Synapse workspace**: see [this quickstart](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-apache-spark-pool-portal) for more information.

# Azure Machine Learning and Pipeline SDK-specific imports

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Experiment
from azureml.core import LinkedService, SynapseWorkspaceLinkedServiceConfiguration
from azureml.core.compute import ComputeTarget, AmlCompute, SynapseCompute
from azureml.exceptions import ComputeTargetException
from azureml.data import HDFSOutputDatasetConfig
from azureml.core.datastore import Datastore
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep, SynapseSparkStep

# Report which azureml-core version this kernel is running against.
sdk_version = azureml.core.VERSION
print("SDK version:", sdk_version)

In [None]:
# Load the workspace through Workspace.from_config() and echo its identifying
# fields, one per line.
ws = Workspace.from_config()
for workspace_field in (ws.name, ws.resource_group, ws.location, ws.subscription_id):
    print(workspace_field)

# Link Synapse workspace to AML 
You must be an "Owner" of the Synapse workspace resource to perform the linking. You can check your role in the Azure resource management portal; if you don't have the "Owner" role, ask an "Owner" to link the workspaces for you.

In [None]:

# Replace the placeholder defaults below with your own resource info (or set
# the matching environment variables) before running.
synapse_subscription_id = os.environ.get("SYNAPSE_SUBSCRIPTION_ID", "<my-synapse-subscription-id>")
synapse_resource_group = os.environ.get("SYNAPSE_RESOURCE_GROUP", "<my-synapse-resource-group>")
synapse_workspace_name = os.environ.get("SYNAPSE_WORKSPACE_NAME", "<my-synapse-workspace-name>")
synapse_linked_service_name = os.environ.get("SYNAPSE_LINKED_SERVICE_NAME", "<my-synapse-linked-service-name>")

# Describe the Synapse workspace that is going to be linked.
synapse_link_config = SynapseWorkspaceLinkedServiceConfiguration(
    name=synapse_workspace_name,
    resource_group=synapse_resource_group,
    subscription_id=synapse_subscription_id,
)

# Register the link in the AML workspace under the chosen linked-service name.
linked_service = LinkedService.register(
    linked_service_config=synapse_link_config,
    name=synapse_linked_service_name,
    workspace=ws,
)

# Linked service property

An MSI (system_assigned_identity_principal_id) will be generated for each linked service, for example:

```
name=synapselink,
type=Synapse,
linked_service_resource_id=/subscriptions/4faaaf21-663f-4391-96fd-47197c630979/resourceGroups/static_resources_synapse_test/providers/Microsoft.Synapse/workspaces/synapsetest2,
system_assigned_identity_principal_id=eb355d52-3806-4c5a-aec9-91447e8cfc2e
```

#### Make sure you grant the "Synapse Apache Spark Administrator" role on the Synapse workspace to the generated workspace-linking MSI in the Synapse Studio portal before you submit a job.

In [None]:
# Evaluate the linked service registered above so the notebook displays it as
# the cell output.
linked_service

In [None]:
# Show the result of LinkedService.list for workspace `ws` as the cell output.
LinkedService.list(ws)

# Attach Synapse spark pool as AML compute target

In [None]:
# Name of the Synapse Spark pool to attach, and the name the attached compute
# target will have in the AML workspace. Replace the placeholder defaults (or
# set the matching environment variables) before running.
synapse_spark_pool_name = os.getenv("SYNAPSE_SPARK_POOL_NAME", "<my-synapse-spark-pool-name>")
synapse_compute_name = os.getenv("SYNAPSE_COMPUTE_NAME", "<my-synapse-compute-name>")

try:
    # Look the compute target up first so that re-running this cell does not
    # try to attach the same name a second time (same pattern as the CPU
    # cluster cell further down).
    # NOTE: an existing target is reused as-is; it is not checked against
    # `synapse_spark_pool_name`.
    synapse_compute = ComputeTarget(workspace=ws, name=synapse_compute_name)
    print('Found existing compute target, use it.')
except ComputeTargetException:
    # Not attached yet: describe the Spark pool behind the linked service ...
    attach_config = SynapseCompute.attach_configuration(
        linked_service,
        type="SynapseSpark",
        pool_name=synapse_spark_pool_name)

    # ... attach it under the chosen name and block until the attach finishes.
    synapse_compute = ComputeTarget.attach(
        workspace=ws,
        name=synapse_compute_name,
        attach_configuration=attach_config)

    synapse_compute.wait_for_completion()

# Start an experiment run

## Prepare data

In [None]:
# Look up the workspace datastore named "workspaceblobstore".
def_blob_store = Datastore(ws, "workspaceblobstore")
print('Datastore {} will be used'.format(def_blob_store.name))

# Upload the sample file from the local directory; it serves as the data
# source for the runs below. Existing files are not overwritten.
file_name = "Titanic.csv"
local_files = ["./{}".format(file_name)]
def_blob_store.upload_files(files=local_files, overwrite=False)
 

## Tabular dataset as input

In [None]:
from azureml.core import Dataset

# Tabular view of the uploaded CSV, passed to the script as "tabular_input".
tabular_source = [(def_blob_store, file_name)]
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=tabular_source)
input1 = titanic_tabular_dataset.as_named_input("tabular_input")

## File dataset as input

In [None]:
from azureml.core import Dataset

# File view of the same CSV, passed to the script as "file_input" in HDFS mode.
file_source = [(def_blob_store, file_name)]
titanic_file_dataset = Dataset.File.from_files(path=file_source)
named_file_input = titanic_file_dataset.as_named_input("file_input")
input2 = named_file_input.as_hdfs()

## Output config: the output will be registered as a File dataset



In [None]:
from azureml.data import HDFSOutputDatasetConfig

# Output goes to the "test" path on the blob datastore and is registered as
# "registered_dataset" once the run completes.
output_destination = (def_blob_store, "test")
output_config = HDFSOutputDatasetConfig(destination=output_destination)
output = output_config.register_on_complete(name="registered_dataset")

## Dataprep script

In [None]:
# Create the local "code" folder (no error if it already exists). The
# %%writefile cells put dataprep.py and train.py there, and the runs use it as
# their source directory.
os.makedirs("code", exist_ok=True)

In [None]:
%%writefile code/dataprep.py
import argparse
import os
import sys

import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

# Log the SDK version and the process environment of the run.
print(azureml.core.VERSION)
print(os.environ)

# All three options are plain string arguments.
arg_parser = argparse.ArgumentParser()
for option in ("--tabular_input", "--file_input", "--output_dir"):
    arg_parser.add_argument(option)
args = arg_parser.parse_args()

# Tabular input: --tabular_input is used as a dataset id, resolved through the
# dataset SDK in the current run's workspace and converted to a Spark dataframe.
workspace = Run.get_context().experiment.workspace
tabular_dataset = Dataset.get_by_id(workspace, id=args.tabular_input)
tabular_sdf = tabular_dataset.to_spark_dataframe()
tabular_sdf.show()

# File input: --file_input is used as a path that Spark reads directly as CSV
# with a header row.
spark = SparkSession.builder.getOrCreate()
csv_reader = spark.read.option("header", "true")
file_sdf = csv_reader.csv(args.file_input)
file_sdf.show()

# Write the file-based dataframe as one partition, with a header, appending to
# whatever is already under --output_dir.
csv_writer = file_sdf.coalesce(1).write.option("header", "true").mode("append")
csv_writer.csv(args.output_dir)

## Set up Conda dependency for the following Script Run

In [None]:
from azureml.core.environment import CondaDependencies
# Start from a default CondaDependencies object and add a pinned azureml-core
# pip package; `conda_dep` is assigned to the run configuration's Python
# environment in the ScriptRunConfig cell.
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

## How to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster

In [None]:
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment

# PySpark run configuration that targets the attached Synapse compute by name.
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

# Spark settings for the driver and the executor.
spark_settings = {
    "spark.driver.memory": "1g",
    "spark.driver.cores": 2,
    "spark.executor.memory": "1g",
    "spark.executor.cores": 1,
    "spark.executor.instances": 1,
}
for setting_name, setting_value in spark_settings.items():
    run_config.spark.configuration[setting_name] = setting_value

# Python packages for the run come from the conda dependencies defined earlier.
run_config.environment.python.conda_dependencies = conda_dep

# dataprep.py receives the two dataset inputs and the output config as arguments.
script_arguments = [
    "--tabular_input", input1,
    "--file_input", input2,
    "--output_dir", output,
]
script_run_config = ScriptRunConfig(
    source_directory='./code',
    script='dataprep.py',
    arguments=script_arguments,
    run_config=run_config,
)

In [None]:
from azureml.core import Experiment

# Submit the script run under the "synapse-spark" experiment and show the
# resulting run object as the cell output.
experiment_name = "synapse-spark"
exp = Experiment(workspace=ws, name=experiment_name)
run = exp.submit(config=script_run_config)
run

## How to leverage SynapseSparkStep in an AML pipeline to orchestrate data prep step on Synapse Spark and training step on AzureML compute.

In [None]:
# Name of the CPU cluster used by the training step.
cpu_cluster_name = "cpucluster"

# Reuse the cluster when the lookup succeeds; provision a single-node
# STANDARD_D2_V2 cluster when the lookup raises ComputeTargetException.
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2',
        max_nodes=1,
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
else:
    print('Found existing cluster, use it.')

# Wait for the cluster, streaming progress output.
cpu_cluster.wait_for_completion(show_output=True)

In [None]:
%%writefile code/train.py
import glob
import os
import sys
from os import listdir
from os.path import isfile, join

# The input directory is read from the "step2_input" environment variable.
input_dir = os.environ["step2_input"]

# Print the contents of every regular file directly inside the input
# directory; sub-directories are skipped.
for entry_name in listdir(input_dir):
    entry_path = join(input_dir, entry_name)
    if not isfile(entry_path):
        continue
    with open(entry_path) as handle:
        print(handle.read())

In [None]:
from azureml.core.environment import Environment

# Step 1 inputs: the uploaded CSV as a tabular dataset and as a file dataset
# (the latter in HDFS mode).
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(
    path=[(def_blob_store, file_name)]
)
titanic_file_dataset = Dataset.File.from_files(
    path=[(def_blob_store, file_name)]
)
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

# Step 1 output: written to the "test" path on the blob datastore and
# registered as "registered_dataset" on completion.
step1_output_config = HDFSOutputDatasetConfig(destination=(def_blob_store, "test"))
step1_output = step1_output_config.register_on_complete(name="registered_dataset")

# Step 2 consumes step 1's output as a downloaded input named "step2_input".
step2_input = step1_output.as_input("step2_input").as_download()

# Environment for the Spark step, with a pinned azureml-core pip package.
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")

# Data prep on the attached Synapse Spark pool.
step_1 = SynapseSparkStep(
    name='synapse-spark',
    file='dataprep.py',
    source_directory="./code",
    inputs=[step1_input1, step1_input2],
    outputs=[step1_output],
    arguments=[
        "--tabular_input", step1_input1,
        "--file_input", step1_input2,
        "--output_dir", step1_output,
    ],
    compute_target=synapse_compute_name,
    driver_memory="7g",
    driver_cores=4,
    executor_memory="7g",
    executor_cores=2,
    num_executors=1,
    environment=env,
)

# Training script on the AML CPU cluster; step reuse is turned off.
step_2 = PythonScriptStep(
    script_name="train.py",
    arguments=[step2_input],
    inputs=[step2_input],
    compute_target=cpu_cluster_name,
    source_directory="./code",
    allow_reuse=False,
)

# Build the two-step pipeline and submit it as experiment "synapse-pipeline",
# regenerating all outputs.
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)