From 88f6a966cc6346f75848ac13512e48ffaab6f060 Mon Sep 17 00:00:00 2001 From: Roope Astala Date: Tue, 22 Jan 2019 13:32:59 -0500 Subject: [PATCH] RAPIDS sample --- .../RAPIDS/azure-ml-with-nvidia-rapids.ipynb | 409 ++++++++++++++ contrib/RAPIDS/process_data.py | 500 ++++++++++++++++++ 2 files changed, 909 insertions(+) create mode 100644 contrib/RAPIDS/azure-ml-with-nvidia-rapids.ipynb create mode 100644 contrib/RAPIDS/process_data.py diff --git a/contrib/RAPIDS/azure-ml-with-nvidia-rapids.ipynb b/contrib/RAPIDS/azure-ml-with-nvidia-rapids.ipynb new file mode 100644 index 00000000..6440b720 --- /dev/null +++ b/contrib/RAPIDS/azure-ml-with-nvidia-rapids.ipynb @@ -0,0 +1,409 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved.\n", + "\n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# NVIDIA RAPIDS in Azure Machine Learning" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The [RAPIDS](https://www.developer.nvidia.com/rapids) suite of software libraries from NVIDIA enables the execution of end-to-end data science and analytics pipelines entirely on GPUs. In many machine learning projects, a significant portion of the model training time is spent in setting up the data; this stage of the process is known as Extraction, Transformation and Loading, or ETL. By using the DataFrame API for ETL\u00c2\u00a0and GPU-capable ML algorithms in RAPIDS, data preparation and training models can be done in GPU-accelerated end-to-end pipelines without incurring serialization costs between the pipeline stages. 
This notebook demonstrates how to use NVIDIA RAPIDS to prepare data and train a model in Azure.\n", + "    \n", + "In this notebook, we will do the following:\n", + "    \n", + "* Create an Azure Machine Learning Workspace\n", + "* Create an AMLCompute target\n", + "* Use a script to process our data and train a model\n", + "* Obtain the data required to run this sample\n", + "* Create an AML run configuration to launch a machine learning job\n", + "* Run the script to prepare data for training and train the model\n", + "    \n", + "Prerequisites:\n", + "* An Azure subscription to create a Machine Learning Workspace\n", + "* Familiarity with the Azure ML SDK (refer to [notebook samples](https://github.com/Azure/MachineLearningNotebooks))\n", + "* A Jupyter notebook environment with Azure Machine Learning SDK installed. Refer to instructions to [set up the environment](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment#local)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Verify if Azure ML SDK is installed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import azureml.core\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from azureml.core import Workspace, Experiment\n", + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.core.runconfig import RunConfiguration\n", + "from azureml.core import ScriptRunConfig\n", + "from azureml.widgets import RunDetails" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create Azure ML Workspace" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The following step is optional if you already have a workspace. 
If you want to use an existing workspace, then\n", + "skip this workspace creation step and move on to the next step to load the workspace.\n", + " \n", + "Important: in the code cell below, be sure to set the correct values for the subscription_id, \n", + "resource_group, workspace_name, region before executing this code cell." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "subscription_id = os.environ.get(\"SUBSCRIPTION_ID\", \"\")\n", + "resource_group = os.environ.get(\"RESOURCE_GROUP\", \"\")\n", + "workspace_name = os.environ.get(\"WORKSPACE_NAME\", \"\")\n", + "workspace_region = os.environ.get(\"WORKSPACE_REGION\", \"\")\n", + "\n", + "ws = Workspace.create(workspace_name, subscription_id=subscription_id, resource_group=resource_group, location=workspace_region)\n", + "\n", + "# write config to a local directory for future use\n", + "ws.write_config()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load existing Workspace" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "# if a locally-saved configuration file for the workspace is not available, use the following to load workspace\n", + "# ws = Workspace(subscription_id=subscription_id, resource_group=resource_group, workspace_name=workspace_name)\n", + "print('Workspace name: ' + ws.name, \n", + " 'Azure region: ' + ws.location, \n", + " 'Subscription id: ' + ws.subscription_id, \n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", + "\n", + "scripts_folder = \"scripts_folder\"\n", + "\n", + "if not os.path.isdir(scripts_folder):\n", + " os.mkdir(scripts_folder)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create AML Compute Target" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Because NVIDIA RAPIDS requires P40 or V100 GPUs, the 
user needs to specify compute targets from one of [NC_v3](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/sizes-gpu#ncv3-series), [NC_v2](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/sizes-gpu#ncv2-series), [ND](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/sizes-gpu#nd-series) or [ND_v2](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/sizes-gpu#ndv2-series-preview) virtual machine types in Azure; these are the families of virtual machines in Azure that are provisioned with these GPUs.\n", + " \n", + "Pick one of the supported VM SKUs based on the number of GPUs you want to use for ETL and training in RAPIDS.\n", + " \n", + "The script in this notebook is implemented for single-machine scenarios. An example supporting multiple nodes will be published later." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gpu_cluster_name = \"gpucluster\"\n", + "\n", + "if gpu_cluster_name in ws.compute_targets:\n", + " gpu_cluster = ws.compute_targets[gpu_cluster_name]\n", + " if gpu_cluster and type(gpu_cluster) is AmlCompute:\n", + " print('found compute target. just use it. 
' + gpu_cluster_name)\n", + "else:\n", + "    print(\"creating new cluster\")\n", + "    # vm_size parameter below could be modified to one of the RAPIDS-supported VM types\n", + "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"Standard_NC6s_v2\", min_nodes=1, max_nodes = 1)\n", + "\n", + "    # create the cluster\n", + "    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n", + "    gpu_cluster.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Script to process data and train model" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The _process_data.py_ script used in the step below is a slightly modified implementation of [RAPIDS E2E example](https://github.com/rapidsai/notebooks/blob/master/mortgage/E2E.ipynb)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# copy process_data.py into the script folder\n", + "import shutil\n", + "shutil.copy('./process_data.py', os.path.join(scripts_folder, 'process_data.py'))\n", + "\n", + "with open(os.path.join(scripts_folder, './process_data.py'), 'r') as process_data_script:\n", + "    print(process_data_script.read())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Data required to run this sample" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This sample uses [Fannie Mae\u2019s Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html). 
Refer to the 'Available mortgage datasets' section in [instructions](https://rapidsai.github.io/demos/datasets/mortgage-data) to get sample data.\n", + "\n", + "Once you obtain access to the data, you will need to make this data available in an [Azure Machine Learning Datastore](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-access-data), for use in this sample." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Important: The following step assumes the data is uploaded to the Workspace's default data store under a folder named 'mortgagedata2000_01'. Note that uploading data to the Workspace's default data store is not necessary and the data can be referenced from any datastore, e.g., from Azure Blob or File service, once it is added as a datastore to the workspace. The path_on_datastore parameter needs to be updated, depending on where the data is available. The directory where the data is available should have the following folder structure, as the process_data.py script expects this directory structure:\n", + "* _<data directory>_/acq\n", + "* _<data directory>_/perf\n", + "* _names.csv_\n", + "\n", + "The 'acq' and 'perf' refer to directories containing data files. The _<data directory>_ is the path specified in _path_on_datastore_ parameter in the step below." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ds = ws.get_default_datastore()\n", + "\n", + "# download and uncompress data in a local directory before uploading to data store\n", + "# directory specified in src_dir parameter below should have the acq, perf directories with data and names.csv file\n", + "# ds.upload(src_dir='', target_path='mortgagedata2000_01', overwrite=True, show_progress=True)\n", + "\n", + "# data already uploaded to the datastore\n", + "data_ref = DataReference(data_reference_name='data', datastore=ds, path_on_datastore='mortgagedata2000_01')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create AML run configuration to launch a machine learning job" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "AML allows the option of using existing Docker images with prebuilt conda environments. The following step uses an existing image from [Docker Hub](https://hub.docker.com/r/rapidsai/rapidsai/)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "run_config = RunConfiguration()\n", + "run_config.framework = 'python'\n", + "run_config.environment.python.user_managed_dependencies = True\n", + "# use conda environment named 'rapids' available in the Docker image\n", + "# this conda environment does not include azureml-defaults package that is required for using AML functionality like metrics tracking, model management etc.\n", + "run_config.environment.python.interpreter_path = '/conda/envs/rapids/bin/python'\n", + "run_config.target = gpu_cluster_name\n", + "run_config.environment.docker.enabled = True\n", + "run_config.environment.docker.gpu_support = True\n", + "# if registry is not mentioned the image is pulled from Docker Hub\n", + "run_config.environment.docker.base_image = \"rapidsai/rapidsai:cuda9.2_ubuntu16.04_root\"\n", + "run_config.environment.spark.precache_packages = False\n", + "run_config.data_references={'data':data_ref.to_config()}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Wrapper function to submit Azure Machine Learning experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# parameter cpu_predictor indicates if training should be done on CPU. 
If set to true, GPUs are used *only* for ETL and *not* for training\n", + "# parameter num_gpu indicates number of GPUs to use among the GPUs available in the VM for ETL and if cpu_predictor is false, for training as well \n", + "def run_rapids_experiment(cpu_training, gpu_count):\n", + "    # any value between 1-4 is allowed here depending on the type of VMs available in gpu_cluster\n", + "    if gpu_count not in [1, 2, 3, 4]:\n", + "        raise Exception('Value specified for the number of GPUs to use {0} is invalid'.format(gpu_count))\n", + "\n", + "    # following data partition mapping is empirical (specific to GPUs used and current data partitioning scheme) and may need to be tweaked\n", + "    gpu_count_data_partition_mapping = {1: 2, 2: 4, 3: 5, 4: 7}\n", + "    part_count = gpu_count_data_partition_mapping[gpu_count]\n", + "\n", + "    end_year = 2000\n", + "    if gpu_count > 2:\n", + "        end_year = 2001 # use more data with more GPUs\n", + "\n", + "    src = ScriptRunConfig(source_directory=scripts_folder, \n", + "                          script='process_data.py', \n", + "                          arguments = ['--num_gpu', gpu_count, '--data_dir', str(data_ref),\n", + "                                       '--part_count', part_count, '--end_year', end_year,\n", + "                                       '--cpu_predictor', cpu_training\n", + "                                      ],\n", + "                          run_config=run_config\n", + "                         )\n", + "\n", + "    exp = Experiment(ws, 'rapidstest')\n", + "    run = exp.submit(config=src)\n", + "    RunDetails(run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit experiment (ETL & training on GPU)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cpu_predictor = False\n", + "# the value for num_gpu should be less than or equal to the number of GPUs available in the VM\n", + "num_gpu = 1 \n", + "# use GPU for both ETL and training\n", + "run_rapids_experiment(cpu_predictor, num_gpu)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit experiment (ETL on GPU, training on 
CPU)\n", + "\n", + "To observe performance difference between GPU-accelerated RAPIDS based training with CPU-only training, set 'cpu_predictor' predictor to 'True' and rerun the experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cpu_predictor = True\n", + "# the value for num_gpu should be less than or equal to the number of GPUs available in the VM\n", + "num_gpu = 1\n", + "# train using CPU, use GPU for ETL\n", + "run_rapids_experiment(cpu_predictor, num_gpu)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Delete cluster" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# delete the cluster\n", + "# gpu_cluster.delete()" + ] + } + ], + "metadata": { + "authors": [ + { + "name": "ksivas" + } + ], + "kernelspec": { + "display_name": "Python 3.6", + "language": "python", + "name": "python36" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} \ No newline at end of file diff --git a/contrib/RAPIDS/process_data.py b/contrib/RAPIDS/process_data.py new file mode 100644 index 00000000..a7600b40 --- /dev/null +++ b/contrib/RAPIDS/process_data.py @@ -0,0 +1,500 @@ +# License Info: https://github.com/rapidsai/notebooks/blob/master/LICENSE +import numpy as np +import datetime +import dask_xgboost as dxgb_gpu +import dask +import dask_cudf +from dask.delayed import delayed +from dask.distributed import Client, wait +import xgboost as xgb +import cudf +from cudf.dataframe import DataFrame +from collections import OrderedDict +import gc +from glob import glob +import os +import argparse + +parser = argparse.ArgumentParser("rapidssample") 
+parser.add_argument("--data_dir", type=str, help="location of data") +parser.add_argument("--num_gpu", type=int, help="Number of GPUs to use", default=1) +parser.add_argument("--part_count", type=int, help="Number of data files to train against", default=2) +parser.add_argument("--end_year", type=int, help="Year to end the data load", default=2000) +parser.add_argument("--cpu_predictor", type=str, help="Flag to use CPU for prediction", default='False') +parser.add_argument('-f', type=str, default='') # added for notebook execution scenarios +args = parser.parse_args() +data_dir = args.data_dir +num_gpu = args.num_gpu +part_count = args.part_count +end_year = args.end_year +cpu_predictor = args.cpu_predictor.lower() in ('yes', 'true', 't', 'y', '1') + +print('data_dir = {0}'.format(data_dir)) +print('num_gpu = {0}'.format(num_gpu)) +print('part_count = {0}'.format(part_count)) +part_count = part_count + 1 # adding one because the usage below is not inclusive +print('end_year = {0}'.format(end_year)) +print('cpu_predictor = {0}'.format(cpu_predictor)) + +import subprocess + +cmd = "hostname --all-ip-addresses" +process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) +output, error = process.communicate() +IPADDR = str(output.decode()).split()[0] +print('IPADDR is {0}'.format(IPADDR)) + +cmd = "/rapids/notebooks/utils/dask-setup.sh 0" +process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) +output, error = process.communicate() + +cmd = "/rapids/notebooks/utils/dask-setup.sh rapids " + str(num_gpu) + " 8786 8787 8790 " + str(IPADDR) + " MASTER" +process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) +output, error = process.communicate() + +print(output.decode()) + +import dask +from dask.delayed import delayed +from dask.distributed import Client, wait + +_client = IPADDR + str(":8786") + +client = dask.distributed.Client(_client) + +def initialize_rmm_pool(): + from librmm_cffi import librmm_config as rmm_cfg + + rmm_cfg.use_pool_allocator 
= True + #rmm_cfg.initial_pool_size = 2<<30 # set to 2GiB. Default is 1/2 total GPU memory + import cudf + return cudf._gdf.rmm_initialize() + +def initialize_rmm_no_pool(): + from librmm_cffi import librmm_config as rmm_cfg + + rmm_cfg.use_pool_allocator = False + import cudf + return cudf._gdf.rmm_initialize() + +def run_dask_task(func, **kwargs): + task = func(**kwargs) + return task + +def process_quarter_gpu(year=2000, quarter=1, perf_file=""): + ml_arrays = run_dask_task(delayed(run_gpu_workflow), + quarter=quarter, + year=year, + perf_file=perf_file) + return client.compute(ml_arrays, + optimize_graph=False, + fifo_timeout="0ms" + ) + +def null_workaround(df, **kwargs): + for column, data_type in df.dtypes.items(): + if str(data_type) == "category": + df[column] = df[column].astype('int32').fillna(-1) + if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']: + df[column] = df[column].fillna(-1) + return df + +def run_gpu_workflow(quarter=1, year=2000, perf_file="", **kwargs): + names = gpu_load_names() + acq_gdf = gpu_load_acquisition_csv(acquisition_path= acq_data_path + "/Acquisition_" + + str(year) + "Q" + str(quarter) + ".txt") + acq_gdf = acq_gdf.merge(names, how='left', on=['seller_name']) + acq_gdf.drop_column('seller_name') + acq_gdf['seller_name'] = acq_gdf['new'] + acq_gdf.drop_column('new') + perf_df_tmp = gpu_load_performance_csv(perf_file) + gdf = perf_df_tmp + everdf = create_ever_features(gdf) + delinq_merge = create_delinq_features(gdf) + everdf = join_ever_delinq_features(everdf, delinq_merge) + del(delinq_merge) + joined_df = create_joined_df(gdf, everdf) + testdf = create_12_mon_features(joined_df) + joined_df = combine_joined_12_mon(joined_df, testdf) + del(testdf) + perf_df = final_performance_delinquency(gdf, joined_df) + del(gdf, joined_df) + final_gdf = join_perf_acq_gdfs(perf_df, acq_gdf) + del(perf_df) + del(acq_gdf) + final_gdf = last_mile_cleaning(final_gdf) + return final_gdf + +def 
gpu_load_performance_csv(performance_path, **kwargs): + """ Loads performance data + + Returns + ------- + GPU DataFrame + """ + + cols = [ + "loan_id", "monthly_reporting_period", "servicer", "interest_rate", "current_actual_upb", + "loan_age", "remaining_months_to_legal_maturity", "adj_remaining_months_to_maturity", + "maturity_date", "msa", "current_loan_delinquency_status", "mod_flag", "zero_balance_code", + "zero_balance_effective_date", "last_paid_installment_date", "foreclosed_after", + "disposition_date", "foreclosure_costs", "prop_preservation_and_repair_costs", + "asset_recovery_costs", "misc_holding_expenses", "holding_taxes", "net_sale_proceeds", + "credit_enhancement_proceeds", "repurchase_make_whole_proceeds", "other_foreclosure_proceeds", + "non_interest_bearing_upb", "principal_forgiveness_upb", "repurchase_make_whole_proceeds_flag", + "foreclosure_principal_write_off_amount", "servicing_activity_indicator" + ] + + dtypes = OrderedDict([ + ("loan_id", "int64"), + ("monthly_reporting_period", "date"), + ("servicer", "category"), + ("interest_rate", "float64"), + ("current_actual_upb", "float64"), + ("loan_age", "float64"), + ("remaining_months_to_legal_maturity", "float64"), + ("adj_remaining_months_to_maturity", "float64"), + ("maturity_date", "date"), + ("msa", "float64"), + ("current_loan_delinquency_status", "int32"), + ("mod_flag", "category"), + ("zero_balance_code", "category"), + ("zero_balance_effective_date", "date"), + ("last_paid_installment_date", "date"), + ("foreclosed_after", "date"), + ("disposition_date", "date"), + ("foreclosure_costs", "float64"), + ("prop_preservation_and_repair_costs", "float64"), + ("asset_recovery_costs", "float64"), + ("misc_holding_expenses", "float64"), + ("holding_taxes", "float64"), + ("net_sale_proceeds", "float64"), + ("credit_enhancement_proceeds", "float64"), + ("repurchase_make_whole_proceeds", "float64"), + ("other_foreclosure_proceeds", "float64"), + ("non_interest_bearing_upb", "float64"), + 
("principal_forgiveness_upb", "float64"), + ("repurchase_make_whole_proceeds_flag", "category"), + ("foreclosure_principal_write_off_amount", "float64"), + ("servicing_activity_indicator", "category") + ]) + + print(performance_path) + + return cudf.read_csv(performance_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1) + +def gpu_load_acquisition_csv(acquisition_path, **kwargs): + """ Loads acquisition data + + Returns + ------- + GPU DataFrame + """ + + cols = [ + 'loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', + 'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', + 'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state', + 'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', + 'relocation_mortgage_indicator' + ] + + dtypes = OrderedDict([ + ("loan_id", "int64"), + ("orig_channel", "category"), + ("seller_name", "category"), + ("orig_interest_rate", "float64"), + ("orig_upb", "int64"), + ("orig_loan_term", "int64"), + ("orig_date", "date"), + ("first_pay_date", "date"), + ("orig_ltv", "float64"), + ("orig_cltv", "float64"), + ("num_borrowers", "float64"), + ("dti", "float64"), + ("borrower_credit_score", "float64"), + ("first_home_buyer", "category"), + ("loan_purpose", "category"), + ("property_type", "category"), + ("num_units", "int64"), + ("occupancy_status", "category"), + ("property_state", "category"), + ("zip", "int64"), + ("mortgage_insurance_percent", "float64"), + ("product_type", "category"), + ("coborrow_credit_score", "float64"), + ("mortgage_insurance_type", "float64"), + ("relocation_mortgage_indicator", "category") + ]) + + print(acquisition_path) + + return cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1) + +def gpu_load_names(**kwargs): + """ Loads names used for renaming the 
banks + + Returns + ------- + GPU DataFrame + """ + + cols = [ + 'seller_name', 'new' + ] + + dtypes = OrderedDict([ + ("seller_name", "category"), + ("new", "category"), + ]) + + return cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1) + +def create_ever_features(gdf, **kwargs): + everdf = gdf[['loan_id', 'current_loan_delinquency_status']] + everdf = everdf.groupby('loan_id', method='hash').max() + del(gdf) + everdf['ever_30'] = (everdf['max_current_loan_delinquency_status'] >= 1).astype('int8') + everdf['ever_90'] = (everdf['max_current_loan_delinquency_status'] >= 3).astype('int8') + everdf['ever_180'] = (everdf['max_current_loan_delinquency_status'] >= 6).astype('int8') + everdf.drop_column('max_current_loan_delinquency_status') + return everdf + +def create_delinq_features(gdf, **kwargs): + delinq_gdf = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']] + del(gdf) + delinq_30 = delinq_gdf.query('current_loan_delinquency_status >= 1')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min() + delinq_30['delinquency_30'] = delinq_30['min_monthly_reporting_period'] + delinq_30.drop_column('min_monthly_reporting_period') + delinq_90 = delinq_gdf.query('current_loan_delinquency_status >= 3')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min() + delinq_90['delinquency_90'] = delinq_90['min_monthly_reporting_period'] + delinq_90.drop_column('min_monthly_reporting_period') + delinq_180 = delinq_gdf.query('current_loan_delinquency_status >= 6')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min() + delinq_180['delinquency_180'] = delinq_180['min_monthly_reporting_period'] + delinq_180.drop_column('min_monthly_reporting_period') + del(delinq_gdf) + delinq_merge = delinq_30.merge(delinq_90, how='left', on=['loan_id'], type='hash') + delinq_merge['delinquency_90'] = 
delinq_merge['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')) + delinq_merge = delinq_merge.merge(delinq_180, how='left', on=['loan_id'], type='hash') + delinq_merge['delinquency_180'] = delinq_merge['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')) + del(delinq_30) + del(delinq_90) + del(delinq_180) + return delinq_merge + +def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs): + everdf = everdf_tmp.merge(delinq_merge, on=['loan_id'], how='left', type='hash') + del(everdf_tmp) + del(delinq_merge) + everdf['delinquency_30'] = everdf['delinquency_30'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')) + everdf['delinquency_90'] = everdf['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')) + everdf['delinquency_180'] = everdf['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')) + return everdf + +def create_joined_df(gdf, everdf, **kwargs): + test = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']] + del(gdf) + test['timestamp'] = test['monthly_reporting_period'] + test.drop_column('monthly_reporting_period') + test['timestamp_month'] = test['timestamp'].dt.month + test['timestamp_year'] = test['timestamp'].dt.year + test['delinquency_12'] = test['current_loan_delinquency_status'] + test.drop_column('current_loan_delinquency_status') + test['upb_12'] = test['current_actual_upb'] + test.drop_column('current_actual_upb') + test['upb_12'] = test['upb_12'].fillna(999999999) + test['delinquency_12'] = test['delinquency_12'].fillna(-1) + + joined_df = test.merge(everdf, how='left', on=['loan_id'], type='hash') + del(everdf) + del(test) + + joined_df['ever_30'] = joined_df['ever_30'].fillna(-1) + joined_df['ever_90'] = joined_df['ever_90'].fillna(-1) + joined_df['ever_180'] = 
joined_df['ever_180'].fillna(-1) + joined_df['delinquency_30'] = joined_df['delinquency_30'].fillna(-1) + joined_df['delinquency_90'] = joined_df['delinquency_90'].fillna(-1) + joined_df['delinquency_180'] = joined_df['delinquency_180'].fillna(-1) + + joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32') + joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32') + + return joined_df + +def create_12_mon_features(joined_df, **kwargs): + testdfs = [] + n_months = 12 + for y in range(1, n_months + 1): + tmpdf = joined_df[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']] + tmpdf['josh_months'] = tmpdf['timestamp_year'] * 12 + tmpdf['timestamp_month'] + tmpdf['josh_mody_n'] = ((tmpdf['josh_months'].astype('float64') - 24000 - y) / 12).floor() + tmpdf = tmpdf.groupby(['loan_id', 'josh_mody_n'], method='hash').agg({'delinquency_12': 'max','upb_12': 'min'}) + tmpdf['delinquency_12'] = (tmpdf['max_delinquency_12']>3).astype('int32') + tmpdf['delinquency_12'] +=(tmpdf['min_upb_12']==0).astype('int32') + tmpdf.drop_column('max_delinquency_12') + tmpdf['upb_12'] = tmpdf['min_upb_12'] + tmpdf.drop_column('min_upb_12') + tmpdf['timestamp_year'] = (((tmpdf['josh_mody_n'] * n_months) + 24000 + (y - 1)) / 12).floor().astype('int16') + tmpdf['timestamp_month'] = np.int8(y) + tmpdf.drop_column('josh_mody_n') + testdfs.append(tmpdf) + del(tmpdf) + del(joined_df) + + return cudf.concat(testdfs) + +def combine_joined_12_mon(joined_df, testdf, **kwargs): + joined_df.drop_column('delinquency_12') + joined_df.drop_column('upb_12') + joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16') + joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8') + return joined_df.merge(testdf, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash') + +def final_performance_delinquency(gdf, joined_df, **kwargs): + merged = null_workaround(gdf) + joined_df = null_workaround(joined_df) 
+ merged['timestamp_month'] = merged['monthly_reporting_period'].dt.month + merged['timestamp_month'] = merged['timestamp_month'].astype('int8') + merged['timestamp_year'] = merged['monthly_reporting_period'].dt.year + merged['timestamp_year'] = merged['timestamp_year'].astype('int16') + merged = merged.merge(joined_df, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash') + merged.drop_column('timestamp_year') + merged.drop_column('timestamp_month') + return merged + +def join_perf_acq_gdfs(perf, acq, **kwargs): + perf = null_workaround(perf) + acq = null_workaround(acq) + return perf.merge(acq, how='left', on=['loan_id'], type='hash') + +def last_mile_cleaning(df, **kwargs): + drop_list = [ + 'loan_id', 'orig_date', 'first_pay_date', 'seller_name', + 'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180', + 'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12', + 'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp' + ] + for column in drop_list: + df.drop_column(column) + for col, dtype in df.dtypes.iteritems(): + if str(dtype)=='category': + df[col] = df[col].cat.codes + df[col] = df[col].astype('float32') + df['delinquency_12'] = df['delinquency_12'] > 0 + df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32') + for column in df.columns: + df[column] = df[column].fillna(-1) + return df.to_arrow(index=False) + + +# to download data for this notebook, visit https://rapidsai.github.io/demos/datasets/mortgage-data and update the following paths accordingly +acq_data_path = "{0}/acq".format(data_dir) #"/rapids/data/mortgage/acq" +perf_data_path = "{0}/perf".format(data_dir) #"/rapids/data/mortgage/perf" +col_names_path = "{0}/names.csv".format(data_dir) # "/rapids/data/mortgage/names.csv" +start_year = 2000 +#end_year = 2000 # end_year is inclusive -- converted to parameter +#part_count = 2 # the number of data files to train against 
-- converted to parameter + +client.run(initialize_rmm_pool) + +# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix. +# This can be optimized to avoid calculating the dropped features. +print("Reading ...") +t1 = datetime.datetime.now() +gpu_dfs = [] +gpu_time = 0 +quarter = 1 +year = start_year +count = 0 +while year <= end_year: + for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")): + if count < part_count: + gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file)) + count += 1 + print('file: {0}'.format(file)) + print('count: {0}'.format(count)) + quarter += 1 + if quarter == 5: + year += 1 + quarter = 1 + +wait(gpu_dfs) +t2 = datetime.datetime.now() +print("Reading time ...") +print(t2-t1) +print('len(gpu_dfs) is {0}'.format(len(gpu_dfs))) + +client.run(cudf._gdf.rmm_finalize) +client.run(initialize_rmm_no_pool) + +dxgb_gpu_params = { + 'nround': 100, + 'max_depth': 8, + 'max_leaves': 2**8, + 'alpha': 0.9, + 'eta': 0.1, + 'gamma': 0.1, + 'learning_rate': 0.1, + 'subsample': 1, + 'reg_lambda': 1, + 'scale_pos_weight': 2, + 'min_child_weight': 30, + 'tree_method': 'gpu_hist', + 'n_gpus': 1, + 'distributed_dask': True, + 'loss': 'ls', + 'objective': 'gpu:reg:linear', + 'max_features': 'auto', + 'criterion': 'friedman_mse', + 'grow_policy': 'lossguide', + 'verbose': True +} + +if cpu_predictor: + print('Training using CPUs') + dxgb_gpu_params['predictor'] = 'cpu_predictor' + dxgb_gpu_params['tree_method'] = 'hist' + dxgb_gpu_params['objective'] = 'reg:linear' + +else: + print('Training using GPUs') + +print('Training parameters are {0}'.format(dxgb_gpu_params)) + +gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]] + +gpu_dfs = [gpu_df for gpu_df in gpu_dfs] + +wait(gpu_dfs) +tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs] +new_map = {} +for key, value in tmp_map: + if 
value not in new_map: + new_map[value] = [key] + else: + new_map[value].append(key) + +del(tmp_map) +gpu_dfs = [] +for list_delayed in new_map.values(): + gpu_dfs.append(delayed(cudf.concat)(list_delayed)) + +del(new_map) +gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs] +gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs] +gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs] +gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs] + +gc.collect() +labels = None + +print('str(gpu_dfs) is {0}'.format(str(gpu_dfs))) + +wait(gpu_dfs) +t1 = datetime.datetime.now() +bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround']) +t2 = datetime.datetime.now() +print("Training time ...") +print(t2-t1) +print('str(bst) is {0}'.format(str(bst))) +print('Exiting script') \ No newline at end of file