{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved.\n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-style-transfer/pipeline-style-transfer-parallel-run.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Neural style transfer on video\n", "Using modified code from `pytorch`'s neural style [example](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html), we show how to setup a pipeline for doing style transfer on video. The pipeline has following steps:\n", "1. Split a video into images\n", "2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n", "3. Stitch the image back into a video.\n", "\n", "> **Tip**\n", "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize Workspace\n", "\n", "Initialize a workspace object from persisted configuration." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check core SDK version number\n", "import azureml.core\n", "\n", "print(\"SDK version:\", azureml.core.VERSION)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Workspace, Experiment\n", "\n", "ws = Workspace.from_config()\n", "print('Workspace name: ' + ws.name, \n", " 'Azure region: ' + ws.location, \n", " 'Subscription id: ' + ws.subscription_id, \n", " 'Resource group: ' + ws.resource_group, sep = '\\n')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.compute import AmlCompute, ComputeTarget\n", "from azureml.core.datastore import Datastore\n", "from azureml.data.data_reference import DataReference\n", "from azureml.pipeline.core import Pipeline, PipelineData\n", "from azureml.pipeline.steps import PythonScriptStep\n", "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n", "from azureml.core.compute_target import ComputeTargetException" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Download models" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# create directory for model\n", "model_dir = 'models'\n", "if not os.path.isdir(model_dir):\n", " os.mkdir(model_dir)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import urllib.request\n", "\n", "def download_model(model_name):\n", " # downloaded models from https://pytorch.org/tutorials/advanced/neural_style_tutorial.html are kept here\n", " url = \"https://pipelinedata.blob.core.windows.net/styletransfer/saved_models/\" + model_name\n", " local_path = os.path.join(model_dir, model_name)\n", " urllib.request.urlretrieve(url, local_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Register all Models" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.model import Model\n", "mosaic_model = None\n", "candy_model = None\n", "\n", "models = Model.list(workspace=ws, tags=['scenario'])\n", "for m in models:\n", " print(\"Name:\", m.name,\"\\tVersion:\", m.version, \"\\tDescription:\", m.description, m.tags)\n", " if m.name == 'mosaic' and mosaic_model is None:\n", " mosaic_model = m\n", " elif m.name == 'candy' and candy_model is None:\n", " candy_model = m\n", "\n", "if mosaic_model is None:\n", " print('Mosaic model does not exist, registering it')\n", " download_model('mosaic.pth')\n", " mosaic_model = Model.register(model_path = os.path.join(model_dir, \"mosaic.pth\"),\n", " model_name = \"mosaic\",\n", " tags = {'type': \"mosaic\", 'scenario': \"Style transfer using batch inference\"},\n", " description = \"Style transfer - Mosaic\",\n", " workspace = ws)\n", "else:\n", " print('Reusing existing mosaic model')\n", " \n", "\n", "if candy_model is None:\n", " print('Candy model does not exist, registering it')\n", " download_model('candy.pth')\n", " candy_model = Model.register(model_path = os.path.join(model_dir, \"candy.pth\"),\n", " model_name = \"candy\",\n", " tags = {'type': \"candy\", 'scenario': \"Style transfer using batch inference\"},\n", " description = \"Style transfer - Candy\",\n", " workspace = ws)\n", "else:\n", " print('Reusing existing candy model')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create or use existing compute" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# AmlCompute\n", "cpu_cluster_name = \"cpu-cluster\"\n", "try:\n", " cpu_cluster = AmlCompute(ws, cpu_cluster_name)\n", " print(\"found existing cluster.\")\n", "except ComputeTargetException:\n", " print(\"creating new cluster\")\n", " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_v2\",\n", " max_nodes = 1)\n", "\n", " # create the cluster\n", " cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)\n", " cpu_cluster.wait_for_completion(show_output=True)\n", " \n", "# AmlCompute\n", "gpu_cluster_name = \"gpu-cluster\"\n", "try:\n", " gpu_cluster = AmlCompute(ws, gpu_cluster_name)\n", " print(\"found existing cluster.\")\n", "except ComputeTargetException:\n", " print(\"creating new cluster\")\n", " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n", " max_nodes = 3)\n", "\n", " # create the cluster\n", " gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n", " gpu_cluster.wait_for_completion(show_output=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Python Scripts\n", "We use an edited version of `neural_style_mpi.py` (original is [here](https://github.com/pytorch/examples/blob/master/fast_neural_style/neural_style/neural_style.py)). Scripts to split and stitch the video are thin wrappers to calls to `ffmpeg`. \n", "\n", "We install `ffmpeg` through conda dependencies." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scripts_folder = \"scripts\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "process_video_script_file = \"process_video.py\"\n", "\n", "# peek at contents\n", "with open(os.path.join(scripts_folder, process_video_script_file)) as process_video_file:\n", " print(process_video_file.read())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stitch_video_script_file = \"stitch_video.py\"\n", "\n", "# peek at contents\n", "with open(os.path.join(scripts_folder, stitch_video_script_file)) as stitch_video_file:\n", " print(stitch_video_file.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The sample video **organutan.mp4** is stored at a publicly shared datastore. We are registering the datastore below. If you want to take a look at the original video, click here. 
[click here](https://pipelinedata.blob.core.windows.net/sample-videos/orangutan.mp4)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# datastore for input video\n", "account_name = \"pipelinedata\"\n", "video_ds = Datastore.register_azure_blob_container(ws, \"videos\", \"sample-videos\",\n", " account_name=account_name, overwrite=True)\n", "\n", "# the default blob store attached to a workspace\n", "default_datastore = ws.get_default_datastore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Sample video" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "video_name = os.getenv(\"STYLE_TRANSFER_VIDEO_NAME\", \"orangutan.mp4\")\n", "orangutan_video = DataReference(datastore=video_ds,\n", " data_reference_name=\"video\",\n", " path_on_datastore=video_name, mode=\"download\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cd = CondaDependencies()\n", "\n", "cd.add_channel(\"conda-forge\")\n", "cd.add_conda_package(\"ffmpeg==4.0.2\")\n", "\n", "# Runconfig\n", "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", "amlcompute_run_config.environment.docker.base_image = \"pytorch/pytorch\"\n", "amlcompute_run_config.environment.spark.precache_packages = False" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ffmpeg_audio = PipelineData(name=\"ffmpeg_audio\", datastore=default_datastore)\n", "processed_images = PipelineData(name=\"processed_images\", datastore=default_datastore)\n", "output_video = PipelineData(name=\"output_video\", datastore=default_datastore)\n", "\n", "ffmpeg_images_ds_name = \"ffmpeg_images_data\"\n", "ffmpeg_images = PipelineData(name=\"ffmpeg_images\", datastore=default_datastore)\n", "ffmpeg_images_file_dataset = ffmpeg_images.as_dataset()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Define tweakable pipeline parameters\n", "These parameters can be changed when the published pipeline is rerun from a REST call.\n", "In addition, ParallelRunStep creates the following two pipeline parameters, which can be used to override the corresponding values:\n", "\n", "- `node_count`\n", "- `process_count_per_node`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core.graph import PipelineParameter\n", "# create a parameter for the style (one of \"candy\", \"mosaic\") to apply to the images\n", "style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")\n", "# create a parameter for the number of nodes to use in step no. 
2 (style transfer)\n", "nodecount_param = PipelineParameter(name=\"nodecount\", default_value=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "split_video_step = PythonScriptStep(\n", " name=\"split video\",\n", " script_name=\"process_video.py\",\n", " arguments=[\"--input_video\", orangutan_video,\n", " \"--output_audio\", ffmpeg_audio,\n", " \"--output_images\", ffmpeg_images_file_dataset,\n", " ],\n", " compute_target=cpu_cluster,\n", " inputs=[orangutan_video],\n", " outputs=[ffmpeg_images_file_dataset, ffmpeg_audio],\n", " runconfig=amlcompute_run_config,\n", " source_directory=scripts_folder\n", ")\n", "\n", "stitch_video_step = PythonScriptStep(\n", " name=\"stitch\",\n", " script_name=\"stitch_video.py\",\n", " arguments=[\"--images_dir\", processed_images, \n", " \"--input_audio\", ffmpeg_audio, \n", " \"--output_dir\", output_video],\n", " compute_target=cpu_cluster,\n", " inputs=[processed_images, ffmpeg_audio],\n", " outputs=[output_video],\n", " runconfig=amlcompute_run_config,\n", " source_directory=scripts_folder\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create environment, parallel step run config and parallel run step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Environment\n", "from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n", "\n", "parallel_cd = CondaDependencies()\n", "\n", "parallel_cd.add_channel(\"pytorch\")\n", "parallel_cd.add_conda_package(\"pytorch\")\n", "parallel_cd.add_conda_package(\"torchvision\")\n", "parallel_cd.add_conda_package(\"pillow<7\") # needed for torchvision==0.4.0\n", "parallel_cd.add_pip_package(\"azureml-core\")\n", "parallel_cd.add_pip_package(\"azureml-dataset-runtime[fuse]\")\n", "\n", "styleenvironment = Environment(name=\"styleenvironment\")\n", "styleenvironment.python.conda_dependencies=parallel_cd\n", "styleenvironment.docker.base_image = DEFAULT_GPU_IMAGE" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import PipelineParameter\n", "from azureml.pipeline.steps import ParallelRunConfig\n", "\n", "parallel_run_config = ParallelRunConfig(\n", " environment=styleenvironment,\n", " entry_script='transform.py',\n", " output_action='summary_only',\n", " mini_batch_size=\"1\",\n", " error_threshold=1,\n", " source_directory=scripts_folder,\n", " compute_target=gpu_cluster, \n", " node_count=nodecount_param,\n", " process_count_per_node=2\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.steps import ParallelRunStep\n", "from datetime import datetime\n", "\n", "parallel_step_name = 'styletransfer-' + datetime.now().strftime('%Y%m%d%H%M')\n", "\n", "distributed_style_transfer_step = ParallelRunStep(\n", " name=parallel_step_name,\n", " inputs=[ffmpeg_images_file_dataset], # Input file share/blob container/file dataset\n", " output=processed_images, # Output file share/blob container\n", " arguments=[\"--style\", style_param],\n", " parallel_run_config=parallel_run_config,\n", " allow_reuse=False #[optional - default value True]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Run the pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(workspace=ws, steps=[stitch_video_step])\n", "\n", "pipeline.validate()" ] }, { "cell_type": "code", 
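"execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional: peek at the entry script run by the parallel style-transfer step before submitting.\n", "# (A quick check that assumes transform.py is present in the scripts folder, as configured in ParallelRunConfig above.)\n", "with open(os.path.join(scripts_folder, \"transform.py\")) as transform_file:\n", "    print(transform_file.read())" ] }, { "cell_type": "code", 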
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "# submit the pipeline and provide values for the PipelineParameters used in the pipeline\n", "pipeline_run = Experiment(ws, 'styletransfer_parallel_mosaic').submit(pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Monitor pipeline run\n", "\n", "The pipeline run status could be checked in Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run could be retrieved by inspecting the `pipeline_run` object.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This will output information of the pipeline run, including the link to the details page of portal.\n", "pipeline_run" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Optional: View detailed logs (streaming) " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait the run for completion and show output log to console\n", "pipeline_run.wait_for_completion(show_output=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Download output video" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Downloads the video in `output_video` folder" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def download_video(run, target_dir=None):\n", " stitch_run = run.find_step_run(stitch_video_step.name)[0]\n", " port_data = stitch_run.get_output_data(output_video.name)\n", " port_data.download(target_dir, show_progress=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_run.wait_for_completion()\n", "download_video(pipeline_run, \"output_video_mosaic\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Publish pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_name = \"style-transfer-batch-inference\"\n", "print(pipeline_name)\n", "\n", "published_pipeline = pipeline.publish(\n", " name=pipeline_name, \n", " description=pipeline_name)\n", "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Get published pipeline\n", "This is another way to get the published pipeline." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import PublishedPipeline\n", "\n", "# You could retrieve all pipelines that are published, or \n", "# just get the published pipeline object that you have the ID for.\n", "\n", "# Get all published pipeline objects in the workspace\n", "all_pub_pipelines = PublishedPipeline.list(ws)\n", "\n", "# We will iterate through the list of published pipelines and \n", "# use the last ID in the list for Schelue operations: \n", "print(\"Published pipelines found in the workspace:\")\n", "for pub_pipeline in all_pub_pipelines:\n", " print(\"Name:\", pub_pipeline.name,\"\\tDescription:\", pub_pipeline.description, \"\\tId:\", pub_pipeline.id, \"\\tStatus:\", pub_pipeline.status)\n", " if(pub_pipeline.name == pipeline_name):\n", " published_pipeline = pub_pipeline\n", "\n", "print(\"Published pipeline id: {}\".format(published_pipeline.id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Run pipeline through REST calls for other styles\n", "\n", "# Get AAD token" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.authentication import InteractiveLoginAuthentication\n", "import requests\n", "\n", "auth = InteractiveLoginAuthentication()\n", "aad_token = auth.get_authentication_header()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Get endpoint URL" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rest_endpoint = published_pipeline.endpoint\n", "print(\"Pipeline REST endpoing: {}\".format(rest_endpoint))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Send request and monitor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment_name = 'styletransfer_parallel_candy'\n", "response = requests.post(rest_endpoint, \n", " headers=aad_token,\n", " json={\"ExperimentName\": experiment_name,\n", " \"ParameterAssignments\": {\"style\": \"candy\", \"NodeCount\": 3}})\n", "\n", "run_id = response.json()[\"Id\"]\n", "\n", "from azureml.pipeline.core.run import PipelineRun\n", "published_pipeline_run_candy = PipelineRun(ws.experiments[experiment_name], run_id)\n", "\n", "# Show detail information of run\n", "published_pipeline_run_candy" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Download output from re-run" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "published_pipeline_run_candy.wait_for_completion()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "download_video(published_pipeline_run_candy, target_dir=\"output_video_candy\")" ] } ], "metadata": { "authors": [ { "name": "sanpil joringer asraniwa pansav tracych" } ], "category": "Other notebooks", "compute": [ "AML Compute" ], "datasets": [], "deployment": [ "None" ], "exclude_from_index": true, "framework": [ "None" ], "friendly_name": "Style transfer using ParallelRunStep", "index_order": 1, "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" }, "tags": [ "Batch Inferencing", "Pipeline" ], "task": "Style transfer" }, "nbformat": 4, "nbformat_minor": 
2 }