Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.


# Azure Machine Learning Pipeline with DataTransferStep
This notebook demonstrates how to use DataTransferStep in an Azure Machine Learning Pipeline.

In certain cases, you will need to transfer data from one location to another. For example, your data may be in Azure SQL Database and you may want to move it to Azure Data Lake Storage. Or your data may be in an Azure Data Lake Storage (ADLS) account and you want to make it available in Blob storage. The built-in **DataTransferStep** class helps you transfer data in these situations.

The examples below move data from ADLS Gen1, ADLS Gen2, Azure SQL Database, and Azure Database for PostgreSQL into Blob storage.

## Data transfer currently supports the following storage types

| Data store | Supported as a source | Supported as a sink |
| --- | --- | --- |
| Azure Blob Storage | Yes | Yes |
| Azure Data Lake Storage Gen 1 | Yes | Yes |
| Azure Data Lake Storage Gen 2 | Yes | Yes |
| Azure SQL Database | Yes | Yes |
| Azure Database for PostgreSQL | Yes | No |
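
The table can be encoded as a small lookup to sanity-check a planned transfer before building a step. This is a minimal sketch in plain Python; the `SUPPORT` dictionary and the `can_transfer` helper are hypothetical names used for illustration and are not part of the Azure ML SDK.

```python
# Support matrix copied from the table above: (usable as source, usable as sink).
SUPPORT = {
    "Azure Blob Storage": (True, True),
    "Azure Data Lake Storage Gen 1": (True, True),
    "Azure Data Lake Storage Gen 2": (True, True),
    "Azure SQL Database": (True, True),
    "Azure Database for PostgreSQL": (True, False),
}


def can_transfer(source, sink):
    """Return True if the table lists `source` as a source and `sink` as a sink."""
    if source not in SUPPORT or sink not in SUPPORT:
        raise KeyError("unknown data store type")
    return SUPPORT[source][0] and SUPPORT[sink][1]


print(can_transfer("Azure Database for PostgreSQL", "Azure Blob Storage"))  # True
print(can_transfer("Azure Blob Storage", "Azure Database for PostgreSQL"))  # False
```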

## Azure Machine Learning and Pipeline SDK-specific imports

In [None]:
import os
import azureml.core
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.exceptions import ComputeTargetException
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import DataTransferStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Initialize Workspace

Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at `.\config.json`.

If you don't have a config.json file, please go through the [configuration Notebook](https://aka.ms/pl-config) first.

This sets you up with a working config file that has information on your workspace, subscription id, etc. 
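
Before calling `Workspace.from_config()`, it can help to confirm that the file exists and carries the expected fields. The sketch below assumes the usual `config.json` layout with `subscription_id`, `resource_group`, and `workspace_name` keys; `missing_config_keys` is a hypothetical helper, not an SDK function.

```python
import json
import os

# Keys assumed to be present in a workspace config.json.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def missing_config_keys(path="config.json"):
    """Return the required keys that are absent or empty in the config file.

    All required keys are reported as missing when the file does not exist.
    """
    if not os.path.isfile(path):
        return list(REQUIRED_KEYS)
    with open(path) as f:
        config = json.load(f)
    return [key for key in REQUIRED_KEYS if not config.get(key)]


missing = missing_config_keys()
if missing:
    print("config.json is missing:", ", ".join(missing))
else:
    print("config.json looks complete")
```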

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\n')

## Register Datastores and create DataReferences

For background on the service principal credentials used when registering an ADLS datastore, consult this article:

https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory

> Please make sure to update the following code examples with appropriate values.
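
The cells below read each setting from an environment variable and fall back to a placeholder such as `<my-account-name>`. A small guard can catch placeholders that were never replaced before a registration call is attempted. This is a sketch only; `unresolved_settings` is a hypothetical helper, and the angle-bracket convention is simply the one this notebook uses.

```python
import re

# Matches the "<my-...>" placeholder defaults used in this notebook.
PLACEHOLDER = re.compile(r"^<[^<>]+>$")


def unresolved_settings(**settings):
    """Return the names of settings that are empty or still hold a placeholder."""
    return sorted(
        name for name, value in settings.items()
        if not value or PLACEHOLDER.match(value)
    )


# Example with one filled-in value and one untouched placeholder.
print(unresolved_settings(account_name="mystorageaccount",
                          account_key="<my-account-key>"))  # ['account_key']
```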

### Azure Blob Storage

In [None]:
from msrest.exceptions import HttpOperationError

blob_datastore_name='MyBlobDatastore'
account_name=os.getenv("BLOB_ACCOUNTNAME_62", "<my-account-name>") # Storage account name
container_name=os.getenv("BLOB_CONTAINER_62", "<my-container-name>") # Name of Azure blob container
account_key=os.getenv("BLOB_ACCOUNT_KEY_62", "<my-account-key>") # Storage account key

try:
    blob_datastore = Datastore.get(ws, blob_datastore_name)
    print("found blob datastore with name: %s" % blob_datastore_name)
except HttpOperationError:
    blob_datastore = Datastore.register_azure_blob_container(
        workspace=ws,
        datastore_name=blob_datastore_name,
        account_name=account_name, # Storage account name
        container_name=container_name, # Name of Azure blob container
        account_key=account_key) # Storage account key
    print("registered blob datastore with name: %s" % blob_datastore_name)

blob_data_ref = DataReference(
    datastore=blob_datastore,
    data_reference_name="blob_test_data",
    path_on_datastore="testdata")

### Azure Data Lake Storage Gen1

In [None]:
from msrest.exceptions import HttpOperationError

datastore_name='MyAdlsDatastore'
subscription_id=os.getenv("ADL_SUBSCRIPTION_62", "<my-subscription-id>") # subscription id of ADLS account
resource_group=os.getenv("ADL_RESOURCE_GROUP_62", "<my-resource-group>") # resource group of ADLS account
store_name=os.getenv("ADL_STORENAME_62", "<my-datastore-name>") # ADLS account name
tenant_id=os.getenv("ADL_TENANT_62", "<my-tenant-id>") # tenant id of service principal
client_id=os.getenv("ADL_CLIENTID_62", "<my-client-id>") # client id of service principal
client_secret=os.getenv("ADL_CLIENT_SECRET_62", "<my-client-secret>") # the secret of service principal

try:
    adls_datastore = Datastore.get(ws, datastore_name)
    print("found datastore with name: %s" % datastore_name)
except HttpOperationError:
    adls_datastore = Datastore.register_azure_data_lake(
        workspace=ws,
        datastore_name=datastore_name,
        subscription_id=subscription_id, # subscription id of ADLS account
        resource_group=resource_group, # resource group of ADLS account
        store_name=store_name, # ADLS account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
    print("registered datastore with name: %s" % datastore_name)

adls_data_ref = DataReference(
    datastore=adls_datastore,
    data_reference_name="adls_test_data",
    path_on_datastore="testdata")

### Azure Data Lake Storage Gen2

In [None]:

adlsgen2_datastore_name = 'myadlsgen2datastore'
account_name=os.getenv("ADLSGEN2_ACCOUNTNAME_62", "<my-account-name>") # ADLS Gen2 account name
tenant_id=os.getenv("ADLSGEN2_TENANT_62", "<my-tenant-id>") # tenant id of service principal
client_id=os.getenv("ADLSGEN2_CLIENTID_62", "<my-client-id>") # client id of service principal
client_secret=os.getenv("ADLSGEN2_CLIENT_SECRET_62", "<my-client-secret>") # the secret of service principal

try:
    adlsgen2_datastore = Datastore.get(ws, adlsgen2_datastore_name)
    print("found ADLS Gen2 datastore with name: %s" % adlsgen2_datastore_name)
except HttpOperationError:
    adlsgen2_datastore = Datastore.register_azure_data_lake_gen2(
        workspace=ws,
        datastore_name=adlsgen2_datastore_name,
        filesystem='test', # Name of ADLS Gen2 filesystem
        account_name=account_name, # ADLS Gen2 account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
    print("registered datastore with name: %s" % adlsgen2_datastore_name)

adlsgen2_data_ref = DataReference(
    datastore=adlsgen2_datastore,
    data_reference_name='adlsgen2_test_data',
    path_on_datastore='testdata')

### Azure SQL Database

In [None]:

sql_datastore_name="MySqlDatastore"
server_name=os.getenv("SQL_SERVERNAME_62", "<my-server-name>") # Name of SQL server
database_name=os.getenv("SQL_DATBASENAME_62", "<my-database-name>") # Name of SQL database
client_id=os.getenv("SQL_CLIENTNAME_62", "<my-client-id>") # client id of service principal with permissions to access database
client_secret=os.getenv("SQL_CLIENTSECRET_62", "<my-client-secret>") # the secret of service principal
tenant_id=os.getenv("SQL_TENANTID_62", "<my-tenant-id>") # tenant id of service principal

try:
    sql_datastore = Datastore.get(ws, sql_datastore_name)
    print("found sql database datastore with name: %s" % sql_datastore_name)
except HttpOperationError:
    sql_datastore = Datastore.register_azure_sql_database(
        workspace=ws,
        datastore_name=sql_datastore_name,
        server_name=server_name,
        database_name=database_name,
        client_id=client_id,
        client_secret=client_secret,
        tenant_id=tenant_id)
    print("registered sql database datastore with name: %s" % sql_datastore_name)

from azureml.data.sql_data_reference import SqlDataReference

sql_query_data_ref = SqlDataReference(
    datastore=sql_datastore,
    data_reference_name="sql_query_data_ref",
    sql_query="select top 1 * from TestData")

### Azure Database for PostgreSQL

In [None]:

psql_datastore_name="MyPostgreSqlDatastore"
server_name=os.getenv("PSQL_SERVERNAME_62", "<my-server-name>") # Name of PostgreSQL server 
database_name=os.getenv("PSQL_DATBASENAME_62", "<my-database-name>") # Name of PostgreSQL database
user_id=os.getenv("PSQL_USERID_62", "<my-user-id>") # user id
user_password=os.getenv("PSQL_USERPW_62", "<my-user-password>") # user password

try:
    psql_datastore = Datastore.get(ws, psql_datastore_name)
    print("found PostgreSQL database datastore with name: %s" % psql_datastore_name)
except HttpOperationError:
    psql_datastore = Datastore.register_azure_postgre_sql(
        workspace=ws,
        datastore_name=psql_datastore_name,
        server_name=server_name,
        database_name=database_name,
        user_id=user_id,
        user_password=user_password)
    print("registered PostgreSQL database datastore with name: %s" % psql_datastore_name)

from azureml.data.sql_data_reference import SqlDataReference

psql_query_data_ref = SqlDataReference(
    datastore=psql_datastore,
    data_reference_name="psql_query_data_ref",
    sql_query="SELECT * FROM testtable")

## Set up Data Factory Account

In [None]:
data_factory_name = 'adftest'

def get_or_create_data_factory(workspace, factory_name):
    try:
        return DataFactoryCompute(workspace, factory_name)
    except ComputeTargetException as e:
        if 'ComputeTargetNotFound' in e.message:
            print('Data factory not found, creating...')
            provisioning_config = DataFactoryCompute.provisioning_configuration()
            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
            data_factory.wait_for_completion()
            return data_factory
        else:
            raise e
            
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)

print("setup data factory account complete")

## Create a DataTransferStep

**DataTransferStep** transfers data between Azure Blob Storage, Azure Data Lake Storage (Gen1 and Gen2), and Azure SQL Database, and can also read from Azure Database for PostgreSQL. It takes the following arguments:

- **name:** Name of the step.
- **source_data_reference:** Input connection that serves as the source of the data transfer operation.
- **destination_data_reference:** Input connection that serves as the destination of the data transfer operation.
- **compute_target:** Azure Data Factory to use for transferring data.
- **allow_reuse:** Whether the step should reuse the results of a previous DataTransferStep when run with the same inputs. Set to False to force the data to be transferred again.

The following optional arguments explicitly specify whether a path corresponds to a file or a directory. They are useful when storage contains both a file and a directory with the same name, or when creating a new destination path.

- **source_reference_type:** An optional string specifying the type of source_data_reference. Possible values are 'file' and 'directory'. When not specified, the type of the existing path is used; a new path is treated as a directory.
- **destination_reference_type:** An optional string specifying the type of destination_data_reference. Possible values are 'file' and 'directory'. When not specified, the type of the existing path is used; a new path is treated as a directory.
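
The default for the two reference-type arguments can be summarized in a few lines. The function below is only an illustration of the documented rule in plain Python, not the SDK's implementation; `resolve_reference_type` and its `existing_type` argument are hypothetical.

```python
def resolve_reference_type(explicit_type=None, existing_type=None):
    """Pick 'file' or 'directory' following the rule described above.

    explicit_type: value passed as source_reference_type or
        destination_reference_type, if any.
    existing_type: type of the path if it already exists in storage,
        or None for a new path.
    """
    allowed = ("file", "directory")
    if explicit_type is not None:
        if explicit_type not in allowed:
            raise ValueError("reference type must be 'file' or 'directory'")
        return explicit_type
    if existing_type is not None:
        return existing_type
    # A new path with no explicit type defaults to a directory.
    return "directory"


print(resolve_reference_type())                      # directory
print(resolve_reference_type(existing_type="file"))  # file
print(resolve_reference_type(explicit_type="file"))  # file
```

The SQL and PostgreSQL steps in this notebook pass `destination_reference_type='file'` explicitly, presumably so that the query result lands in a single file rather than a directory.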

In [None]:
transfer_adls_to_blob = DataTransferStep(
    name="transfer_adls_to_blob",
    source_data_reference=adls_data_ref,
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute)

print("data transfer step created")

In [None]:

transfer_adlsgen2_to_blob = DataTransferStep(
    name='transfer_adlsgen2_to_blob',
    source_data_reference=adlsgen2_data_ref,
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute)

transfer_sql_to_blob = DataTransferStep(
    name="transfer_sql_to_blob",
    source_data_reference=sql_query_data_ref,
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute,
    destination_reference_type='file')

transfer_psql_to_blob = DataTransferStep(
    name="transfer_psql_to_blob",
    source_data_reference=psql_query_data_ref,
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute,
    destination_reference_type='file')

print("data transfer steps created for ADLS Gen2, SQL Database and PostgreSQL")

## Build and Submit the Experiment

In [None]:
pipeline_01 = Pipeline(
    description="data_transfer_01",
    workspace=ws,
    steps=[transfer_adls_to_blob])

pipeline_run_01 = Experiment(ws, "Data_Transfer_example_01").submit(pipeline_01)
pipeline_run_01.wait_for_completion()

In [None]:
pipeline_02 = Pipeline(
    description="data_transfer_02",
    workspace=ws,
    steps=[transfer_sql_to_blob, transfer_psql_to_blob, transfer_adlsgen2_to_blob])

pipeline_run_02 = Experiment(ws, "Data_Transfer_example_02").submit(pipeline_02)
pipeline_run_02.wait_for_completion()

### View Run Details

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run_01).show()

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run_02).show()

# Next: Databricks as a Compute Target
To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. This [notebook](https://aka.ms/pl-databricks) demonstrates the use of a DatabricksStep in an Azure Machine Learning Pipeline.