Copyright (c) Microsoft Corporation. All rights reserved. 
Licensed under the MIT License.


# Using Azure Machine Learning Pipelines for Batch Inference on File Inputs Partitioned by Folder Structure

In this notebook, we demonstrate how to make predictions on large quantities of data asynchronously using Azure Machine Learning pipelines. Batch inference (or batch scoring) provides cost-effective inference with high throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data, and they are optimized for high-throughput, fire-and-forget predictions over a large collection of data.

> **Tip**
If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction.

This example creates a sample dataset with a nested folder structure, where each folder name corresponds to an attribute of the files inside it. The batch inference job splits the files in the dataset according to these attributes, so that all files sharing the same values for the specified attributes form a single mini-batch.

The outline of this notebook is as follows:

- Create a dataset with a nested folder structure, using `partition_format` to map the folder structure onto attributes of the files inside.
- Run batch inference on each mini-batch defined by the folder structure.

## Prerequisites
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, first work through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks. It sets you up with a working config file that contains information about your workspace, subscription ID, and so on.

### Connect to workspace

In [None]:
from azureml.core.workspace import Workspace
ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

datastore = ws.get_default_datastore()

In [None]:
import azureml.core
print(azureml.core.VERSION)

### Upload local test data to datastore
The destination folders in the datastore are structured so that the name of each folder layer corresponds to a property shared by all the files inside that folder.

In [None]:
from azureml.core import Dataset

datastore.upload('test_files/disco', 'dataset_partition_test/user1/winter', overwrite=True, show_progress=False)
datastore.upload('test_files/orchestra', 'dataset_partition_test/user1/fall', overwrite=True, show_progress=False)
datastore.upload('test_files/piano', 'dataset_partition_test/user2/summer', overwrite=True, show_progress=False)
datastore.upload('test_files/spirituality', 'dataset_partition_test/user3/fall', overwrite=True, show_progress=False)
datastore.upload('test_files/piano', 'dataset_partition_test/user4/spring', overwrite=True, show_progress=False)
datastore.upload('test_files/piano', 'dataset_partition_test/user4/fall', overwrite=True, show_progress=False)

### Create partitioned file dataset
Create a file dataset partitioned by `user`, `season`, and `genres`, each of which corresponds to a path segment specified in `partition_format` (`genres` is taken from the file name). You can get a partition of the data by specifying the values of one or more partition keys. For example, specifying `user=user1 and genres=piano` selects all files that match `dataset_partition_test/user1/*/piano.wav`.
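To make the path-to-attribute mapping concrete, here is a local, pure-Python sketch of how a `partition_format` string can turn path segments into key values and how selecting on key values narrows the file set. This is not how the Azure ML SDK implements `partition_format`. It assumes that each uploaded `test_files/<genre>` folder holds a single file named `<genre>.wav`, which the `{genres}.wav` segment implies but the notebook does not show. The helper names `partition_format_to_regex`, `parse_partitions`, and `select` are hypothetical.

```python
import re

# Sample object paths produced by the upload cell, ASSUMING each
# test_files/<genre> folder contains a single file named <genre>.wav.
PATHS = [
    "dataset_partition_test/user1/winter/disco.wav",
    "dataset_partition_test/user1/fall/orchestra.wav",
    "dataset_partition_test/user2/summer/piano.wav",
    "dataset_partition_test/user3/fall/spirituality.wav",
    "dataset_partition_test/user4/spring/piano.wav",
    "dataset_partition_test/user4/fall/piano.wav",
]
PARTITION_FORMAT = "dataset_partition_test/{user}/{season}/{genres}.wav"


def partition_format_to_regex(partition_format):
    """Compile 'root/{a}/{b}.ext' into a regex with one named group per {key}.
    Each key matches exactly one path segment (no '/')."""
    pattern, pos = "", 0
    for match in re.finditer(r"\{(\w+)\}", partition_format):
        pattern += re.escape(partition_format[pos:match.start()])
        pattern += f"(?P<{match.group(1)}>[^/]+)"
        pos = match.end()
    pattern += re.escape(partition_format[pos:])
    return re.compile("^" + pattern + "$")


def parse_partitions(paths, partition_format):
    """Return {path: {key: value}} for every path that matches the format."""
    regex = partition_format_to_regex(partition_format)
    parsed = {}
    for path in paths:
        m = regex.match(path)
        if m:
            parsed[path] = m.groupdict()
    return parsed


def select(parsed, **criteria):
    """Sorted paths whose partition values equal every key=value criterion."""
    return sorted(p for p, values in parsed.items()
                  if all(values.get(k) == v for k, v in criteria.items()))


parsed = parse_partitions(PATHS, PARTITION_FORMAT)
print(parsed["dataset_partition_test/user4/fall/piano.wav"])
# {'user': 'user4', 'season': 'fall', 'genres': 'piano'}
print(select(parsed, user="user4", genres="piano"))
# ['dataset_partition_test/user4/fall/piano.wav', 'dataset_partition_test/user4/spring/piano.wav']
```

With this sample layout, `select(parsed, user="user1", genres="piano")` returns an empty list, because none of the folders uploaded for `user1` contain piano files.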

In [None]:
partitioned_file_dataset = Dataset.File.from_files(path=(datastore, 'dataset_partition_test/*/*/*.wav'),
                                                   partition_format="dataset_partition_test/{user}/{season}/{genres}.wav",
                                                   validate=False)

In [None]:
partitioned_file_dataset.partition_keys

### Create or attach an existing compute resource

In [None]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

# choose a name for your cluster
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "cpu-cluster")
# environment variables are strings, so cast the node counts to int
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 2))

# This example uses a CPU VM. To use a GPU VM, set the SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('Found existing compute target, using it: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                min_nodes=compute_min_nodes,
                                                                max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # You can poll for a minimum number of nodes and for a specific timeout.
    # If no min node count is provided, the scale settings for the cluster are used.
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

### Intermediate/Output Data

In [None]:
from azureml.pipeline.core import Pipeline, PipelineData

output_dir = PipelineData(name="file_dataset_inferences", datastore=datastore)

### Calculate total file size of each mini-batch partitioned by dataset partition key(s)
The entry script sums up the total size of the files in each mini-batch.
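The real entry script is `total_file_size.py`, printed in the next cell. As a local stand-in, the sketch below computes the same kind of per-file summary for one mini-batch. It assumes the column layout that the results cell later assigns (File Name, File Size, Ratio of Size in Partition, user, genres, Total File Size of Partition) and writes each row as a comma-separated string. The helper `summarize_mini_batch` is hypothetical: it does not follow the ParallelRunStep `init()`/`run(mini_batch)` contract, and it receives the partition values as an explicit argument, which is an assumption about how they reach the script.

```python
import os
import tempfile


def summarize_mini_batch(file_paths, partition_values):
    """HYPOTHETICAL helper (not the notebook's total_file_size.py).
    Returns one comma-separated row per file:
    name, size, size / batch total, user, genres, batch total."""
    sizes = [(path, os.path.getsize(path)) for path in file_paths]
    total = sum(size for _, size in sizes)
    rows = []
    for path, size in sizes:
        ratio = size / total if total else 0.0
        rows.append(f"{os.path.basename(path)},{size},{ratio},"
                    f"{partition_values['user']},{partition_values['genres']},{total}")
    return rows


# Demo: two dummy files standing in for the user4/piano mini-batch
# (one under spring/, one under fall/). The byte sizes are arbitrary.
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for season, size in [("spring", 300), ("fall", 100)]:
        os.makedirs(os.path.join(tmp, season))
        path = os.path.join(tmp, season, "piano.wav")
        with open(path, "wb") as f:
            f.write(b"\0" * size)
        paths.append(path)
    rows = summarize_mini_batch(paths, {"user": "user4", "genres": "piano"})

for row in rows:
    print(row)
# piano.wav,300,0.75,user4,piano,400
# piano.wav,100,0.25,user4,piano,400
```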

In [None]:
scripts_folder = "Code"
script_file = "total_file_size.py"

# peek at contents
with open(os.path.join(scripts_folder, script_file)) as inference_file:
    print(inference_file.read())

## Build and run the batch inference pipeline
### Specify the environment to run the script
Specify the required azureml packages as pip dependencies of the environment.

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

batch_conda_deps = CondaDependencies.create(pip_packages=["azureml-core", "azureml-dataset-runtime[fuse]"])
batch_env = Environment(name="batch_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

### Create the configuration to wrap the inference script
The parameter `partition_keys` is a list containing a subset of the dataset partition keys and specifies how the input dataset is partitioned into mini-batches. Each distinct combination of partition key values that occurs in the dataset forms one mini-batch. For example, specifying `partition_keys=['user', 'genres']` results in 5 mini-batches: `user=user1 && genres=disco`, `user=user1 && genres=orchestra`, `user=user2 && genres=piano`, `user=user3 && genres=spirituality`, and `user=user4 && genres=piano`. The two `user4` piano files, uploaded to the `spring` and `fall` folders, land in the same mini-batch because `season` is not one of the partition keys.
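The grouping rule can be checked locally with the sketch below. It is not the ParallelRunStep scheduler; it only groups the six files uploaded earlier by their values for the chosen partition keys, assuming each uploaded folder holds a single `<genre>.wav` file. The helper `group_into_mini_batches` is hypothetical.

```python
from collections import defaultdict

# Partition key values of the six files uploaded earlier, ASSUMING each
# uploaded folder holds a single <genre>.wav file.
FILES = [
    {"user": "user1", "season": "winter", "genres": "disco"},
    {"user": "user1", "season": "fall", "genres": "orchestra"},
    {"user": "user2", "season": "summer", "genres": "piano"},
    {"user": "user3", "season": "fall", "genres": "spirituality"},
    {"user": "user4", "season": "spring", "genres": "piano"},
    {"user": "user4", "season": "fall", "genres": "piano"},
]


def group_into_mini_batches(files, partition_keys):
    """Group files by their values for partition_keys. Only combinations that
    actually occur become mini-batches, not the full Cartesian product."""
    batches = defaultdict(list)
    for attrs in files:
        key = tuple(attrs[k] for k in partition_keys)
        batches[key].append(f"{attrs['user']}/{attrs['season']}/{attrs['genres']}.wav")
    return dict(batches)


batches = group_into_mini_batches(FILES, ["user", "genres"])
print(len(batches))
# 5
print(batches[("user4", "piano")])
# ['user4/spring/piano.wav', 'user4/fall/piano.wav']
```

Changing the keys changes the batching. For example, `group_into_mini_batches(FILES, ["genres"])` yields 4 mini-batches (one per genre), with all three piano files in a single batch.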

In [None]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, tune the number of processes per node and the number of nodes to fit your problem domain.
parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=script_file,  # the user script to run against each input
    partition_keys=['user', 'genres'],
    error_threshold=5,
    output_action='append_row',
    append_row_file_name="file_size_outputs.txt",
    environment=batch_env,
    compute_target=compute_target, 
    node_count=2,
    run_invocation_timeout=600
)

### Create the pipeline step

In [None]:
parallel_run_step = ParallelRunStep(
    name='summarize-file-size',
    inputs=[partitioned_file_dataset.as_named_input("partitioned_file_input")],
    output=output_dir,
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

### Run the pipeline

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallel_run_step])

pipeline_run = Experiment(ws, 'file-dataset-partition').submit(pipeline)

In [None]:
pipeline_run.wait_for_completion(show_output=True)

## View the results
The total_file_size.py script shown above returns a list of result rows for each mini-batch. Because `parallel_run_config` uses `output_action='append_row'`, these rows are appended to a single file, `file_size_outputs.txt`, in the datastore specified by the PipelineData object, which in this case is named `file_dataset_inferences`. This file contains the outputs from all the worker nodes used in the compute cluster. The cell below downloads it and displays the first 10 rows.

In [None]:
import pandas as pd
import tempfile

batch_run = pipeline_run.find_step_run(parallel_run_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)

target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)

df = pd.read_csv(result_file, delimiter=",", header=None)
df.columns = ["File Name", "File Size", "Ratio of Size in Partition", "user", "genres", "Total File Size of Partition"]
print("Result has", df.shape[0], "rows")
df.head(10)
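Because `partition_keys=['user', 'genres']` places every file of a partition in one mini-batch, and the script sums file sizes per mini-batch, the "Total File Size of Partition" column should equal the sum of "File Size" over all rows with the same `user` and `genres`. The sketch below checks that invariant with the standard library. It assumes the output rows follow the column order assigned in the previous cell. `check_partition_totals` is a hypothetical helper, and the sample rows are synthetic values for illustration, not pipeline results.

```python
import csv
import io
from collections import defaultdict

COLUMNS = ["File Name", "File Size", "Ratio of Size in Partition",
           "user", "genres", "Total File Size of Partition"]


def check_partition_totals(csv_text):
    """Return {(user, genres): summed file size}; raise ValueError if a row's
    reported partition total differs from the summed file sizes."""
    reader = csv.reader(io.StringIO(csv_text), skipinitialspace=True)
    rows = [dict(zip(COLUMNS, r)) for r in reader if r]
    sums = defaultdict(int)
    for row in rows:
        sums[(row["user"], row["genres"])] += int(row["File Size"])
    for row in rows:
        key = (row["user"], row["genres"])
        if int(row["Total File Size of Partition"]) != sums[key]:
            raise ValueError(f"partition total mismatch for {key}")
    return dict(sums)


# SYNTHETIC rows for illustration only; these are not results from the pipeline.
sample = (
    "piano.wav,300,0.75,user4,piano,400\n"
    "piano.wav,100,0.25,user4,piano,400\n"
    "disco.wav,250,1.0,user1,disco,250\n"
)
print(check_partition_totals(sample))
# {('user4', 'piano'): 400, ('user1', 'disco'): 250}
```

On the `df` loaded above, a roughly equivalent pandas check is `(df.groupby(["user", "genres"])["File Size"].transform("sum") == df["Total File Size of Partition"]).all()`.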