# Track Data Drift between Training and Inference Data in Production 

This notebook shows how to enable the DataDrift service to automatically track whether your inference data is drifting away from the data your model was originally trained on. The DataDrift service provides metrics and visualizations that help stakeholders identify which specific features are driving the drift.

Please email driftfeedback@microsoft.com with any issues. A member of the DataDrift team will respond shortly.

The DataDrift Public Preview API can be found [here](https://docs.microsoft.com/en-us/python/api/azureml-contrib-datadrift/?view=azure-ml-py). 


# Prerequisites and Setup

## Install the DataDrift package

Install the azureml-datadrift, azureml-opendatasets, and lightgbm packages before running this notebook.
```
pip install azureml-datadrift
pip install azureml-opendatasets
pip install lightgbm
```

## Import Dependencies

In [None]:
import json
import os
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
from azureml.core import Dataset, Workspace
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.image import ContainerImage
from azureml.core.model import Model
from azureml.core.webservice import Webservice, AksWebservice
from azureml.datadrift import DataDriftDetector, AlertConfiguration
from azureml.opendatasets import NoaaIsdWeather
from azureml.widgets import RunDetails
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from sklearn.model_selection import train_test_split


## Set Up Configuration and Connect to the Azure ML Workspace

If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, if you haven't already, go through the [configuration notebook](../../../configuration.ipynb) first to establish your connection to the Azure ML workspace.

In [None]:
# Please type in your initials/alias. The prefix is prepended to the names of resources created by this notebook. 
prefix = "dd"

# NOTE: Please do not change the model_name, as it's required by the score.py file
model_name = "driftmodel"
image_name = "{}driftimage".format(prefix)
service_name = "{}driftservice".format(prefix)

# optionally, set email address to receive an email alert for DataDrift
email_address = ""

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

## Generate Training/Testing Data

For this demo, we will use NOAA weather data from [Azure Open Datasets](https://azure.microsoft.com/services/open-datasets/). You may replace this step with your own dataset. 

In [None]:
usaf_list = ['725724', '722149', '723090', '722159', '723910', '720279',
             '725513', '725254', '726430', '720381', '723074', '726682',
             '725486', '727883', '723177', '722075', '723086', '724053',
             '725070', '722073', '726060', '725224', '725260', '724520',
             '720305', '724020', '726510', '725126', '722523', '703333',
             '722249', '722728', '725483', '722972', '724975', '742079',
             '727468', '722193', '725624', '722030', '726380', '720309',
             '722071', '720326', '725415', '724504', '725665', '725424',
             '725066']

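# 'usaf' is required: it is used below to filter stations and as a join key for the lag feature
columns = ['usaf', 'wban', 'datetime', 'latitude', 'longitude', 'elevation', 'windAngle', 'windSpeed', 'temperature', 'stationName', 'p_k']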


def enrich_weather_noaa_data(noaa_df):
    # Periods for the cyclical (sine/cosine) encodings: hours 0-23 repeat every 24 hours,
    # weeks 1-52 repeat every year
    hours_in_day = 24
    week_in_year = 52

    noaa_df["hour"] = noaa_df["datetime"].dt.hour
    # ISO week number; Series.dt.week was removed in pandas 2.0 (isocalendar needs pandas >= 1.1)
    noaa_df["weekofyear"] = noaa_df["datetime"].dt.isocalendar().week.astype(int)

    # Map week and hour onto the unit circle so the end of each cycle sits next to its start
    week_angle = 2 * np.pi * (noaa_df["weekofyear"] - 1) / week_in_year
    noaa_df["sine_weekofyear"] = np.sin(week_angle)
    noaa_df["cosine_weekofyear"] = np.cos(week_angle)

    hour_angle = 2 * np.pi * noaa_df["hour"] / hours_in_day
    noaa_df["sine_hourofday"] = np.sin(hour_angle)
    noaa_df["cosine_hourofday"] = np.cos(hour_angle)

    return noaa_df

def add_window_col(input_df):
    # Pair each reading with the reading from the same station 7 days earlier ("temperature-7")
    lag = pd.Timedelta(days=7)
    key_columns = ['datetime', 'usaf', 'wban', 'sine_hourofday']

    # Keep only the join keys and the lagged value, then move the timestamps forward by 7 days
    # so that a reading taken at time t lines up with the row at t + 7 days
    df_shifted = input_df[key_columns + ['temperature']].copy()
    df_shifted['datetime'] = df_shifted['datetime'] + lag

    # Inner merge keeps only observations that have a 7-day-lagged reading
    df2 = pd.merge(input_df,
                   df_shifted,
                   on=key_columns,
                   how='inner',  # use 'left' to keep observations without lags
                   suffixes=('', '-7'))
    return df2

def get_noaa_data(start_time, end_time, cols, station_list):
    isd = NoaaIsdWeather(start_time, end_time, cols=cols)
    # Read into a pandas DataFrame
    noaa_df = isd.to_pandas_dataframe()
    noaa_df = noaa_df.rename(columns={"stationName": "station_name"})

    # Keep only the selected stations; reset_index returns a new frame, so assign the result
    df_filtered = noaa_df[noaa_df["usaf"].isin(station_list)].reset_index(drop=True)

    # Enrich with time features
    df_enriched = enrich_weather_noaa_data(df_filtered)

    return df_enriched

def get_featurized_noaa_df(start_time, end_time, cols, station_list):
    # Also fetch the preceding week so every row in [start_time, end_time] can get its 7-day lag
    df_1 = get_noaa_data(start_time - timedelta(days=7), start_time - timedelta(seconds=1), cols, station_list)
    df_2 = get_noaa_data(start_time, end_time, cols, station_list)
    noaa_df = pd.concat([df_1, df_2])

    print("Adding window feature")
    df_window = add_window_col(noaa_df)

    # One-hot encode all string (object) columns, e.g. station_name
    cat_columns = df_window.select_dtypes(include='object').columns.tolist()

    print("Encoding categorical columns")
    df_encoded = pd.get_dummies(df_window, columns=cat_columns)
    
    print("Dropping unnecessary columns")
    df_featurized = df_encoded.drop(['windAngle', 'windSpeed', 'datetime', 'elevation'], axis=1).dropna().drop_duplicates()
    
    return df_featurized
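
The sine/cosine features place each hour on a circle so that late-night and early-morning hours end up close together, which only works when the period matches the length of the cycle. A minimal sketch, assuming a 24-hour period, with a hypothetical helper `encode_hour` used purely for illustration, compares the distance between encoded hours for a 24-hour and a 23-hour period:

```python
import numpy as np

def encode_hour(hour, period=24):
    """Map an hour of day onto the unit circle (hypothetical helper for illustration)."""
    angle = 2 * np.pi * np.asarray(hour, dtype=float) / period
    return np.sin(angle), np.cos(angle)

def circle_distance(h1, h2, period=24):
    """Euclidean distance between two encoded hours."""
    s1, c1 = encode_hour(h1, period)
    s2, c2 = encode_hour(h2, period)
    return float(np.hypot(s1 - s2, c1 - c2))

# With a 24-hour period, 23:00 -> 00:00 is as close as 00:00 -> 01:00.
print(circle_distance(23, 0), circle_distance(0, 1))

# With a 23-hour period, hours 23 and 0 collapse onto the same point.
print(circle_distance(23, 0, period=23))
```

Opposite hours (0 and 12) end up at distance 2, the diameter of the circle, which is why two columns (sine and cosine) are needed rather than one.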

In [None]:
# Train model on Jan 1 - 14, 2009 data
df = get_featurized_noaa_df(datetime(2009, 1, 1), datetime(2009, 1, 14, 23, 59, 59), columns, usaf_list)
df.head()
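
The `temperature-7` column produced by `add_window_col` pairs each reading with the reading from the same station exactly seven days earlier. A minimal sketch of the same shift-and-merge idea on a hypothetical toy frame (one station, one reading per day) makes the effect of the inner merge visible:

```python
import pandas as pd

# Hypothetical toy data: one station, one reading per day for 10 days.
toy = pd.DataFrame({
    "datetime": pd.date_range("2009-01-01", periods=10, freq="D"),
    "usaf": "000000",
    "temperature": [float(t) for t in range(10)],
})

# Move each reading forward by 7 days so it lines up with the row it should feed.
shifted = toy[["datetime", "usaf", "temperature"]].copy()
shifted["datetime"] = shifted["datetime"] + pd.Timedelta(days=7)

# Inner merge keeps only rows that have a reading from 7 days earlier.
lagged = pd.merge(toy, shifted, on=["datetime", "usaf"], how="inner", suffixes=("", "-7"))
print(lagged)
```

Only the last three days survive, because the first seven days have no earlier reading to pair with. This is why the featurization fetches an extra week of data before the requested start time.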

In [None]:
label = "temperature"
x_df = df.drop(label, axis=1)
y_df = df[[label]]
# Split the features (not the full frame) so the label does not leak into the model inputs
x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=223)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

training_dir = 'outputs/training'
training_file = "training.csv"

# Generate training dataframe (features + label) to register as Training Dataset
os.makedirs(training_dir, exist_ok=True)
training_df = pd.merge(x_train, y_train, left_index=True, right_index=True)
training_df.to_csv(os.path.join(training_dir, training_file))
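
By default, `to_csv` writes the DataFrame index as an extra, unnamed first column. The scoring cell later filters out a `Column1` column, which appears to be how this unnamed index column surfaces in the registered Tabular dataset (an assumption, not confirmed here). A small pandas sketch, using in-memory buffers in place of files and hypothetical values, shows the difference `index=False` makes:

```python
import io
import pandas as pd

df = pd.DataFrame({"latitude": [47.6, 40.7], "temperature": [5.0, 7.5]})

with_index = io.StringIO()
df.to_csv(with_index)                 # default: index written as an unnamed first column
without_index = io.StringIO()
df.to_csv(without_index, index=False)  # index omitted

cols_with = pd.read_csv(io.StringIO(with_index.getvalue())).columns.tolist()
cols_without = pd.read_csv(io.StringIO(without_index.getvalue())).columns.tolist()
print(cols_with)
print(cols_without)
```

When pandas reads the file back, the unnamed index column appears as `Unnamed: 0`; other readers may name it differently.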

## Create/Register Training Dataset

In [None]:
dataset_name = "training_dataset"
dstore = ws.get_default_datastore()
dstore.upload(training_dir, "data/training", show_progress=True)

datastore_path = [(dstore, 'data/training/training.csv')]
trainingDataset = Dataset.Tabular.from_delimited_files(path=datastore_path)
trainingDataset = trainingDataset.register(workspace=ws, name=dataset_name, description="training", create_new_version=True)

datasets = [(Dataset.Scenario.TRAINING, trainingDataset)]
print("Dataset registration done.\n")
datasets

## Train and Save Model

In [None]:
import lightgbm as lgb

train = lgb.Dataset(data=x_train,
                    label=y_train)

test = lgb.Dataset(data=x_test,
                   label=y_test,
                   reference=train)

params = {'learning_rate'    : 0.1,
          'boosting'         : 'gbdt',
          'metric'           : 'rmse',
          'feature_fraction' : 1,
          'bagging_fraction' : 1,
          'max_depth'        : 6,
          'num_leaves'       : 31,
          'objective'        : 'regression',
          'bagging_freq'     : 1,
          'verbose'          : -1,
          'min_data_per_leaf': 100}

# Log every 50 rounds and stop if validation RMSE does not improve for 25 rounds.
# The callback API requires LightGBM >= 3.3; older versions used verbose_eval/early_stopping_rounds.
model = lgb.train(params,
                  num_boost_round=500,
                  train_set=train,
                  valid_sets=[train, test],
                  callbacks=[lgb.log_evaluation(period=50),
                             lgb.early_stopping(stopping_rounds=25)])
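
Early stopping with a patience of 25 means training halts once the validation RMSE has gone 25 rounds without improving, and the best round is kept. A simplified pure-Python sketch of that patience rule on hypothetical validation scores; it illustrates the idea and is not LightGBM's internal implementation:

```python
def best_iteration(val_scores, patience):
    """Return (best_round, stop_round), both 1-indexed, for a lower-is-better metric."""
    best_score = float("inf")
    best_round = 0
    for round_no, score in enumerate(val_scores, start=1):
        if score < best_score:
            best_score, best_round = score, round_no
        elif round_no - best_round >= patience:
            return best_round, round_no  # patience exhausted: stop here
    return best_round, len(val_scores)   # ran out of rounds without triggering

# Hypothetical RMSE values: improvement stops after round 3.
print(best_iteration([5, 4, 3, 3.5, 3.6, 3.7], patience=3))
```

After training, the LightGBM `Booster` exposes the selected round as `model.best_iteration`.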

In [None]:
model_file = 'outputs/{}.pkl'.format(model_name)

os.makedirs('outputs', exist_ok=True)
joblib.dump(model, model_file)

## Register Model

In [None]:
model = Model.register(model_path=model_file,
                       model_name=model_name,
                       workspace=ws,
                       datasets=datasets)

print(model_name, image_name, service_name, model)

# Deploy Model To AKS

## Prepare Environment

In [None]:
myenv = CondaDependencies.create(conda_packages=['numpy','scikit-learn', 'joblib', 'lightgbm', 'pandas'],
                                 pip_packages=['azureml-monitoring', 'azureml-defaults'])

with open("myenv.yml","w") as f:
    f.write(myenv.serialize_to_string())

## Create Image

In [None]:
# Image creation may take up to 15 minutes.

image_name = image_name + str(model.version)

if image_name not in ws.images:
    # Use the score.py defined in this directory as the execution script
    # NOTE: The Model Data Collector must be enabled in the execution script for DataDrift to run correctly
    image_config = ContainerImage.image_configuration(execution_script="score.py",
                                                      runtime="python",
                                                      conda_file="myenv.yml",
                                                      description="Image with weather dataset model")
    image = ContainerImage.create(name=image_name,
                                  models=[model],
                                  image_config=image_config,
                                  workspace=ws)

    image.wait_for_creation(show_output=True)
else:
    image = ws.images[image_name]

## Create Compute Target

In [None]:
aks_name = 'dd-demo-e2e'
prov_config = AksCompute.provisioning_configuration()

if aks_name not in ws.compute_targets:
    aks_target = ComputeTarget.create(workspace=ws,
                                      name=aks_name,
                                      provisioning_configuration=prov_config)

    aks_target.wait_for_completion(show_output=True)
    print(aks_target.provisioning_state)
    print(aks_target.provisioning_errors)
else:
    aks_target = ws.compute_targets[aks_name]

## Deploy Service

In [None]:
aks_service_name = service_name

if aks_service_name not in ws.webservices:
    aks_config = AksWebservice.deploy_configuration(collect_model_data=True, enable_app_insights=True)
    aks_service = Webservice.deploy_from_image(workspace=ws,
                                               name=aks_service_name,
                                               image=image,
                                               deployment_config=aks_config,
                                               deployment_target=aks_target)
    aks_service.wait_for_deployment(show_output=True)
    print(aks_service.state)
else:
    aks_service = ws.webservices[aks_service_name]

# Run DataDrift Analysis

## Send Scoring Data to Service

### Download Scoring Data

In [None]:
# Score the model on March 15, 2016 data; the preceding week is fetched too so that
# add_window_col can build the 7-day lag feature
scoring_df = get_noaa_data(datetime(2016, 3, 15) - timedelta(days=7), datetime(2016, 3, 16), columns, usaf_list)
# Add the window feature column
scoring_df = add_window_col(scoring_df)

# Drop features not used by the model
print("Dropping unnecessary columns")
scoring_df = scoring_df.drop(['windAngle', 'windSpeed', 'datetime', 'elevation'], axis=1).dropna()
scoring_df.head()

In [None]:
# One-hot encode the scoring dataset to match the training dataset schema
# (use a new name so the NOAA `columns` list defined earlier is not overwritten)
training_dataset_columns = list(model.datasets["training"][0].to_pandas_dataframe().columns)
extra_cols = ['Path', 'Column1']
training_columns = [c for c in training_dataset_columns if c not in extra_cols]

categorical_columns = scoring_df.select_dtypes(include='object').columns.tolist()

test_df = pd.get_dummies(scoring_df[categorical_columns])
encoded_df = scoring_df.join(test_df)

# Populate missing one-hot columns with 0 values to match the training dataset schema
difference = list(set(training_columns) - set(encoded_df.columns.tolist()))
for col in difference:
    encoded_df[col] = 0
encoded_df.head()
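
Another way to align an encoded frame with a training schema is `DataFrame.reindex`, which adds missing dummy columns filled with 0 and puts the columns in training order in a single call. A minimal sketch on hypothetical station names; note that, unlike the cell above, it also drops every column not in the training schema, which may or may not be what `score.py` expects:

```python
import pandas as pd

# Hypothetical training schema after one-hot encoding.
training_columns = ["latitude", "station_name_A", "station_name_B", "station_name_C"]

scoring = pd.DataFrame({"latitude": [47.6, 40.7], "station_name": ["A", "D"]})
encoded = pd.get_dummies(scoring, columns=["station_name"])

# Missing dummy columns become 0; the unseen category (station D) is dropped.
aligned = encoded.reindex(columns=training_columns, fill_value=0)
print(aligned)
```

Depending on the pandas version, `get_dummies` returns boolean or `uint8` dummy columns, so the existing and filled-in columns may differ in dtype.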

In [None]:
# Serialize dataframe to list of row dictionaries
encoded_dict = encoded_df.to_dict('records')

### Submit Scoring Data to Service

In [None]:
%%time

# Retrieve the API keys. AML generates two keys.
key1, key2 = aks_service.get_keys()
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key1}

batch_size = 100
total_count = len(encoded_dict)
for start in range(0, total_count, batch_size):
    # Send up to 100 rows per request; the last batch may be smaller
    batch = encoded_dict[start:start + batch_size]
    payload = json.dumps({"data": batch})

    # Construct the raw HTTP request and send it to the service
    resp = requests.post(aks_service.scoring_uri, data=payload.encode('utf8'), headers=headers)

    print("prediction:", resp.content, "Progress: {}/{}".format(start + len(batch), total_count))

    time.sleep(3)
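
The scoring loop groups rows into batches of 100 before each request. A small stdlib-only sketch of that batching step, which makes the final partial batch explicit and serializes each batch to the `{"data": [...]}` payload shape used above; the helper `make_payloads` and the sample rows are hypothetical:

```python
import json

def make_payloads(records, batch_size=100):
    """Split row dicts into JSON payloads of at most batch_size rows each (hypothetical helper)."""
    payloads = []
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        payloads.append(json.dumps({"data": batch}))
    return payloads

rows = [{"latitude": 47.6, "row": i} for i in range(250)]
payloads = make_payloads(rows)
print(len(payloads), [len(json.loads(p)["data"]) for p in payloads])
```

Slicing past the end of a list simply returns the remaining items, so no separate "flush" step is needed for the last batch.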

The Model Data Collector can take up to 10 minutes to write the model inputs and predictions to storage in the workspace, where the DataDriftDetector job reads them, so wait before configuring DataDrift.

In [None]:
time.sleep(600)

## Configure DataDrift

In [None]:
services = [service_name]
feature_list = ['latitude', 'longitude', 'sine_hourofday', 'cosine_hourofday', 'temperature-7']
alert_config = AlertConfiguration([email_address]) if email_address else None

# DataDriftDetector.create raises a KeyError if a DataDriftDetector already exists for this
# model and version; in that case, retrieve the existing one with get()
try:
    # To allow for data latency, scheduled jobs process the previous day's data by default.
    # This demo generates scoring data today, so the schedule starts tomorrow in order to
    # process today's data.
    datadrift = DataDriftDetector.create(ws, model.name, model.version, services,
                                         frequency="Day",
                                         schedule_start=datetime.utcnow() + timedelta(days=1),
                                         alert_config=alert_config)
except KeyError:
    datadrift = DataDriftDetector.get(ws, model.name, model.version)

print("Details of DataDrift Object:\n{}".format(datadrift))
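
The comments in the cell above state that each scheduled job processes the previous day's data. Under that assumption, a schedule that starts tomorrow first analyzes today's data. A stdlib sketch of that date arithmetic; the helper `processed_date` and the sample time are hypothetical and only model the described behavior, not the service's scheduler:

```python
from datetime import datetime, timedelta

def processed_date(run_time):
    """Date analyzed by a daily run at run_time, assuming it covers the previous day."""
    run_day = datetime(run_time.year, run_time.month, run_time.day)
    return run_day - timedelta(days=1)

now = datetime(2019, 11, 5, 14, 30)       # hypothetical current UTC time
schedule_start = now + timedelta(days=1)  # mirrors schedule_start in the cell above
print(processed_date(schedule_start))     # the first scheduled run covers "today"
```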

## Start an Ad Hoc DataDriftDetector Run

In [None]:
now = datetime.utcnow()
target_date = datetime(now.year, now.month, now.day)
run = datadrift.run(target_date, services, feature_list=feature_list, create_compute_target=True)

In [None]:
child_run = list(run.get_children())[0]
RunDetails(child_run).show()

## Get Drift Analysis Results

In [None]:
child_run.wait_for_completion(wait_post_processing=True)

drift_metrics = datadrift.get_output(run_id=run.id)
drift_metrics
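
The service computes its own drift metrics, and their exact definitions are not covered in this notebook. To build intuition for what a per-feature drift score measures, here is a minimal sketch that swaps in a simple, well-known statistic, the two-sample Kolmogorov-Smirnov distance (the largest gap between two empirical CDFs), computed with NumPy on hypothetical baseline and target samples:

```python
import numpy as np

def ks_distance(baseline, target):
    """Two-sample Kolmogorov-Smirnov statistic: max gap between empirical CDFs."""
    baseline = np.sort(np.asarray(baseline, dtype=float))
    target = np.sort(np.asarray(target, dtype=float))
    grid = np.concatenate([baseline, target])
    # Fraction of each sample that is <= every grid point
    cdf_base = np.searchsorted(baseline, grid, side="right") / baseline.size
    cdf_target = np.searchsorted(target, grid, side="right") / target.size
    return float(np.max(np.abs(cdf_base - cdf_target)))

rng = np.random.default_rng(0)
train_temp = rng.normal(0.0, 5.0, size=2000)    # hypothetical training feature
score_temp = rng.normal(10.0, 5.0, size=2000)   # hypothetical shifted inference feature

scores = {
    "no drift": ks_distance(train_temp, train_temp),
    "shifted": ks_distance(train_temp, score_temp),
}
print(scores)
```

The score ranges from 0 (identical distributions) to 1 (no overlap), so computing it per feature gives a simple way to rank which features moved the most.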

In [None]:
# Show all drift figures, one per service.
# By default (with_details=False), only the drift figures are shown; pass with_details=True to show all details.

drift_figures = datadrift.show()

## Enable DataDrift Schedule

In [None]:
datadrift.enable_schedule()