{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved.\n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-style-transfer/pipeline-style-transfer.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Neural style transfer on video\n", "Using modified code from `pytorch`'s neural style [example](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html), we show how to setup a pipeline for doing style transfer on video. The pipeline has following steps:\n", "1. Split a video into images\n", "2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n", "3. Stitch the image back into a video." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize Workspace\n", "\n", "Initialize a workspace object from persisted configuration." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from azureml.core import Workspace, Experiment\n", "\n", "ws = Workspace.from_config()\n", "print('Workspace name: ' + ws.name, \n", " 'Azure region: ' + ws.location, \n", " 'Subscription id: ' + ws.subscription_id, \n", " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", "\n", "scripts_folder = \"scripts_folder\"\n", "\n", "if not os.path.isdir(scripts_folder):\n", " os.mkdir(scripts_folder)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.compute import AmlCompute, ComputeTarget\n", "from azureml.core.datastore import Datastore\n", "from azureml.data.data_reference import DataReference\n", "from azureml.pipeline.core import Pipeline, PipelineData\n", "from azureml.pipeline.steps import PythonScriptStep, MpiStep\n", "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n", "from azureml.core.compute_target import ComputeTargetException" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create or use existing compute" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# AmlCompute\n", "cpu_cluster_name = \"cpu-cluster\"\n", "try:\n", " cpu_cluster = AmlCompute(ws, cpu_cluster_name)\n", " print(\"found existing cluster.\")\n", "except ComputeTargetException:\n", " print(\"creating new cluster\")\n", " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_v2\",\n", " max_nodes = 1)\n", "\n", " # create the cluster\n", " cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)\n", " cpu_cluster.wait_for_completion(show_output=True)\n", " \n", "# AmlCompute\n", "gpu_cluster_name = \"gpu-cluster\"\n", "try:\n", " gpu_cluster = AmlCompute(ws, gpu_cluster_name)\n", " print(\"found existing cluster.\")\n", "except ComputeTargetException:\n", " print(\"creating new cluster\")\n", " 
provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n", " max_nodes = 3)\n", "\n", " # create the cluster\n", " gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n", " gpu_cluster.wait_for_completion(show_output=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Python Scripts\n", "We use an edited version of `neural_style_mpi.py` (original is [here](https://github.com/pytorch/examples/blob/master/fast_neural_style/neural_style/neural_style.py)). Scripts to split and stitch the video are thin wrappers to calls to `ffmpeg`. \n", "\n", "We install `ffmpeg` through conda dependencies." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import shutil\n", "shutil.copy(\"neural_style_mpi.py\", scripts_folder)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile $scripts_folder/process_video.py\n", "import argparse\n", "import glob\n", "import os\n", "import subprocess\n", "\n", "parser = argparse.ArgumentParser(description=\"Process input video\")\n", "parser.add_argument('--input_video', required=True)\n", "parser.add_argument('--output_audio', required=True)\n", "parser.add_argument('--output_images', required=True)\n", "\n", "args = parser.parse_args()\n", "\n", "os.makedirs(args.output_audio, exist_ok=True)\n", "os.makedirs(args.output_images, exist_ok=True)\n", "\n", "subprocess.run(\"ffmpeg -i {} {}/video.aac\"\n", " .format(args.input_video, args.output_audio),\n", " shell=True, check=True\n", " )\n", "\n", "subprocess.run(\"ffmpeg -i {} {}/%05d_video.jpg -hide_banner\"\n", " .format(args.input_video, args.output_images),\n", " shell=True, check=True\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile $scripts_folder/stitch_video.py\n", "import argparse\n", "import os\n", "import subprocess\n", "\n", "parser = argparse.ArgumentParser(description=\"Process input video\")\n", "parser.add_argument('--images_dir', required=True)\n", "parser.add_argument('--input_audio', required=True)\n", "parser.add_argument('--output_dir', required=True)\n", "\n", "args = parser.parse_args()\n", "\n", "os.makedirs(args.output_dir, exist_ok=True)\n", "\n", "subprocess.run(\"ffmpeg -framerate 30 -i {}/%05d_video.jpg -c:v libx264 -profile:v high -crf 20 -pix_fmt yuv420p \"\n", " \"-y {}/video_without_audio.mp4\"\n", " .format(args.images_dir, args.output_dir),\n", " shell=True, check=True\n", " )\n", "\n", "subprocess.run(\"ffmpeg -i {}/video_without_audio.mp4 -i {}/video.aac -map 0:0 -map 1:0 -vcodec \"\n", " \"copy -acodec copy -y {}/video_with_audio.mp4\"\n", " .format(args.output_dir, args.input_audio, args.output_dir),\n", " shell=True, check=True\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The sample video **organutan.mp4** is stored at a publicly shared datastore. We are registering the datastore below. If you want to take a look at the original video, click here. 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# datastore for input video\n", "account_name = \"pipelinedata\"\n", "video_ds = Datastore.register_azure_blob_container(ws, \"videos\", \"sample-videos\",\n", "                                                   account_name=account_name, overwrite=True)\n", "\n", "# datastore for models\n", "models_ds = Datastore.register_azure_blob_container(ws, \"models\", \"styletransfer\",\n", "                                                    account_name=\"pipelinedata\",\n", "                                                    overwrite=True)\n", "\n", "# models downloaded from https://pytorch.org/tutorials/advanced/neural_style_tutorial.html are kept here\n", "models_dir = DataReference(data_reference_name=\"models\", datastore=models_ds,\n", "                           path_on_datastore=\"saved_models\", mode=\"download\")\n", "\n", "# the default blob store attached to a workspace\n", "default_datastore = ws.get_default_datastore()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Sample video" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "video_name = os.getenv(\"STYLE_TRANSFER_VIDEO_NAME\", \"orangutan.mp4\")\n", "orangutan_video = DataReference(datastore=video_ds,\n", "                                data_reference_name=\"video\",\n", "                                path_on_datastore=video_name, mode=\"download\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cd = CondaDependencies()\n", "\n", "cd.add_channel(\"conda-forge\")\n", "cd.add_conda_package(\"ffmpeg\")\n", "\n", "cd.add_channel(\"pytorch\")\n", "cd.add_conda_package(\"pytorch\")\n", "cd.add_conda_package(\"torchvision\")\n", "\n", "# run configuration used by the split and stitch steps\n", "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", "amlcompute_run_config.environment.docker.enabled = True\n", "amlcompute_run_config.environment.docker.gpu_support = True\n", "amlcompute_run_config.environment.docker.base_image = \"pytorch/pytorch\"\n", "amlcompute_run_config.environment.spark.precache_packages = False" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ffmpeg_audio = PipelineData(name=\"ffmpeg_audio\", datastore=default_datastore)\n", "ffmpeg_images = PipelineData(name=\"ffmpeg_images\", datastore=default_datastore)\n", "processed_images = PipelineData(name=\"processed_images\", datastore=default_datastore)\n", "output_video = PipelineData(name=\"output_video\", datastore=default_datastore)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Define tweakable parameters to the pipeline\n", "These parameters can be changed when the published pipeline is rerun through a REST call." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core.graph import PipelineParameter\n", "# create a parameter for the style (one of \"candy\", \"mosaic\", \"rain_princess\", \"udnie\") to apply to the images\n", "style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")\n", "# create a parameter for the number of nodes to use in step no. 2 (style transfer)\n", "nodecount_param = PipelineParameter(name=\"nodecount\", default_value=1)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "split_video_step = PythonScriptStep(\n", "    name=\"split video\",\n", "    script_name=\"process_video.py\",\n", "    arguments=[\"--input_video\", orangutan_video,\n", "               \"--output_audio\", ffmpeg_audio,\n", "               \"--output_images\", ffmpeg_images,\n", "               ],\n", "    compute_target=cpu_cluster,\n", "    inputs=[orangutan_video],\n", "    outputs=[ffmpeg_images, ffmpeg_audio],\n", "    runconfig=amlcompute_run_config,\n", "    source_directory=scripts_folder\n", ")\n", "\n", "# create an MPI step that distributes the style transfer step across multiple AmlCompute nodes,\n", "# using the 'nodecount_param' PipelineParameter\n", "distributed_style_transfer_step = MpiStep(\n", "    name=\"mpi style transfer\",\n", "    script_name=\"neural_style_mpi.py\",\n", "    arguments=[\"--content-dir\", ffmpeg_images,\n", "               \"--output-dir\", processed_images,\n", "               \"--model-dir\", models_dir,\n", "               \"--style\", style_param,\n", "               \"--cuda\", 1\n", "               ],\n", "    compute_target=gpu_cluster,\n", "    node_count=nodecount_param,\n", "    process_count_per_node=1,\n", "    inputs=[models_dir, ffmpeg_images],\n", "    outputs=[processed_images],\n", "    pip_packages=[\"mpi4py\", \"torch\", \"torchvision\"],\n", "    use_gpu=True,\n", "    source_directory=scripts_folder\n", ")\n", "\n", "stitch_video_step = PythonScriptStep(\n", "    name=\"stitch\",\n", "    script_name=\"stitch_video.py\",\n", "    arguments=[\"--images_dir\", processed_images,\n", "               \"--input_audio\", ffmpeg_audio,\n", "               \"--output_dir\", output_video],\n", "    compute_target=cpu_cluster,\n", "    inputs=[processed_images, ffmpeg_audio],\n", "    outputs=[output_video],\n", "    runconfig=amlcompute_run_config,\n", "    source_directory=scripts_folder\n", ")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Run the pipeline\n", "Only the stitch step needs to be passed to the `Pipeline` constructor; the split and style transfer steps are included automatically because the stitch step depends on their outputs." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline(workspace=ws, steps=[stitch_video_step])\n", "# submit the pipeline and provide values for the PipelineParameters used in the pipeline\n", "pipeline_run = Experiment(ws, 'style_transfer').submit(pipeline, pipeline_parameters={\"style\": \"mosaic\", \"nodecount\": 3})" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Monitor using widget" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.widgets import RunDetails\n", "RunDetails(pipeline_run).show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Download output video\n", "The helper below downloads the stitched video from the `output_video` output of the stitch step to a local folder." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def download_video(run, target_dir=None):\n", "    stitch_run = run.find_step_run(\"stitch\")[0]\n", "    port_data = stitch_run.get_output_data(\"output_video\")\n", "    port_data.download(target_dir, show_progress=True)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_run.wait_for_completion()\n", "download_video(pipeline_run, \"output_video_mosaic\")" ] },
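{ "cell_type": "markdown", "metadata": {}, "source": [ "The next cell is an optional, minimal sketch that walks the download folder and prints any `.mp4` files it finds, so you can confirm that the `video_with_audio.mp4` produced by the stitch step arrived. The exact subfolder layout depends on how the output data was downloaded, hence the recursive search." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sketch: confirm the stitched video was downloaded.\n", "# The download preserves the datastore folder structure, so search recursively.\n", "import os\n", "\n", "for root, _, files in os.walk(\"output_video_mosaic\"):\n", "    for f in files:\n", "        if f.endswith(\".mp4\"):\n", "            print(os.path.join(root, f))" ] },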
description=\"style transfer\", version=\"1.0\")\n", "\n", "published_pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get published pipeline\n", "\n", "You can get the published pipeline using **pipeline id**.\n", "\n", "To get all the published pipelines for a given workspace(ws): \n", "```css\n", "all_pub_pipelines = PublishedPipeline.get_all(ws)\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.pipeline.core import PublishedPipeline\n", "\n", "pipeline_id = published_pipeline.id # use your published pipeline id\n", "published_pipeline = PublishedPipeline.get(ws, pipeline_id)\n", "\n", "published_pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Re-run pipeline through REST calls for other styles" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get AAD token\n", "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.authentication import InteractiveLoginAuthentication\n", "import requests\n", "\n", "auth = InteractiveLoginAuthentication()\n", "aad_token = auth.get_authentication_header()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get endpoint URL" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rest_endpoint = published_pipeline.endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Send request and monitor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run the pipeline using PipelineParameter values style='candy' and nodecount=2\n", "response = requests.post(rest_endpoint, \n", " headers=aad_token,\n", " json={\"ExperimentName\": \"style_transfer\",\n", " \"ParameterAssignments\": {\"style\": \"candy\", \"nodecount\": 2}}) \n", "run_id = response.json()[\"Id\"]\n", "\n", "from azureml.pipeline.core.run import PipelineRun\n", "published_pipeline_run_candy = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", "\n", "RunDetails(published_pipeline_run_candy).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run the pipeline using PipelineParameter values style='rain_princess' and nodecount=3\n", "response = requests.post(rest_endpoint, \n", " headers=aad_token,\n", " json={\"ExperimentName\": \"style_transfer\",\n", " \"ParameterAssignments\": {\"style\": \"rain_princess\", \"nodecount\": 3}}) \n", "run_id = response.json()[\"Id\"]\n", "\n", "published_pipeline_run_rain = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", "\n", "RunDetails(published_pipeline_run_rain).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run the pipeline using PipelineParameter values style='udnie' and nodecount=4\n", "response = requests.post(rest_endpoint, \n", " headers=aad_token,\n", " json={\"ExperimentName\": \"style_transfer\",\n", " \"ParameterAssignments\": {\"style\": \"udnie\", \"nodecount\": 3}}) \n", "run_id = response.json()[\"Id\"]\n", "\n", "published_pipeline_run_udnie = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", "\n", "RunDetails(published_pipeline_run_udnie).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download output from re-run" ] }, { "cell_type": "code", "execution_count": null, "metadata": 
{}, "outputs": [], "source": [ "published_pipeline_run_candy.wait_for_completion()\n", "published_pipeline_run_rain.wait_for_completion()\n", "published_pipeline_run_udnie.wait_for_completion()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "download_video(published_pipeline_run_candy, target_dir=\"output_video_candy\")\n", "download_video(published_pipeline_run_rain, target_dir=\"output_video_rain_princess\")\n", "download_video(published_pipeline_run_udnie, target_dir=\"output_video_udnie\")" ] } ], "metadata": { "authors": [ { "name": "sanpil" } ], "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 2 }