{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. All rights reserved.\n", "\n", "Licensed under the MIT License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks//notebooks/work-with-data/datasets/datasets-tutorial/datasets-diff.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#
Detect drift using Dataset Diff API
\n", "\n", "
\n", "\n", " This notebook provides step by step instructions on how to compare two different datasets. It includes two parts\u00ef\u00bc\u0161\n", "
    ☑ compare two datasets using local compute;\n", "
    ☑ compare two datasets remotely using Azure ML compute.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Prerequisites and Setup\n", "\n", "This section is shared by both local and remote execution, you may need duplicate this section if splitting this notebook into separate local/remote notebooks.\n", "\n", "\n", "## Prerequisites\n", "\n", "### Install Supporting Packages" ] }, { "cell_type": "markdown", "metadata": { "scrolled": true }, "source": [ "    pip install scipy
\n", "    pip install tqdm
\n", "    pip install pandas
\n", "    pip install pyarrow
\n", "    pip install ipywidgets
\n", "    pip install lightgbm
\n", "    pip install matplotlib
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install AzureML Packages" ] }, { "cell_type": "markdown", "metadata": { "scrolled": true }, "source": [ "    pip install --user azureml-core
\n", "\n", "    pip install --user azureml-opendatasets
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import Dependencies" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "import warnings\n", "import requests\n", "import pandas as pd\n", "import numpy as np\n", "import ipywidgets as widgets\n", "\n", "import azureml.core\n", "\n", "from io import StringIO\n", "from tqdm import tqdm\n", "from IPython import display\n", "from datetime import datetime, timedelta\n", "from azureml.core import Datastore, Dataset\n", "from azureml.opendatasets import NoaaIsdWeather\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Declare Variables For Demo\n", "\n", "Feel free to customize them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "year = 2016\n", "month = 1\n", "date = 1\n", "b_days = 2 # for baseline\n", "t_days = 7 # for target\n", "\n", "local_folder = \"demo\"\n", "baseline_file = 'baseline.csv'\n", "\n", "feature_columns = ['usaf', 'wban', 'latitude', 'longitude', 'elevation', 'temperature', 'p_k']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare Datasets\n", "\n", "The diff calcualtion is always between two datasets, here for demo, we use \"baseline\" and \"target\" to present them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "os.makedirs(local_folder, exist_ok=True)\n", "\n", "local_baseline = os.path.join(local_folder, baseline_file)\n", "\n", "start_date = datetime(year, month, date)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare Baseline Dataset\n", "Retrieve wether data from NOAA for declared days (b_days declared in above cell). It may takes 2 minutes for 2 days." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start = start_date\n", "isd = NoaaIsdWeather(start, start + timedelta(days=b_days))\n", "\n", "baseline_df = isd.to_pandas_dataframe()\n", "baseline_df.head()\n", "\n", "baseline_df.to_csv(local_baseline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare Target Dataset(s)\n", "\n", "Retrieve wether data from NOAA for declared days (t_days declared in above cell). It may takes 5 minutes for 7 days." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for day in tqdm(range(0, t_days)):\n", " start = start_date + timedelta(days=day)\n", " isd = NoaaIsdWeather(start, start + timedelta(days=1))\n", "\n", " target_df = isd.to_pandas_dataframe()\n", " target_df = target_df[feature_columns]\n", " target_df.to_csv(os.path.join(local_folder, 'target_{}.csv'.format(day)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Predefine Methods For Result Processing\n", "\n", "## Parse and Present Datasets' Diff Results\n", "\n", "Each diff result is a list of \"DiffMetric\" objects. Typically each objec present a detailed measurement output for a specific column.\n", "

Below is an example of \"DiffMetric\" object:
\n", "\n", "
    {  \n", "
       'name':'percentage_difference_median',                        --> measurement name\n", "
       'value':0.01270670472603889,                                  --> the result value a number to indicate how big the diff is for current measurement.\n", "
       'extended_properties':{  \n", "

          'action_id':'3d3da05d-0871-4cc9-93cb-f43859aae13b',        --> (remote calculation only) action id\n", "
          'from_dataset_id':'12edc566-8803-4e0f-ba91-c2ee05eeddee',  --> (remote calculation only) baseline dataset\n", "
          'from_dataset_version':'1',                                --> (remote calculation only) baseline version\n", "
          'to_dataset_id':'9b85c9ba-50c2-4227-a9bc-91dee4a18228',    --> (remote calculation only) target dataset\n", "
          'to_dataset_version':'1',                                  --> (remote calculation only) target version\n", "

          'column_name':'elevation',                                 --> column name in dataset, 
                                                                         could be ['name':'datadrift_coefficient'] for dataset level diff\n", "
          'metric_category':'profile_diff'                           --> category, could be :
                                                                             dataset_drift (dataset level)
                                                                             profile_diff (column level)
                                                                             statistical_distance (column level)\n", "
       }\n", "
    }\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def parse_result(rst, columns, measurements):\n", " columnlist = list(columns)\n", " columnlist.insert(0, \"measurements \\ columns\")\n", " measurementlist = list(measurements)\n", " \n", " daily_result = []\n", " daily_result.append(columnlist)\n", " \n", " drift = None\n", " daily_contribution = {}\n", " \n", " for m in measurements:\n", " emptylist = ([''] * len(columns))\n", " emptylist.insert(0, m)\n", " daily_result.append(emptylist)\n", "\n", " for r in rst:\n", " # get dataset level diff (drift)\n", " if r.name == \"datadrift_coefficient\":\n", " drift = r.value\n", " # get diff (drift) contribution for each column:\n", " elif r.name == \"datadrift_contribution\":\n", " daily_contribution[r.extended_properties[\"column_name\"]] = r.value\n", " # get column level diff measurements\n", " else:\n", " if \"column_name\" in r.extended_properties:\n", " col = r.extended_properties[\"column_name\"]\n", " msm = r.name\n", " val = r.value\n", " cid = columnlist.index(col)\n", " kid = measurementlist.index(msm) + 1\n", " daily_result[kid][cid] = val\n", "\n", " return daily_result, drift, daily_contribution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Present Dataset Level Diff (aka drift)\n", "\n", "This method will generate two graphs, the left graph presents dataset level difference for all compared baseline-target pairs, the right graph presents dataset level difference contribution for each column so that we know which column impacts more." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import matplotlib.dates as mdates\n", "import matplotlib.pyplot as plt \n", "import matplotlib as mpl\n", "\n", "def show_diff(drift_metrics, dates, columns, drift_contributions, summary_contribute, bottoms_contribute):\n", " drifts = [drift_metrics[day] for day in drift_metrics]\n", " daily_summary_contribution = list(summary_contribute.values())\n", " xrange = pd.date_range(dates[0], dates[-1], freq='D')\n", "\n", " figure = plt.figure(figsize=(16, 4))\n", " plt.tight_layout()\n", "\n", " # left graph\n", " ax1 = plt.subplot(1, 2, 1)\n", " ax1.grid()\n", " plt.sca(ax1)\n", " plt.title(\"Diff(Drift) Trend\\n\", fontsize=20)\n", " plt.xticks(rotation=30)\n", " plt.xlabel(\"Date\", fontsize=16)\n", " plt.ylabel(\"Drift Coefficent\", fontsize=16)\n", " plt.plot_date(dates, drifts, '-r', marker='.', linewidth=0.5, markersize=5)\n", "\n", " # right graph\n", " ax2 = plt.subplot(1, 2, 2)\n", " plt.sca(ax2)\n", " plt.title(\"Drift Contribution of columns\\n\", fontsize=20)\n", " plt.xticks(xrange, rotation=30)\n", " plt.xlabel(\"Date\", fontsize=16)\n", " plt.ylabel(\"Drift Contribution\", fontsize=16)\n", "\n", " yvals = ax2.get_yticks()\n", " ax2.set_yticklabels(['{:,.2%}'.format(v) for v in yvals])\n", " ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y%m-%d'))\n", "\n", " for c in columns:\n", " contribution = []\n", " for dt in drift_contributions:\n", " contribution.append(drift_contributions[dt][c])\n", " bar_ratio = [x / y for x, y in zip(contribution, daily_summary_contribution)]\n", "\n", " ax2.bar(dates, height=bar_ratio, bottom=bottoms_contribute)\n", " bottoms_contribute = [x + y for x, y in zip(bottoms_contribute, bar_ratio)]\n", "\n", " plt.legend(columns)\n", "\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Execute Datasets' Diff Calculation Locally\n", "\n", "Local execution let you to run in a Jupyter Notebook or Code editor in a local computer.\n", "\n", "## Calculate Dataset Diff At Local\n", "\n", "### Create Baseline Dataset\n", "\n", "Create baseline dataset object from the retrieved baseline data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Dataset\n", "\n", "baseline = Dataset.auto_read_files(local_baseline, include_path=True)\n", "\n", "# The baseline data is not filtered by feature columns list, thus all retrieved data columns will be listed below.\n", "# You'll see \"Column1\" in the output, which is a default name added when the original column is not available.\n", "baseline.get_profile()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Target Datasets\n", "\n", "Create target dataset objects from retrieved target data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "targets = {}\n", "\n", "for day in tqdm(range(0, t_days)):\n", " target = Dataset.auto_read_files(os.path.join(local_folder, 'target_{}.csv'.format(day)))\n", " targets[day] = target" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Calculate Diff Between Each Target Dataset And Baseline Dataset\n", "\n", "Compare each target dataset with baseline dataset to calculate diff between them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "buf = {}\n", "\n", "columns = set()\n", "measurements = set()\n", "\n", "for day in tqdm(range(0, t_days)):\n", " diff_action = baseline.diff(rhs_dataset=targets[day])\n", " diff_action.wait_for_completion()\n", " \n", " dt = (start_date + timedelta(days=day)).strftime(\"%Y-%m-%d\")\n", " buf[dt] = diff_action._result\n", " \n", " for r in diff_action._result:\n", " if r.name not in measurements:\n", " measurements.add(r.name)\n", " if \"column_name\" in r.extended_properties and r.extended_properties[\"column_name\"] not in columns:\n", " columns.add(r.extended_properties[\"column_name\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parse And Present Local Execution Results\n", "\n", "\n", "
The diff outputs usually contains two different level information:\n", "
    1. General diff, aka dataset level diff. The output is a number between 0 and 1 to indicate what level the diff is. This dataset level diff is also called drift between two datasets.\n", "
    2. Detailed diff, aka column level diff. The output is a metrics organized like a 2-D array. One dimension is column names, that is why it's in column level. The other dimension are measurements. The diff calculation actually includes variuos measurements from different perspectives, each measurement will generate an index for each column to present how big impacts this column contributed.\n", "
\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parse and List Column Level Diff Results\n", "\n", "Here will iteratively list all details per each measurement per column calculated." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pandas import DataFrame\n", "\n", "dates = []\n", "drift_metrics = {}\n", "drift_contributions = {}\n", "summary_contribute = {}\n", "bottoms_contribute = []\n", "\n", "for dt, rst in buf.items():\n", " dates.append(dt)\n", " print(\"\\n---------------------------------------- Result of {} ----------------------------------------\".format(dt))\n", " \n", " daily_result, drift, daily_contribution = parse_result(rst, columns, measurements)\n", " drift_metrics[dt] = drift\n", " drift_contributions[dt] = daily_contribution\n", "\n", " sum_contribution = 0\n", " bottoms_contribute.append(0)\n", " for col, val in daily_contribution.items():\n", " sum_contribution += val\n", " summary_contribute[dt] = sum_contribution\n", "\n", " \n", " display.display(pd.DataFrame(daily_result))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Present Dataset Level Diff (aka drift) In Graphs\n", "\n", "The left graph presents dataset level difference for all compared baseline-target pairs, the right graph presents dataset level difference contribution for each column so that we know which column impacts more." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "show_diff(drift_metrics, dates, columns, drift_contributions, summary_contribute, bottoms_contribute)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Excute Datasets's Diff Calculation Remotely\n", "\n", "Remote execution let you to data compare on more powerful computes - Machine Learning Compute clusters." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare Remote Environment\n", "### Get Workspace\n", "\n", "
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, check the configuration notebook first if you haven't already to establish your connection to the AzureML Workspace.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.workspace import Workspace\n", "from azureml.core.authentication import InteractiveLoginAuthentication\n", "\n", "ws = Workspace.from_config()\n", "\n", "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep=\"\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Compute Resource For Calculation\n", "Check if compute resouce exists and create a new one if not." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core.compute import AmlCompute, ComputeTarget\n", "\n", "existing = False\n", "del_cmpt = False\n", "cts = ws.compute_targets\n", "\n", "if (ws.DEFAULT_CPU_CLUSTER_NAME in cts and cts[ws.DEFAULT_CPU_CLUSTER_NAME].type == 'AmlCompute'):\n", " existing = True\n", " aml_compute = cts[ws.DEFAULT_CPU_CLUSTER_NAME]\n", " \n", "if not existing:\n", " aml_compute = AmlCompute.create(ws,ws.DEFAULT_CPU_CLUSTER_NAME,ws.DEFAULT_CPU_CLUSTER_CONFIGURATION)\n", " aml_compute.wait_for_completion(show_output=True)\n", " del_cmpt = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload Sample Data To Datastore\n", "\n", "Upload data files to the blob storage in Azure ML workspace." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Datastore, Dataset\n", "import azureml.data\n", "from azureml.data.azure_storage_datastore import AzureFileDatastore, AzureBlobDatastore\n", "\n", "remote_data_path ='demo'\n", "\n", "dstore = ws.get_default_datastore()\n", "dstore.upload_files([local_baseline],\n", " target_path=remote_data_path,\n", " overwrite=True,\n", " show_progress=True)\n", "\n", "for day in tqdm(range(0, t_days)):\n", " target_file = os.path.join(local_folder, 'target_{}.csv'.format(day))\n", " dstore.upload_files([target_file],\n", " target_path=remote_data_path,\n", " overwrite=True,\n", " show_progress=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Register DataSets\n", "\n", "Create and Register Datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azureml.core import Datastore, Dataset\n", "dstore = ws.get_default_datastore()\n", "\n", "xpath = remote_data_path + '/' + baseline_file\n", "toregister_baseline = Dataset.from_delimited_files(dstore.path(xpath))\n", "registered_baseline = toregister_baseline.register(workspace = ws,\n", " name = 'dataset baseline for diff demo',\n", " description = 'dataset baseline for diff comparison',\n", " exist_ok = True,\n", " update_if_exist = True\n", " )\n", "\n", "registered_targets = {}\n", "for day in tqdm(range(0, t_days)):\n", " target_file = 'target_{}.csv'.format(day)\n", " toregister_target = Dataset.from_delimited_files(dstore.path(remote_data_path + '/' + target_file))\n", " registered_target = toregister_target.register(workspace = ws,\n", " name = 'dataset target-{} for diff demo'.format(day),\n", " description = 'target target-{} for diff comparison'.format(day),\n", " exist_ok = True,\n", " update_if_exist = True\n", " )\n", " registered_targets[day] = registered_target" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Calculate Dataset Diff Remotely\n", "\n", "Perform the calculation remotely. This may take 20 minutes.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "remote_diffs = {}\n", "\n", "r_columns = set()\n", "r_measurements = set()\n", "\n", "for day, registered_target in registered_targets.items():\n", " dt = (start_date + timedelta(days=day)).strftime(\"%Y-%m-%d\")\n", " remote_diff = registered_baseline.diff(registered_target, compute_target=ws.DEFAULT_CPU_CLUSTER_NAME)\n", " remote_diff.wait_for_completion()\n", " \n", " remote_diffs[dt] = remote_diff.get_result()\n", " \n", " for r in remote_diff.get_result():\n", " if r.name not in r_measurements:\n", " r_measurements.add(r.name)\n", " if \"column_name\" in r.extended_properties and r.extended_properties[\"column_name\"] not in r_columns:\n", " r_columns.add(r.extended_properties[\"column_name\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parse And Present Remote Execution Results\n", "\n", "### Parse And List Column Level Diff Results\n", "\n", "Here will iteratively list all details per each measurement per column calculated." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pandas import DataFrame\n", "\n", "r_dates = []\n", "r_drift_metrics = {}\n", "r_drift_contributions = {}\n", "r_summary_contribute = {}\n", "r_bottoms_contribute = []\n", "\n", "for dt, rst in remote_diffs.items():\n", " r_dates.append(dt)\n", " print(\"\\n---------------------------------------- Result of {} ----------------------------------------\".format(dt))\n", " \n", " daily_result, drift, daily_contribution = parse_result(rst, r_columns, r_measurements)\n", " r_drift_metrics[dt] = drift\n", " r_drift_contributions[dt] = daily_contribution\n", "\n", " sum_contribution = 0\n", " r_bottoms_contribute.append(0)\n", " for col, val in daily_contribution.items():\n", " sum_contribution += val\n", " r_summary_contribute[dt] = sum_contribution\n", "\n", " \n", " display.display(pd.DataFrame(daily_result))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Present Dataset Level Diff (aka drift) In Graphs\n", "\n", "The left graph presents dataset level difference for all compared baseline-target pairs, the right graph presents dataset level difference contribution for each column so that we know which column impacts more." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "show_diff(r_drift_metrics, r_dates, r_columns, r_drift_contributions, r_summary_contribute, r_bottoms_contribute)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean Resources Created" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if del_cmpt == True:\n", " try:\n", " aml_compute.delete()\n", " aml_compute.wait_for_completion()\n", " except Exception as e:\n", " if 'ComputeTargetNotFound' in e.message:\n", " print(\"Compute target deleted.\")\n", " del_cmpt = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reference\n", "\n", "Detailed description of Dataset Diff attribute can be found at
\n", "https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.dataset(class)?view=azure-ml-py#diff-rhs-dataset--compute-target-none--columns-none-" ] } ], "metadata": { "authors": [ { "name": "davx" } ], "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" }, "notice": "Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License." }, "nbformat": 4, "nbformat_minor": 2 }