Compare commits

...

14 Commits

Author         SHA1        Message                                                                                    Date
Roope Astala   4a6bcebccc  Update configuration.ipynb                                                                 2019-06-21 09:35:13 -04:00
Roope Astala   56e0ebc5ac  Merge pull request #438 from rastala/master (add pipeline scripts)                         2019-06-19 18:56:42 -04:00
rastala        2aa39f2f4a  add pipeline scripts                                                                       2019-06-19 18:55:32 -04:00
Roope Astala   4d247c1877  Merge pull request #437 from rastala/master (pytorch with mlflow)                          2019-06-19 17:23:06 -04:00
rastala        f6682f6f6d  pytorch with mlflow                                                                        2019-06-19 17:21:52 -04:00
Roope Astala   26ecf25233  Merge pull request #436 from rastala/master (Update readme)                                2019-06-19 11:52:23 -04:00
Roope Astala   44c3a486c0  update readme                                                                              2019-06-19 11:49:49 -04:00
Roope Astala   c574f429b8  update readme                                                                              2019-06-19 11:48:52 -04:00
Roope Astala   77d557a5dc  Merge pull request #435 from ganzhi/jamgan/drift (Add demo notebook for AML Data Drift)   2019-06-17 16:39:46 -04:00
James Gan      13dedec4a4  Make it in same folder as internal repo                                                    2019-06-17 13:38:27 -07:00
James Gan      6f5c52676f  Add notebook to demo data drift                                                            2019-06-17 13:33:30 -07:00
James Gan      90c105537c  Add demo notebook for AML Data Drift                                                       2019-06-17 13:31:08 -07:00
Roope Astala   ef264b1073  Merge pull request #434 from rastala/master (update pytorch)                               2019-06-17 11:57:29 -04:00
Roope Astala   824ac5e021  update pytorch                                                                             2019-06-17 11:56:42 -04:00
17 changed files with 2522 additions and 748 deletions

View File

@@ -4,6 +4,10 @@ This repository contains example notebooks demonstrating the [Azure Machine Lear
![Azure ML workflow](https://raw.githubusercontent.com/MicrosoftDocs/azure-docs/master/articles/machine-learning/service/media/overview-what-is-azure-ml/aml.png)
## News
* [Try Azure Machine Learning with MLflow](./how-to-use-azureml/using-mlflow)
## Quick installation
```sh
pip install azureml-sdk

View File

@@ -258,7 +258,7 @@
"```shell\n", "```shell\n",
"az vm list-skus -o tsv\n", "az vm list-skus -o tsv\n",
"```\n", "```\n",
"* min_nodes - this sets the minimum size of the cluster. If you set the minimum to 0 the cluster will shut down all nodes while note in use. Setting this number to a value higher than 0 will allow for faster start-up times, but you will also be billed when the cluster is not in use.\n", "* min_nodes - this sets the minimum size of the cluster. If you set the minimum to 0 the cluster will shut down all nodes while not in use. Setting this number to a value higher than 0 will allow for faster start-up times, but you will also be billed when the cluster is not in use.\n",
"* max_nodes - this sets the maximum size of the cluster. Setting this to a larger number allows for more concurrency and a greater distributed processing of scale-out jobs.\n", "* max_nodes - this sets the maximum size of the cluster. Setting this to a larger number allows for more concurrency and a greater distributed processing of scale-out jobs.\n",
"\n", "\n",
"\n", "\n",
@@ -380,4 +380,4 @@
}, },
"nbformat": 4, "nbformat": 4,
"nbformat_minor": 2 "nbformat_minor": 2
} }

View File

@@ -0,0 +1,709 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Track Data Drift between Training and Inference Data in Production \n",
"\n",
"With this notebook, you will learn how to enable the DataDrift service to automatically track and determine whether your inference data is drifting from the data your model was initially trained on. The DataDrift service provides metrics and visualizations to help stakeholders identify which specific features cause the concept drift to occur.\n",
"\n",
"Please email driftfeedback@microsoft.com with any issues. A member from the DataDrift team will respond shortly. \n",
"\n",
"The DataDrift Public Preview API can be found [here](https://docs.microsoft.com/en-us/python/api/azureml-contrib-datadrift/?view=azure-ml-py). "
]
},
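{
"cell_type": "markdown",
"metadata": {},
"source": [
"At a glance, this notebook exercises four calls from the preview API (a sketch only; the full arguments appear in the cells below):\n",
"\n",
"```python\n",
"from azureml.contrib.datadrift import DataDriftDetector\n",
"\n",
"detector = DataDriftDetector.create(ws, model_name, model_version, services)  # attach drift detection to a model's services\n",
"adhoc_run = detector.run(target_date, services)                               # run an ad-hoc drift analysis\n",
"metrics = detector.get_output(start_time=start, end_time=end)                 # retrieve the computed drift metrics\n",
"detector.enable_schedule()                                                    # turn on the recurring analysis\n",
"```"
]
},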
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/contrib/datadrift/azureml-datadrift.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Prerequisites and Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Install the DataDrift package\n",
"\n",
"Install the azureml-contrib-datadrift, azureml-contrib-opendatasets and lightgbm packages before running this notebook.\n",
"```\n",
"pip install azureml-contrib-datadrift\n",
"pip install azureml-contrib-datasets\n",
"pip install lightgbm\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import Dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"import time\n",
"from datetime import datetime, timedelta\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import requests\n",
"from azureml.contrib.datadrift import DataDriftDetector, AlertConfiguration\n",
"from azureml.contrib.opendatasets import NoaaIsdWeather\n",
"from azureml.core import Dataset, Workspace, Run\n",
"from azureml.core.compute import AksCompute, ComputeTarget\n",
"from azureml.core.conda_dependencies import CondaDependencies\n",
"from azureml.core.experiment import Experiment\n",
"from azureml.core.image import ContainerImage\n",
"from azureml.core.model import Model\n",
"from azureml.core.webservice import Webservice, AksWebservice\n",
"from azureml.widgets import RunDetails\n",
"from sklearn.externals import joblib\n",
"from sklearn.model_selection import train_test_split\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up Configuraton and Create Azure ML Workspace\n",
"\n",
"If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](../../../configuration.ipynb) first if you haven't already to establish your connection to the AzureML Workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Please type in your initials/alias. The prefix is prepended to the names of resources created by this notebook. \n",
"prefix = \"dd\"\n",
"\n",
"# NOTE: Please do not change the model_name, as it's required by the score.py file\n",
"model_name = \"driftmodel\"\n",
"image_name = \"{}driftimage\".format(prefix)\n",
"service_name = \"{}driftservice\".format(prefix)\n",
"\n",
"# optionally, set email address to receive an email alert for DataDrift\n",
"email_address = \"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ws = Workspace.from_config()\n",
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate Train/Testing Data\n",
"\n",
"For this demo, we will use NOAA weather data from [Azure Open Datasets](https://azure.microsoft.com/services/open-datasets/). You may replace this step with your own dataset. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"usaf_list = ['725724', '722149', '723090', '722159', '723910', '720279',\n",
" '725513', '725254', '726430', '720381', '723074', '726682',\n",
" '725486', '727883', '723177', '722075', '723086', '724053',\n",
" '725070', '722073', '726060', '725224', '725260', '724520',\n",
" '720305', '724020', '726510', '725126', '722523', '703333',\n",
" '722249', '722728', '725483', '722972', '724975', '742079',\n",
" '727468', '722193', '725624', '722030', '726380', '720309',\n",
" '722071', '720326', '725415', '724504', '725665', '725424',\n",
" '725066']\n",
"\n",
"columns = ['usaf', 'wban', 'datetime', 'latitude', 'longitude', 'elevation', 'windAngle', 'windSpeed', 'temperature', 'stationName', 'p_k']\n",
"\n",
"\n",
"def enrich_weather_noaa_data(noaa_df):\n",
" hours_in_day = 23\n",
" week_in_year = 52\n",
" \n",
" noaa_df[\"hour\"] = noaa_df[\"datetime\"].dt.hour\n",
" noaa_df[\"weekofyear\"] = noaa_df[\"datetime\"].dt.week\n",
" \n",
" noaa_df[\"sine_weekofyear\"] = noaa_df['datetime'].transform(lambda x: np.sin((2*np.pi*x.dt.week-1)/week_in_year))\n",
" noaa_df[\"cosine_weekofyear\"] = noaa_df['datetime'].transform(lambda x: np.cos((2*np.pi*x.dt.week-1)/week_in_year))\n",
"\n",
" noaa_df[\"sine_hourofday\"] = noaa_df['datetime'].transform(lambda x: np.sin(2*np.pi*x.dt.hour/hours_in_day))\n",
" noaa_df[\"cosine_hourofday\"] = noaa_df['datetime'].transform(lambda x: np.cos(2*np.pi*x.dt.hour/hours_in_day))\n",
" \n",
" return noaa_df\n",
"\n",
"def add_window_col(input_df):\n",
" shift_interval = pd.Timedelta('-7 days') # your X days interval\n",
" df_shifted = input_df.copy()\n",
" df_shifted['datetime'] = df_shifted['datetime'] - shift_interval\n",
" df_shifted.drop(list(input_df.columns.difference(['datetime', 'usaf', 'wban', 'sine_hourofday', 'temperature'])), axis=1, inplace=True)\n",
"\n",
" # merge, keeping only observations where -1 lag is present\n",
" df2 = pd.merge(input_df,\n",
" df_shifted,\n",
" on=['datetime', 'usaf', 'wban', 'sine_hourofday'],\n",
" how='inner', # use 'left' to keep observations without lags\n",
" suffixes=['', '-7'])\n",
" return df2\n",
"\n",
"def get_noaa_data(start_time, end_time, cols, station_list):\n",
" isd = NoaaIsdWeather(start_time, end_time, cols=cols)\n",
" # Read into Pandas data frame.\n",
" noaa_df = isd.to_pandas_dataframe()\n",
" noaa_df = noaa_df.rename(columns={\"stationName\": \"station_name\"})\n",
" \n",
" df_filtered = noaa_df[noaa_df[\"usaf\"].isin(station_list)]\n",
" df_filtered.reset_index(drop=True)\n",
" \n",
" # Enrich with time features\n",
" df_enriched = enrich_weather_noaa_data(df_filtered)\n",
" \n",
" return df_enriched\n",
"\n",
"def get_featurized_noaa_df(start_time, end_time, cols, station_list):\n",
" df_1 = get_noaa_data(start_time - timedelta(days=7), start_time - timedelta(seconds=1), cols, station_list)\n",
" df_2 = get_noaa_data(start_time, end_time, cols, station_list)\n",
" noaa_df = pd.concat([df_1, df_2])\n",
" \n",
" print(\"Adding window feature\")\n",
" df_window = add_window_col(noaa_df)\n",
" \n",
" cat_columns = df_window.dtypes == object\n",
" cat_columns = cat_columns[cat_columns == True]\n",
" \n",
" print(\"Encoding categorical columns\")\n",
" df_encoded = pd.get_dummies(df_window, columns=cat_columns.keys().tolist())\n",
" \n",
" print(\"Dropping unnecessary columns\")\n",
" df_featurized = df_encoded.drop(['windAngle', 'windSpeed', 'datetime', 'elevation'], axis=1).dropna().drop_duplicates()\n",
" \n",
" return df_featurized"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Train model on Jan 1 - 14, 2009 data\n",
"df = get_featurized_noaa_df(datetime(2009, 1, 1), datetime(2009, 1, 14, 23, 59, 59), columns, usaf_list)\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"label = \"temperature\"\n",
"x_df = df.drop(label, axis=1)\n",
"y_df = df[[label]]\n",
"x_train, x_test, y_train, y_test = train_test_split(df, y_df, test_size=0.2, random_state=223)\n",
"print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)\n",
"\n",
"training_dir = 'outputs/training'\n",
"training_file = \"training.csv\"\n",
"\n",
"# Generate training dataframe to register as Training Dataset\n",
"os.makedirs(training_dir, exist_ok=True)\n",
"training_df = pd.merge(x_train.drop(label, axis=1), y_train, left_index=True, right_index=True)\n",
"training_df.to_csv(training_dir + \"/\" + training_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create/Register Training Dataset"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset_name = \"dataset\"\n",
"name_suffix = datetime.utcnow().strftime(\"%Y-%m-%d-%H-%M-%S\")\n",
"snapshot_name = \"snapshot-{}\".format(name_suffix)\n",
"\n",
"dstore = ws.get_default_datastore()\n",
"dstore.upload(training_dir, \"data/training\", show_progress=True)\n",
"dpath = dstore.path(\"data/training/training.csv\")\n",
"trainingDataset = Dataset.auto_read_files(dpath, include_path=True)\n",
"trainingDataset = trainingDataset.register(workspace=ws, name=dataset_name, description=\"dset\", exist_ok=True)\n",
"\n",
"trainingDataSnapshot = trainingDataset.create_snapshot(snapshot_name=snapshot_name, compute_target=None, create_data_snapshot=True)\n",
"datasets = [(Dataset.Scenario.TRAINING, trainingDataSnapshot)]\n",
"print(\"dataset registration done.\\n\")\n",
"datasets"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train and Save Model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import lightgbm as lgb\n",
"\n",
"train = lgb.Dataset(data=x_train, \n",
" label=y_train)\n",
"\n",
"test = lgb.Dataset(data=x_test, \n",
" label=y_test,\n",
" reference=train)\n",
"\n",
"params = {'learning_rate' : 0.1,\n",
" 'boosting' : 'gbdt',\n",
" 'metric' : 'rmse',\n",
" 'feature_fraction' : 1,\n",
" 'bagging_fraction' : 1,\n",
" 'max_depth': 6,\n",
" 'num_leaves' : 31,\n",
" 'objective' : 'regression',\n",
" 'bagging_freq' : 1,\n",
" \"verbose\": -1,\n",
" 'min_data_per_leaf': 100}\n",
"\n",
"model = lgb.train(params, \n",
" num_boost_round=500,\n",
" train_set=train,\n",
" valid_sets=[train, test],\n",
" verbose_eval=50,\n",
" early_stopping_rounds=25)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model_file = 'outputs/{}.pkl'.format(model_name)\n",
"\n",
"os.makedirs('outputs', exist_ok=True)\n",
"joblib.dump(model, model_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Register Model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = Model.register(model_path=model_file,\n",
" model_name=model_name,\n",
" workspace=ws,\n",
" datasets=datasets)\n",
"\n",
"print(model_name, image_name, service_name, model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploy Model To AKS"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare Environment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"myenv = CondaDependencies.create(conda_packages=['numpy','scikit-learn', 'joblib', 'lightgbm', 'pandas'],\n",
" pip_packages=['azureml-monitoring', 'azureml-sdk[automl]'])\n",
"\n",
"with open(\"myenv.yml\",\"w\") as f:\n",
" f.write(myenv.serialize_to_string())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Image"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Image creation may take up to 15 minutes.\n",
"\n",
"image_name = image_name + str(model.version)\n",
"\n",
"if not image_name in ws.images:\n",
" # Use the score.py defined in this directory as the execution script\n",
" # NOTE: The Model Data Collector must be enabled in the execution script for DataDrift to run correctly\n",
" image_config = ContainerImage.image_configuration(execution_script=\"score.py\",\n",
" runtime=\"python\",\n",
" conda_file=\"myenv.yml\",\n",
" description=\"Image with weather dataset model\")\n",
" image = ContainerImage.create(name=image_name,\n",
" models=[model],\n",
" image_config=image_config,\n",
" workspace=ws)\n",
"\n",
" image.wait_for_creation(show_output=True)\n",
"else:\n",
" image = ws.images[image_name]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Compute Target"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"aks_name = 'dd-demo-e2e'\n",
"prov_config = AksCompute.provisioning_configuration()\n",
"\n",
"if not aks_name in ws.compute_targets:\n",
" aks_target = ComputeTarget.create(workspace=ws,\n",
" name=aks_name,\n",
" provisioning_configuration=prov_config)\n",
"\n",
" aks_target.wait_for_completion(show_output=True)\n",
" print(aks_target.provisioning_state)\n",
" print(aks_target.provisioning_errors)\n",
"else:\n",
" aks_target=ws.compute_targets[aks_name]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deploy Service"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"aks_service_name = service_name\n",
"\n",
"if not aks_service_name in ws.webservices:\n",
" aks_config = AksWebservice.deploy_configuration(collect_model_data=True, enable_app_insights=True)\n",
" aks_service = Webservice.deploy_from_image(workspace=ws,\n",
" name=aks_service_name,\n",
" image=image,\n",
" deployment_config=aks_config,\n",
" deployment_target=aks_target)\n",
" aks_service.wait_for_deployment(show_output=True)\n",
" print(aks_service.state)\n",
"else:\n",
" aks_service = ws.webservices[aks_service_name]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run DataDrift Analysis"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Send Scoring Data to Service"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download Scoring Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Score Model on March 15, 2016 data\n",
"scoring_df = get_noaa_data(datetime(2016, 3, 15) - timedelta(days=7), datetime(2016, 3, 16), columns, usaf_list)\n",
"# Add the window feature column\n",
"scoring_df = add_window_col(scoring_df)\n",
"\n",
"# Drop features not used by the model\n",
"print(\"Dropping unnecessary columns\")\n",
"scoring_df = scoring_df.drop(['windAngle', 'windSpeed', 'datetime', 'elevation'], axis=1).dropna()\n",
"scoring_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# One Hot Encode the scoring dataset to match the training dataset schema\n",
"columns_dict = model.datasets[\"training\"][0].get_profile().columns\n",
"extra_cols = ('Path', 'Column1')\n",
"for k in extra_cols:\n",
" columns_dict.pop(k, None)\n",
"training_columns = list(columns_dict.keys())\n",
"\n",
"categorical_columns = scoring_df.dtypes == object\n",
"categorical_columns = categorical_columns[categorical_columns == True]\n",
"\n",
"test_df = pd.get_dummies(scoring_df[categorical_columns.keys().tolist()])\n",
"encoded_df = scoring_df.join(test_df)\n",
"\n",
"# Populate missing OHE columns with 0 values to match traning dataset schema\n",
"difference = list(set(training_columns) - set(encoded_df.columns.tolist()))\n",
"for col in difference:\n",
" encoded_df[col] = 0\n",
"encoded_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Serialize dataframe to list of row dictionaries\n",
"encoded_dict = encoded_df.to_dict('records')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit Scoring Data to Service"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"# retreive the API keys. AML generates two keys.\n",
"key1, key2 = aks_service.get_keys()\n",
"\n",
"total_count = len(scoring_df)\n",
"i = 0\n",
"load = []\n",
"for row in encoded_dict:\n",
" load.append(row)\n",
" i = i + 1\n",
" if i % 100 == 0:\n",
" payload = json.dumps({\"data\": load})\n",
" \n",
" # construct raw HTTP request and send to the service\n",
" payload_binary = bytes(payload,encoding = 'utf8')\n",
" headers = {'Content-Type':'application/json', 'Authorization': 'Bearer ' + key1}\n",
" resp = requests.post(aks_service.scoring_uri, payload_binary, headers=headers)\n",
" \n",
" print(\"prediction:\", resp.content, \"Progress: {}/{}\".format(i, total_count)) \n",
"\n",
" load = []\n",
" time.sleep(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configure DataDrift"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"services = [service_name]\n",
"start = datetime.now() - timedelta(days=2)\n",
"end = datetime(year=2020, month=1, day=22, hour=15, minute=16)\n",
"feature_list = ['usaf', 'wban', 'latitude', 'longitude', 'station_name', 'p_k', 'sine_hourofday', 'cosine_hourofday', 'temperature-7']\n",
"alert_config = AlertConfiguration([email_address]) if email_address else None\n",
"\n",
"# there will be an exception indicating using get() method if DataDrift object already exist\n",
"try:\n",
" datadrift = DataDriftDetector.create(ws, model.name, model.version, services, frequency=\"Day\", alert_config=alert_config)\n",
"except KeyError:\n",
" datadrift = DataDriftDetector.get(ws, model.name, model.version)\n",
" \n",
"print(\"Details of DataDrift Object:\\n{}\".format(datadrift))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run an Adhoc DataDriftDetector Run"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"target_date = datetime.today()\n",
"run = datadrift.run(target_date, services, feature_list=feature_list, create_compute_target=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"exp = Experiment(ws, datadrift._id)\n",
"dd_run = Run(experiment=exp, run_id=run)\n",
"RunDetails(dd_run).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Get Drift Analysis Results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"children = list(dd_run.get_children())\n",
"for child in children:\n",
" child.wait_for_completion()\n",
"\n",
"drift_metrics = datadrift.get_output(start_time=start, end_time=end)\n",
"drift_metrics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Show all drift figures, one per serivice.\n",
"# If setting with_details is False (by default), only drift will be shown; if it's True, all details will be shown.\n",
"\n",
"drift_figures = datadrift.show(with_details=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enable DataDrift Schedule"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"datadrift.enable_schedule()"
]
}
],
"metadata": {
"authors": [
{
"name": "rafarmah"
}
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -1 +1,3 @@
-Under contruction...please visit again soon!
+## Using data drift APIs
1. [Detect data drift for a model](azure-ml-datadrift.ipynb): Detect data drift for a deployed model.

View File

@@ -0,0 +1,58 @@
import pickle
import json
import numpy
import azureml.train.automl
from sklearn.externals import joblib
from sklearn.linear_model import Ridge
from azureml.core.model import Model
from azureml.core.run import Run
from azureml.monitoring import ModelDataCollector
import time
import pandas as pd
def init():
global model, inputs_dc, prediction_dc, feature_names, categorical_features
print("Model is initialized" + time.strftime("%H:%M:%S"))
model_path = Model.get_model_path(model_name="driftmodel")
model = joblib.load(model_path)
feature_names = ["usaf", "wban", "latitude", "longitude", "station_name", "p_k",
"sine_weekofyear", "cosine_weekofyear", "sine_hourofday", "cosine_hourofday",
"temperature-7"]
categorical_features = ["usaf", "wban", "p_k", "station_name"]
inputs_dc = ModelDataCollector(model_name="driftmodel",
identifier="inputs",
feature_names=feature_names)
prediction_dc = ModelDataCollector("driftmodel",
identifier="predictions",
feature_names=["temperature"])
def run(raw_data):
global inputs_dc, prediction_dc
try:
data = json.loads(raw_data)["data"]
data = pd.DataFrame(data)
# Remove the categorical features as the model expects OHE values
input_data = data.drop(categorical_features, axis=1)
result = model.predict(input_data)
# Collect the non-OHE dataframe
collected_df = data[feature_names]
inputs_dc.collect(collected_df.values)
prediction_dc.collect(result)
return result.tolist()
except Exception as e:
error = str(e)
print(error + time.strftime("%H:%M:%S"))
return error

View File

@@ -0,0 +1,58 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
import argparse
import os
import pandas as pd
import azureml.dataprep as dprep
def get_dict(dict_str):
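    # Arguments arrive as a "{key:value\;key:value}" string with semicolons
    # backslash-escaped to survive shell parsing; split it back into a dict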
pairs = dict_str.strip("{}").split("\;")
new_dict = {}
for pair in pairs:
key, value = pair.strip('\\').split(":")
new_dict[key.strip().strip("'")] = value.strip().strip("'")
return new_dict
print("Cleans the input data")
parser = argparse.ArgumentParser("cleanse")
parser.add_argument("--input_cleanse", type=str, help="raw taxi data")
parser.add_argument("--output_cleanse", type=str, help="cleaned taxi data directory")
parser.add_argument("--useful_columns", type=str, help="useful columns to keep")
parser.add_argument("--columns", type=str, help="rename column pattern")
args = parser.parse_args()
print("Argument 1(input taxi data path): %s" % args.input_cleanse)
print("Argument 2(columns to keep): %s" % str(args.useful_columns.strip("[]").split("\;")))
print("Argument 3(columns renaming mapping): %s" % str(args.columns.strip("{}").split("\;")))
print("Argument 4(output cleansed taxi data path): %s" % args.output_cleanse)
raw_df = dprep.read_csv(path=args.input_cleanse, header=dprep.PromoteHeadersMode.GROUPED)
# These functions ensure that null data is removed from the data set,
# which will help increase machine learning model accuracy.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
# for more details
useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split("\;")]
columns = get_dict(args.columns)
all_columns = dprep.ColumnSelector(term=".*", use_regex=True)
drop_if_all_null = [all_columns, dprep.ColumnRelationship(dprep.ColumnRelationship.ALL)]
new_df = (raw_df
.replace_na(columns=all_columns)
.drop_nulls(*drop_if_all_null)
.rename_columns(column_pairs=columns)
.keep_columns(columns=useful_columns))
if not (args.output_cleanse is None):
os.makedirs(args.output_cleanse, exist_ok=True)
print("%s created" % args.output_cleanse)
write_df = new_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_cleanse))
write_df.run_local()
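
# Example invocation (paths and column names are illustrative; semicolons inside
# the list/dict arguments are backslash-escaped, as this script expects):
# python cleanse.py --input_cleanse ./raw_green --output_cleanse ./cleansed_green \
#     --useful_columns "['cost'\;'distance'\;'pickup_datetime']" \
#     --columns "{'fare_amount':'cost'\;'trip_distance':'distance'}"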

View File

@@ -0,0 +1,55 @@
import argparse
import os
import azureml.dataprep as dprep
print("Filters out coordinates for locations that are outside the city border.",
"Chain the column filter commands within the filter() function",
"and define the minimum and maximum bounds for each field.")
parser = argparse.ArgumentParser("filter")
parser.add_argument("--input_filter", type=str, help="merged taxi data directory")
parser.add_argument("--output_filter", type=str, help="filter out out of city locations")
args = parser.parse_args()
print("Argument 1(input taxi data path): %s" % args.input_filter)
print("Argument 2(output filtered taxi data path): %s" % args.output_filter)
combined_df = dprep.read_csv(args.input_filter + '/part-*')
# These functions filter out coordinates for locations that are outside the city border.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details
# Create a condensed view of the dataflow to just show the lat/long fields,
# which makes it easier to evaluate missing or out-of-scope coordinates
decimal_type = dprep.TypeConverter(data_type=dprep.FieldType.DECIMAL)
combined_df = combined_df.set_column_types(type_conversions={
"pickup_longitude": decimal_type,
"pickup_latitude": decimal_type,
"dropoff_longitude": decimal_type,
"dropoff_latitude": decimal_type
})
# Filter out coordinates for locations that are outside the city border.
# Chain the column filter commands within the filter() function
# and define the minimum and maximum bounds for each field
latlong_filtered_df = (combined_df
.drop_nulls(columns=["pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude"],
column_relationship=dprep.ColumnRelationship(dprep.ColumnRelationship.ANY))
.filter(dprep.f_and(dprep.col("pickup_longitude") <= -73.72,
dprep.col("pickup_longitude") >= -74.09,
dprep.col("pickup_latitude") <= 40.88,
dprep.col("pickup_latitude") >= 40.53,
dprep.col("dropoff_longitude") <= -73.72,
dprep.col("dropoff_longitude") >= -74.09,
dprep.col("dropoff_latitude") <= 40.88,
dprep.col("dropoff_latitude") >= 40.53)))
if not (args.output_filter is None):
os.makedirs(args.output_filter, exist_ok=True)
print("%s created" % args.output_filter)
write_df = latlong_filtered_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_filter))
write_df.run_local()

View File

@@ -0,0 +1,29 @@
import argparse
import os
import azureml.dataprep as dprep
print("Merge Green and Yellow taxi data")
parser = argparse.ArgumentParser("merge")
parser.add_argument("--input_green_merge", type=str, help="cleaned green taxi data directory")
parser.add_argument("--input_yellow_merge", type=str, help="cleaned yellow taxi data directory")
parser.add_argument("--output_merge", type=str, help="green and yellow taxi data merged")
args = parser.parse_args()
print("Argument 1(input green taxi data path): %s" % args.input_green_merge)
print("Argument 2(input yellow taxi data path): %s" % args.input_yellow_merge)
print("Argument 3(output merge taxi data path): %s" % args.output_merge)
green_df = dprep.read_csv(args.input_green_merge + '/part-*')
yellow_df = dprep.read_csv(args.input_yellow_merge + '/part-*')
# Appending yellow data to green data
combined_df = green_df.append_rows([yellow_df])
if not (args.output_merge is None):
os.makedirs(args.output_merge, exist_ok=True)
print("%s created" % args.output_merge)
write_df = combined_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_merge))
write_df.run_local()

View File

@@ -0,0 +1,47 @@
import argparse
import os
import azureml.dataprep as dprep
print("Replace undefined values to relavant values and rename columns to meaningful names")
parser = argparse.ArgumentParser("normalize")
parser.add_argument("--input_normalize", type=str, help="combined and converted taxi data")
parser.add_argument("--output_normalize", type=str, help="replaced undefined values and renamed columns")
args = parser.parse_args()
print("Argument 1(input taxi data path): %s" % args.input_normalize)
print("Argument 2(output normalized taxi data path): %s" % args.output_normalize)
combined_converted_df = dprep.read_csv(args.input_normalize + '/part-*')
# These functions replace undefined values and rename columns to meaningful names.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details
replaced_stfor_vals_df = combined_converted_df.replace(columns="store_forward",
find="0",
replace_with="N").fill_nulls("store_forward", "N")
replaced_distance_vals_df = replaced_stfor_vals_df.replace(columns="distance",
find=".00",
replace_with=0).fill_nulls("distance", 0)
replaced_distance_vals_df = replaced_distance_vals_df.to_number(["distance"])
time_split_df = (replaced_distance_vals_df
.split_column_by_example(source_column="pickup_datetime")
.split_column_by_example(source_column="dropoff_datetime"))
# Split the pickup and dropoff datetime values into the respective date and time columns
renamed_col_df = (time_split_df
.rename_columns(column_pairs={
"pickup_datetime_1": "pickup_date",
"pickup_datetime_2": "pickup_time",
"dropoff_datetime_1": "dropoff_date",
"dropoff_datetime_2": "dropoff_time"}))
if not (args.output_normalize is None):
os.makedirs(args.output_normalize, exist_ok=True)
print("%s created" % args.output_normalize)
write_df = renamed_col_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_normalize))
write_df.run_local()

View File

@@ -0,0 +1,88 @@
import argparse
import os
import azureml.dataprep as dprep
print("Transforms the renamed taxi data to the required format")
parser = argparse.ArgumentParser("transform")
parser.add_argument("--input_transform", type=str, help="renamed taxi data")
parser.add_argument("--output_transform", type=str, help="transformed taxi data")
args = parser.parse_args()
print("Argument 1(input taxi data path): %s" % args.input_transform)
print("Argument 2(output final transformed taxi data): %s" % args.output_transform)
renamed_df = dprep.read_csv(args.input_transform + '/part-*')
# These functions transform the renamed data into the final form used for training.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details
# Split the pickup and dropoff date further into the day of the week, day of the month, and month values.
# To get the day of the week value, use the derive_column_by_example() function.
# The function takes an array parameter of example objects that define the input data,
# and the preferred output. The function automatically determines your preferred transformation.
# For the pickup and dropoff time columns, split the time into the hour, minute, and second by using
# the split_column_by_example() function with no example parameter. After you generate the new features,
# use the drop_columns() function to delete the original fields as the newly generated features are preferred.
# Rename the rest of the fields to use meaningful descriptions.
transformed_features_df = (renamed_df
.derive_column_by_example(
source_columns="pickup_date",
new_column_name="pickup_weekday",
example_data=[("2009-01-04", "Sunday"), ("2013-08-22", "Thursday")])
.derive_column_by_example(
source_columns="dropoff_date",
new_column_name="dropoff_weekday",
example_data=[("2013-08-22", "Thursday"), ("2013-11-03", "Sunday")])
.split_column_by_example(source_column="pickup_time")
.split_column_by_example(source_column="dropoff_time")
.split_column_by_example(source_column="pickup_time_1")
.split_column_by_example(source_column="dropoff_time_1")
.drop_columns(columns=[
"pickup_date", "pickup_time", "dropoff_date", "dropoff_time",
"pickup_date_1", "dropoff_date_1", "pickup_time_1", "dropoff_time_1"])
.rename_columns(column_pairs={
"pickup_date_2": "pickup_month",
"pickup_date_3": "pickup_monthday",
"pickup_time_1_1": "pickup_hour",
"pickup_time_1_2": "pickup_minute",
"pickup_time_2": "pickup_second",
"dropoff_date_2": "dropoff_month",
"dropoff_date_3": "dropoff_monthday",
"dropoff_time_1_1": "dropoff_hour",
"dropoff_time_1_2": "dropoff_minute",
"dropoff_time_2": "dropoff_second"}))
# Drop the pickup_datetime and dropoff_datetime columns because they're
# no longer needed (granular time features like hour,
# minute and second are more useful for model training).
processed_df = transformed_features_df.drop_columns(columns=["pickup_datetime", "dropoff_datetime"])
# Use the type inference functionality to automatically check the data type of each field,
# and display the inference results.
type_infer = processed_df.builders.set_column_types()
type_infer.learn()
# The inference results look correct based on the data. Now apply the type conversions to the dataflow.
type_converted_df = type_infer.to_dataflow()
# Before you package the dataflow, run two final filters on the data set.
# To eliminate incorrectly captured data points,
# filter the dataflow on records where both the cost and distance variable values are greater than zero.
# This step will significantly improve machine learning model accuracy,
# because data points with a zero cost or distance represent major outliers that throw off prediction accuracy.
final_df = type_converted_df.filter(dprep.col("distance") > 0)
final_df = final_df.filter(dprep.col("cost") > 0)
# Writing the final dataframe to use for training in the following steps
if not (args.output_transform is None):
os.makedirs(args.output_transform, exist_ok=True)
print("%s created" % args.output_transform)
write_df = final_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_transform))
write_df.run_local()

View File

@@ -0,0 +1,31 @@
import argparse
import os
import azureml.dataprep as dprep
import azureml.core
print("Extracts important features from prepared data")
parser = argparse.ArgumentParser("featurization")
parser.add_argument("--input_featurization", type=str, help="input featurization")
parser.add_argument("--useful_columns", type=str, help="columns to use")
parser.add_argument("--output_featurization", type=str, help="output featurization")
args = parser.parse_args()
print("Argument 1(input training data path): %s" % args.input_featurization)
print("Argument 2(column features to use): %s" % str(args.useful_columns.strip("[]").split("\;")))
print("Argument 3:(output featurized training data path) %s" % args.output_featurization)
dflow_prepared = dprep.read_csv(args.input_featurization + '/part-*')
# These functions extract useful features for training
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models for more detail
useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split("\;")]
dflow = dflow_prepared.keep_columns(useful_columns)
if not (args.output_featurization is None):
os.makedirs(args.output_featurization, exist_ok=True)
print("%s created" % args.output_featurization)
write_df = dflow.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_featurization))
write_df.run_local()

View File

@@ -0,0 +1,12 @@
import os
import pandas as pd
def get_data():
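    # Each PipelineData input is exposed to the step as an
    # AZUREML_DATAREFERENCE_<name> environment variable holding its mount path;
    # the AutoML step imports this module and calls get_data() for training data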
print("In get_data")
print(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'])
X_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'] + "/part-00000", header=0)
y_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_y'] + "/part-00000", header=0)
return {"X": X_train.values, "y": y_train.values.flatten()}

View File

@@ -0,0 +1,48 @@
import argparse
import os
import azureml.dataprep as dprep
import azureml.core
from sklearn.model_selection import train_test_split
def write_output(df, path):
os.makedirs(path, exist_ok=True)
print("%s created" % path)
df.to_csv(path + "/part-00000", index=False)
print("Split the data into train and test")
parser = argparse.ArgumentParser("split")
parser.add_argument("--input_split_features", type=str, help="input split features")
parser.add_argument("--input_split_labels", type=str, help="input split labels")
parser.add_argument("--output_split_train_x", type=str, help="output split train features")
parser.add_argument("--output_split_train_y", type=str, help="output split train labels")
parser.add_argument("--output_split_test_x", type=str, help="output split test features")
parser.add_argument("--output_split_test_y", type=str, help="output split test labels")
args = parser.parse_args()
print("Argument 1(input taxi data features path): %s" % args.input_split_features)
print("Argument 2(input taxi data labels path): %s" % args.input_split_labels)
print("Argument 3(output training features split path): %s" % args.output_split_train_x)
print("Argument 4(output training labels split path): %s" % args.output_split_train_y)
print("Argument 5(output test features split path): %s" % args.output_split_test_x)
print("Argument 6(output test labels split path): %s" % args.output_split_test_y)
x_df = dprep.read_csv(path=args.input_split_features, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()
y_df = dprep.read_csv(path=args.input_split_labels, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()
# These functions split the input features and labels into test and train data
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models for more detail
x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=223)
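# Write the four splits as long as at least one output path was provided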
if not (args.output_split_train_x is None and
args.output_split_test_x is None and
args.output_split_train_y is None and
args.output_split_test_y is None):
write_output(x_train, args.output_split_train_x)
write_output(y_train, args.output_split_train_y)
write_output(x_test, args.output_split_test_x)
write_output(y_test, args.output_split_test_y)

View File

@@ -4,8 +4,9 @@
Try out the sample notebooks:
-* [Use MLflow with Azure Machine Learning for local training run](./train-local/train-local.ipynb)
+* [Use MLflow with Azure Machine Learning for Local Training Run](./train-local/train-local.ipynb)
-* [Use MLflow with Azure Machine Learning for remote training run](./train-remote/train-remote.ipynb)
+* [Use MLflow with Azure Machine Learning for Remote Training Run](./train-remote/train-remote.ipynb)
-* [Deploy Model as Azure Machine Learning web service using MLflow](./deploy-model/deploy-model.ipynb)
+* [Deploy Model as Azure Machine Learning Web Service using MLflow](./deploy-model/deploy-model.ipynb)
* [Train and Deploy PyTorch Image Classifier](./train-deploy-pytorch/train-deploy-pytorch.ipynb)
![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/using-mlflow/README..png)

View File

@@ -0,0 +1,150 @@
# Copyright (c) 2017, PyTorch Team
# All rights reserved
# Licensed under BSD 3-Clause License.
# This example is based on PyTorch MNIST example:
# https://github.com/pytorch/examples/blob/master/mnist/main.py
import mlflow
import mlflow.pytorch
from mlflow.utils.environment import _mlflow_conda_env
import warnings
import cloudpickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
class Net(nn.Module):
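    # A simple LeNet-style CNN: two conv+max-pool blocks followed by two fully connected layers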
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4 * 4 * 50, 500)
self.fc2 = nn.Linear(500, 10)
def forward(self, x):
# Added the view for reshaping score requests
x = x.view(-1, 1, 28, 28)
x = F.relu(self.conv1(x))
x = F.max_pool2d(x, 2, 2)
x = F.relu(self.conv2(x))
x = F.max_pool2d(x, 2, 2)
x = x.view(-1, 4 * 4 * 50)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
# Use MLflow logging
mlflow.log_metric("epoch_loss", loss.item())
def test(args, model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
# sum up batch loss
test_loss += F.nll_loss(output, target, reduction="sum").item()
# get the index of the max log-probability
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print("\n")
print("Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)))
# Use MLflow logging
mlflow.log_metric("average_loss", test_loss)
class Args(object):
pass
# Training settings
args = Args()
setattr(args, 'batch_size', 64)
setattr(args, 'test_batch_size', 1000)
setattr(args, 'epochs', 3) # Higher number for better convergence
setattr(args, 'lr', 0.01)
setattr(args, 'momentum', 0.5)
setattr(args, 'no_cuda', True)
setattr(args, 'seed', 1)
setattr(args, 'log_interval', 10)
setattr(args, 'save_model', True)
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(
'../data',
train=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))])),
batch_size=args.test_batch_size, shuffle=True, **kwargs)
def driver():
warnings.filterwarnings("ignore")
# Dependencies for deploying the model
pytorch_index = "https://download.pytorch.org/whl/"
pytorch_version = "cpu/torch-1.1.0-cp36-cp36m-linux_x86_64.whl"
deps = [
"cloudpickle=={}".format(cloudpickle.__version__),
pytorch_index + pytorch_version,
"torchvision=={}".format(torchvision.__version__),
"Pillow=={}".format("6.0.0")
]
with mlflow.start_run() as run:
model = Net().to(device)
optimizer = optim.SGD(
model.parameters(),
lr=args.lr,
momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(args, model, device, test_loader)
# Log model to run history using MLflow
if args.save_model:
model_env = _mlflow_conda_env(additional_pip_deps=deps)
mlflow.pytorch.log_model(model, "model", conda_env=model_env)
return run
if __name__ == "__main__":
driver()

View File

@@ -0,0 +1,481 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/using-mlflow/train-deploy-pytorch/train-deploy-pytorch.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Use MLflow with Azure Machine Learning to Train and Deploy PyTorch Image Classifier\n",
"\n",
"This example shows you how to use MLflow together with Azure Machine Learning services for tracking the metrics and artifacts while training a PyTorch model to classify MNIST digit images, and then deploy the model as a web service. You'll learn how to:\n",
"\n",
" 1. Set up MLflow tracking URI so as to use Azure ML\n",
" 2. Create experiment\n",
" 3. Instrument your model with MLflow tracking\n",
" 4. Train a PyTorch model locally\n",
" 5. Train a model on GPU compute on Azure\n",
" 6. View your experiment within your Azure ML Workspace in Azure Portal\n",
" 7. Create a Docker image from the trained model\n",
" 8. Deploy the model as a web service on Azure Container Instance\n",
" 9. Call the model to make predictions\n",
" \n",
"### Pre-requisites\n",
" \n",
"Make sure you have completed the [Configuration](../../../configuration.ipnyb) notebook to set up your Azure Machine Learning workspace and ensure other common prerequisites are met.\n",
"\n",
"Also, install mlflow-azureml package using ```pip install mlflow-azureml```. Note that mlflow-azureml installs mlflow package itself as a dependency, if you haven't done so previously.\n",
"\n",
"### Set-up\n",
"\n",
"Import packages and check versions of Azure ML SDK and MLflow installed on your computer. Then connect to your Workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sys, os\n",
"import mlflow\n",
"import mlflow.azureml\n",
"import mlflow.sklearn\n",
"\n",
"import azureml.core\n",
"from azureml.core import Workspace\n",
"\n",
"\n",
"print(\"SDK version:\", azureml.core.VERSION)\n",
"print(\"MLflow version:\", mlflow.version.VERSION)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ws = Workspace.from_config()\n",
"ws.get_details()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set tracking URI\n",
"\n",
"Set the MLFlow tracking URI to point to your Azure ML Workspace. The subsequent logging calls from MLFlow APIs will go to Azure ML services and will be tracked under your Workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Experiment\n",
"\n",
"In both MLflow and Azure ML, training runs are grouped into experiments. Let's create one for our experimentation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"experiment_name = \"pytorch-with-mlflow\"\n",
"mlflow.set_experiment(experiment_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train model locally while logging metrics and artifacts\n",
"\n",
"The ```scripts/train.py``` program contains the code to load the image dataset, and train and test the model. Within this program, the train.driver function wraps the end-to-end workflow.\n",
"\n",
"Within the driver, the ```mlflow.start_run``` starts MLflow tracking. Then, ```mlflow.log_metric``` functions are used to track the convergence of the neural network training iterations. Finally ```mlflow.pytorch.save_model``` is used to save the trained model in framework-aware manner.\n",
"\n",
"Let's add the program to search path, import it as a module, and then invoke the driver function. Note that the training can take few minutes."
]
},
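{
"cell_type": "markdown",
"metadata": {},
"source": [
"The instrumentation pattern inside ```scripts/train.py``` boils down to the following sketch (```loss``` here is an illustrative placeholder):\n",
"\n",
"```python\n",
"import mlflow\n",
"\n",
"with mlflow.start_run():                       # begin an MLflow-tracked run\n",
"    for epoch in range(1, 4):\n",
"        loss = 1.0 / epoch                     # placeholder for the training loss\n",
"        mlflow.log_metric(\"epoch_loss\", loss)  # metrics stream to the Azure ML run history\n",
"```"
]
},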
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"lib_path = os.path.abspath(\"scripts\")\n",
"sys.path.append(lib_path)\n",
"\n",
"import train"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run = train.driver()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can view the metrics of the run at Azure Portal"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(azureml.mlflow.get_portal_url(run))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train model on GPU compute on Azure\n",
"\n",
"Next, let's run the same script on GPU-enabled compute for faster training. If you've completed the the [Configuration](../../../configuration.ipnyb) notebook, you should have a GPU cluster named \"gpu-cluster\" available in your workspace. Otherwise, follow the instructions in the notebook to create one. For simplicity, this example uses single process on single VM to train the model.\n",
"\n",
"Create a PyTorch estimator to specify the training configuration: script, compute as well as additional packages needed. To enable MLflow tracking, include ```azureml-mlflow``` as pip package. The low-level specifications for the training run are encapsulated in the estimator instance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import PyTorch\n",
"\n",
"pt = PyTorch(source_directory=\"./scripts\", \n",
" entry_script = \"train.py\", \n",
" compute_target = \"gpu-cluster\", \n",
" node_count = 1, \n",
" process_count_per_node = 1, \n",
" use_gpu=True,\n",
" pip_packages = [\"azureml-mlflow\", \"Pillow==6.0.0\"])\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get a reference to the experiment you created previously, but this time, as Azure Machine Learning experiment object.\n",
"\n",
"Then, use ```Experiment.submit``` method to start the remote training run. Note that the first training run often takes longer as Azure Machine Learning service builds the Docker image for executing the script. Subsequent runs will be faster as cached image is used."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"\n",
"exp = Experiment(ws, experiment_name)\n",
"run = exp.submit(pt)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can monitor the run and its metrics on Azure Portal."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Also, you can wait for run to complete."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.wait_for_completion(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy model as web service\n",
"\n",
"To deploy a web service, first create a Docker image, and then deploy that Docker image on inferencing compute.\n",
"\n",
"The ```mlflow.azureml.build_image``` function builds a Docker image from saved PyTorch model in a framework-aware manner. It automatically creates the PyTorch-specific inferencing wrapper code and specififies package dependencies for you."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run.get_file_names()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then build a docker image using *runs:/&lt;run.id&gt;/model* as the model_uri path.\n",
"\n",
"Note that the image building can take several minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model_path = \"model\"\n",
"\n",
"\n",
"azure_image, azure_model = mlflow.azureml.build_image(model_uri='runs:/{}/{}'.format(run.id, model_path),\n",
" workspace=ws,\n",
" model_name='pytorch_mnist',\n",
" image_name='pytorch-mnist-img',\n",
" synchronous=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then, deploy the Docker image to Azure Container Instance: a serverless compute capable of running a single container. You can tag and add descriptions to help keep track of your web service. \n",
"\n",
"[Other inferencing compute choices](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where) include Azure Kubernetes Service which provides scalable endpoint suitable for production use.\n",
"\n",
"Note that the service deployment can take several minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.webservice import AciWebservice, Webservice\n",
"\n",
"aci_config = AciWebservice.deploy_configuration(cpu_cores=2, \n",
" memory_gb=5, \n",
" tags={\"data\": \"MNIST\", \"method\" : \"pytorch\"}, \n",
" description=\"Predict using webservice\")\n",
"\n",
"\n",
"# Deploy the image to Azure Container Instances (ACI) for real-time serving\n",
"webservice = Webservice.deploy_from_image(\n",
" image=azure_image, workspace=ws, name=\"pytorch-mnist-1\", deployment_config=aci_config)\n",
"\n",
"\n",
"webservice.wait_for_deployment()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the deployment has completed you can check the scoring URI of the web service."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Scoring URI is: {}\".format(webservice.scoring_uri))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In case of a service creation issue, you can use ```webservice.get_logs()``` to get logs to debug."
]
},
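{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment to inspect the container logs if the deployment above failed\n",
"# print(webservice.get_logs())"
]
},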
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions using web service\n",
"\n",
"To make the web service, create a test data set as normalized PyTorch tensors. \n",
"\n",
"Then, let's define a utility function that takes a random image and converts it into format and shape suitable for as input to PyTorch inferencing end-point. The conversion is done by: \n",
"\n",
" 1. Select a random (image, label) tuple\n",
" 2. Take the image and converting the tensor to NumPy array \n",
" 3. Reshape array into 1 x 1 x N array\n",
" * 1 image in batch, 1 color channel, N = 784 pixels for MNIST images\n",
" * Note also ```x = x.view(-1, 1, 28, 28)``` in net definition in ```train.py``` program to shape incoming scoring requests.\n",
" 4. Convert the NumPy array to list to make it into a built-in type.\n",
" 5. Create a dictionary {\"data\", &lt;list&gt;} that can be converted to JSON string for web service requests."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from torchvision import datasets, transforms\n",
"import random\n",
"import numpy as np\n",
"\n",
"test_data = datasets.MNIST('../data', train=False, transform=transforms.Compose([\n",
" transforms.ToTensor(),\n",
" transforms.Normalize((0.1307,), (0.3081,))]))\n",
"\n",
"\n",
"def get_random_image():\n",
" image_idx = random.randint(0,len(test_data))\n",
" image_as_tensor = test_data[image_idx][0]\n",
" return {\"data\": elem for elem in image_as_tensor.numpy().reshape(1,1,-1).tolist()}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then, invoke the web service using a random test image. Convert the dictionary containing the image to JSON string before passing it to web service.\n",
"\n",
"The response contains the raw scores for each label, with greater value indicating higher probability. Sort the labels and select the one with greatest score to get the prediction. Let's also plot the image sent to web service for comparison purposes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"\n",
"import json\n",
"import matplotlib.pyplot as plt\n",
"\n",
"test_image = get_random_image()\n",
"\n",
"response = webservice.run(json.dumps(test_image))\n",
"\n",
"response = sorted(response[0].items(), key = lambda x: x[1], reverse = True)\n",
"\n",
"\n",
"print(\"Predicted label:\", response[0][0])\n",
"plt.imshow(np.array(test_image[\"data\"]).reshape(28,28), cmap = \"gray\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also call the web service using a raw POST method against the web service"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"response = requests.post(url=webservice.scoring_uri, data=json.dumps(test_image),headers={\"Content-type\": \"application/json\"})\n",
"print(response.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"authors": [
{
"name": "roastala"
}
],
"celltoolbar": "Edit Metadata",
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
},
"name": "mlflow-sparksummit-pytorch",
"notebookId": 2495374963457641
},
"nbformat": 4,
"nbformat_minor": 1
}