mirror of
https://github.com/Azure/MachineLearningNotebooks.git
synced 2025-12-19 17:17:04 -05:00
update RAPIDS 2
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
# License Info: https://github.com/rapidsai/notebooks/blob/master/LICENSE
|
||||
import numpy as np
|
||||
import datetime
|
||||
import dask_xgboost as dxgb_gpu
|
||||
import dask
|
||||
import dask_cudf
|
||||
from dask_cuda import LocalCUDACluster
|
||||
from dask.delayed import delayed
|
||||
from dask.distributed import Client, wait
|
||||
import xgboost as xgb
|
||||
@@ -15,53 +15,6 @@ from glob import glob
|
||||
import os
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser("rapidssample")
|
||||
parser.add_argument("--data_dir", type=str, help="location of data")
|
||||
parser.add_argument("--num_gpu", type=int, help="Number of GPUs to use", default=1)
|
||||
parser.add_argument("--part_count", type=int, help="Number of data files to train against", default=2)
|
||||
parser.add_argument("--end_year", type=int, help="Year to end the data load", default=2000)
|
||||
parser.add_argument("--cpu_predictor", type=str, help="Flag to use CPU for prediction", default='False')
|
||||
parser.add_argument('-f', type=str, default='') # added for notebook execution scenarios
|
||||
args = parser.parse_args()
|
||||
data_dir = args.data_dir
|
||||
num_gpu = args.num_gpu
|
||||
part_count = args.part_count
|
||||
end_year = args.end_year
|
||||
cpu_predictor = args.cpu_predictor.lower() in ('yes', 'true', 't', 'y', '1')
|
||||
|
||||
print('data_dir = {0}'.format(data_dir))
|
||||
print('num_gpu = {0}'.format(num_gpu))
|
||||
print('part_count = {0}'.format(part_count))
|
||||
part_count = part_count + 1 # adding one because the usage below is not inclusive
|
||||
print('end_year = {0}'.format(end_year))
|
||||
print('cpu_predictor = {0}'.format(cpu_predictor))
|
||||
|
||||
import subprocess
|
||||
|
||||
cmd = "hostname --all-ip-addresses"
|
||||
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
IPADDR = str(output.decode()).split()[0]
|
||||
print('IPADDR is {0}'.format(IPADDR))
|
||||
|
||||
cmd = "/rapids/notebooks/utils/dask-setup.sh 0"
|
||||
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
|
||||
cmd = "/rapids/notebooks/utils/dask-setup.sh rapids " + str(num_gpu) + " 8786 8787 8790 " + str(IPADDR) + " MASTER"
|
||||
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
|
||||
print(output.decode())
|
||||
|
||||
import dask
|
||||
from dask.delayed import delayed
|
||||
from dask.distributed import Client, wait
|
||||
|
||||
_client = IPADDR + str(":8786")
|
||||
|
||||
client = dask.distributed.Client(_client)
|
||||
|
||||
def initialize_rmm_pool():
|
||||
from librmm_cffi import librmm_config as rmm_cfg
|
||||
|
||||
@@ -81,15 +34,17 @@ def run_dask_task(func, **kwargs):
|
||||
task = func(**kwargs)
|
||||
return task
|
||||
|
||||
def process_quarter_gpu(year=2000, quarter=1, perf_file=""):
|
||||
def process_quarter_gpu(client, col_names_path, acq_data_path, year=2000, quarter=1, perf_file=""):
|
||||
dask_client = client
|
||||
ml_arrays = run_dask_task(delayed(run_gpu_workflow),
|
||||
col_path=col_names_path,
|
||||
acq_path=acq_data_path,
|
||||
quarter=quarter,
|
||||
year=year,
|
||||
perf_file=perf_file)
|
||||
return client.compute(ml_arrays,
|
||||
return dask_client.compute(ml_arrays,
|
||||
optimize_graph=False,
|
||||
fifo_timeout="0ms"
|
||||
)
|
||||
fifo_timeout="0ms")
|
||||
|
||||
def null_workaround(df, **kwargs):
|
||||
for column, data_type in df.dtypes.items():
|
||||
@@ -99,9 +54,9 @@ def null_workaround(df, **kwargs):
|
||||
df[column] = df[column].fillna(-1)
|
||||
return df
|
||||
|
||||
def run_gpu_workflow(quarter=1, year=2000, perf_file="", **kwargs):
|
||||
names = gpu_load_names()
|
||||
acq_gdf = gpu_load_acquisition_csv(acquisition_path= acq_data_path + "/Acquisition_"
|
||||
def run_gpu_workflow(col_path, acq_path, quarter=1, year=2000, perf_file="", **kwargs):
|
||||
names = gpu_load_names(col_path=col_path)
|
||||
acq_gdf = gpu_load_acquisition_csv(acquisition_path= acq_path + "/Acquisition_"
|
||||
+ str(year) + "Q" + str(quarter) + ".txt")
|
||||
acq_gdf = acq_gdf.merge(names, how='left', on=['seller_name'])
|
||||
acq_gdf.drop_column('seller_name')
|
||||
@@ -231,7 +186,7 @@ def gpu_load_acquisition_csv(acquisition_path, **kwargs):
|
||||
|
||||
return cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
|
||||
|
||||
def gpu_load_names(**kwargs):
|
||||
def gpu_load_names(col_path):
|
||||
""" Loads names used for renaming the banks
|
||||
|
||||
Returns
|
||||
@@ -248,7 +203,7 @@ def gpu_load_names(**kwargs):
|
||||
("new", "category"),
|
||||
])
|
||||
|
||||
return cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
|
||||
return cudf.read_csv(col_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
|
||||
|
||||
def create_ever_features(gdf, **kwargs):
|
||||
everdf = gdf[['loan_id', 'current_loan_delinquency_status']]
|
||||
@@ -384,117 +339,157 @@ def last_mile_cleaning(df, **kwargs):
|
||||
df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
|
||||
for column in df.columns:
|
||||
df[column] = df[column].fillna(-1)
|
||||
return df.to_arrow(index=False)
|
||||
return df.to_arrow(preserve_index=False)
|
||||
|
||||
def main():
|
||||
#print('XGBOOST_BUILD_DOC is ' + os.environ['XGBOOST_BUILD_DOC'])
|
||||
parser = argparse.ArgumentParser("rapidssample")
|
||||
parser.add_argument("--data_dir", type=str, help="location of data")
|
||||
parser.add_argument("--num_gpu", type=int, help="Number of GPUs to use", default=1)
|
||||
parser.add_argument("--part_count", type=int, help="Number of data files to train against", default=2)
|
||||
parser.add_argument("--end_year", type=int, help="Year to end the data load", default=2000)
|
||||
parser.add_argument("--cpu_predictor", type=str, help="Flag to use CPU for prediction", default='False')
|
||||
parser.add_argument('-f', type=str, default='') # added for notebook execution scenarios
|
||||
args = parser.parse_args()
|
||||
data_dir = args.data_dir
|
||||
num_gpu = args.num_gpu
|
||||
part_count = args.part_count
|
||||
end_year = args.end_year
|
||||
cpu_predictor = args.cpu_predictor.lower() in ('yes', 'true', 't', 'y', '1')
|
||||
|
||||
if cpu_predictor:
|
||||
print('Training with CPUs require num gpu = 1')
|
||||
num_gpu = 1
|
||||
|
||||
print('data_dir = {0}'.format(data_dir))
|
||||
print('num_gpu = {0}'.format(num_gpu))
|
||||
print('part_count = {0}'.format(part_count))
|
||||
#part_count = part_count + 1 # adding one because the usage below is not inclusive
|
||||
print('end_year = {0}'.format(end_year))
|
||||
print('cpu_predictor = {0}'.format(cpu_predictor))
|
||||
|
||||
import subprocess
|
||||
|
||||
cmd = "hostname --all-ip-addresses"
|
||||
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
IPADDR = str(output.decode()).split()[0]
|
||||
|
||||
cluster = LocalCUDACluster(ip=IPADDR,n_workers=num_gpu)
|
||||
client = Client(cluster)
|
||||
client
|
||||
print(client.ncores())
|
||||
|
||||
# to download data for this notebook, visit https://rapidsai.github.io/demos/datasets/mortgage-data and update the following paths accordingly
|
||||
acq_data_path = "{0}/acq".format(data_dir) #"/rapids/data/mortgage/acq"
|
||||
perf_data_path = "{0}/perf".format(data_dir) #"/rapids/data/mortgage/perf"
|
||||
col_names_path = "{0}/names.csv".format(data_dir) # "/rapids/data/mortgage/names.csv"
|
||||
start_year = 2000
|
||||
acq_data_path = "{0}/acq".format(data_dir) #"/rapids/data/mortgage/acq"
|
||||
perf_data_path = "{0}/perf".format(data_dir) #"/rapids/data/mortgage/perf"
|
||||
col_names_path = "{0}/names.csv".format(data_dir) # "/rapids/data/mortgage/names.csv"
|
||||
start_year = 2000
|
||||
#end_year = 2000 # end_year is inclusive -- converted to parameter
|
||||
#part_count = 2 # the number of data files to train against -- converted to parameter
|
||||
|
||||
client.run(initialize_rmm_pool)
|
||||
|
||||
client.run(initialize_rmm_pool)
|
||||
client
|
||||
print(client.ncores())
|
||||
# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.
|
||||
# This can be optimized to avoid calculating the dropped features.
|
||||
print("Reading ...")
|
||||
t1 = datetime.datetime.now()
|
||||
gpu_dfs = []
|
||||
gpu_time = 0
|
||||
quarter = 1
|
||||
year = start_year
|
||||
count = 0
|
||||
while year <= end_year:
|
||||
for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")):
|
||||
if count < part_count:
|
||||
gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file))
|
||||
count += 1
|
||||
print('file: {0}'.format(file))
|
||||
print('count: {0}'.format(count))
|
||||
quarter += 1
|
||||
if quarter == 5:
|
||||
year += 1
|
||||
quarter = 1
|
||||
print("Reading ...")
|
||||
t1 = datetime.datetime.now()
|
||||
gpu_dfs = []
|
||||
gpu_time = 0
|
||||
quarter = 1
|
||||
year = start_year
|
||||
count = 0
|
||||
while year <= end_year:
|
||||
for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")):
|
||||
if count < part_count:
|
||||
gpu_dfs.append(process_quarter_gpu(client, col_names_path, acq_data_path, year=year, quarter=quarter, perf_file=file))
|
||||
count += 1
|
||||
print('file: {0}'.format(file))
|
||||
print('count: {0}'.format(count))
|
||||
quarter += 1
|
||||
if quarter == 5:
|
||||
year += 1
|
||||
quarter = 1
|
||||
|
||||
wait(gpu_dfs)
|
||||
t2 = datetime.datetime.now()
|
||||
print("Reading time ...")
|
||||
print(t2-t1)
|
||||
print('len(gpu_dfs) is {0}'.format(len(gpu_dfs)))
|
||||
|
||||
client.run(cudf._gdf.rmm_finalize)
|
||||
client.run(initialize_rmm_no_pool)
|
||||
client
|
||||
print(client.ncores())
|
||||
dxgb_gpu_params = {
|
||||
'nround': 100,
|
||||
'max_depth': 8,
|
||||
'max_leaves': 2**8,
|
||||
'alpha': 0.9,
|
||||
'eta': 0.1,
|
||||
'gamma': 0.1,
|
||||
'learning_rate': 0.1,
|
||||
'subsample': 1,
|
||||
'reg_lambda': 1,
|
||||
'scale_pos_weight': 2,
|
||||
'min_child_weight': 30,
|
||||
'tree_method': 'gpu_hist',
|
||||
'n_gpus': 1,
|
||||
'distributed_dask': True,
|
||||
'loss': 'ls',
|
||||
'objective': 'gpu:reg:linear',
|
||||
'max_features': 'auto',
|
||||
'criterion': 'friedman_mse',
|
||||
'grow_policy': 'lossguide',
|
||||
'verbose': True
|
||||
}
|
||||
|
||||
if cpu_predictor:
|
||||
print('Training using CPUs')
|
||||
dxgb_gpu_params['predictor'] = 'cpu_predictor'
|
||||
dxgb_gpu_params['tree_method'] = 'hist'
|
||||
dxgb_gpu_params['objective'] = 'reg:linear'
|
||||
|
||||
wait(gpu_dfs)
|
||||
t2 = datetime.datetime.now()
|
||||
print("Reading time ...")
|
||||
print(t2-t1)
|
||||
print('len(gpu_dfs) is {0}'.format(len(gpu_dfs)))
|
||||
|
||||
client.run(cudf._gdf.rmm_finalize)
|
||||
client.run(initialize_rmm_no_pool)
|
||||
|
||||
dxgb_gpu_params = {
|
||||
'nround': 100,
|
||||
'max_depth': 8,
|
||||
'max_leaves': 2**8,
|
||||
'alpha': 0.9,
|
||||
'eta': 0.1,
|
||||
'gamma': 0.1,
|
||||
'learning_rate': 0.1,
|
||||
'subsample': 1,
|
||||
'reg_lambda': 1,
|
||||
'scale_pos_weight': 2,
|
||||
'min_child_weight': 30,
|
||||
'tree_method': 'gpu_hist',
|
||||
'n_gpus': 1,
|
||||
'distributed_dask': True,
|
||||
'loss': 'ls',
|
||||
'objective': 'gpu:reg:linear',
|
||||
'max_features': 'auto',
|
||||
'criterion': 'friedman_mse',
|
||||
'grow_policy': 'lossguide',
|
||||
'verbose': True
|
||||
}
|
||||
|
||||
if cpu_predictor:
|
||||
print('Training using CPUs')
|
||||
dxgb_gpu_params['predictor'] = 'cpu_predictor'
|
||||
dxgb_gpu_params['tree_method'] = 'hist'
|
||||
dxgb_gpu_params['objective'] = 'reg:linear'
|
||||
|
||||
else:
|
||||
print('Training using GPUs')
|
||||
|
||||
print('Training parameters are {0}'.format(dxgb_gpu_params))
|
||||
|
||||
gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]
|
||||
|
||||
gpu_dfs = [gpu_df for gpu_df in gpu_dfs]
|
||||
|
||||
wait(gpu_dfs)
|
||||
tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
|
||||
new_map = {}
|
||||
for key, value in tmp_map:
|
||||
if value not in new_map:
|
||||
new_map[value] = [key]
|
||||
else:
|
||||
new_map[value].append(key)
|
||||
print('Training using GPUs')
|
||||
|
||||
print('Training parameters are {0}'.format(dxgb_gpu_params))
|
||||
|
||||
gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]
|
||||
gpu_dfs = [gpu_df for gpu_df in gpu_dfs]
|
||||
wait(gpu_dfs)
|
||||
|
||||
tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
|
||||
new_map = {}
|
||||
for key, value in tmp_map:
|
||||
if value not in new_map:
|
||||
new_map[value] = [key]
|
||||
else:
|
||||
new_map[value].append(key)
|
||||
|
||||
del(tmp_map)
|
||||
gpu_dfs = []
|
||||
for list_delayed in new_map.values():
|
||||
gpu_dfs.append(delayed(cudf.concat)(list_delayed))
|
||||
|
||||
del(new_map)
|
||||
gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]
|
||||
gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]
|
||||
|
||||
gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]
|
||||
gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
|
||||
gc.collect()
|
||||
wait(gpu_dfs)
|
||||
|
||||
labels = None
|
||||
t1 = datetime.datetime.now()
|
||||
bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])
|
||||
t2 = datetime.datetime.now()
|
||||
print("Training time ...")
|
||||
print(t2-t1)
|
||||
print('str(bst) is {0}'.format(str(bst)))
|
||||
print('Exiting script')
|
||||
|
||||
del(tmp_map)
|
||||
gpu_dfs = []
|
||||
for list_delayed in new_map.values():
|
||||
gpu_dfs.append(delayed(cudf.concat)(list_delayed))
|
||||
|
||||
del(new_map)
|
||||
gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]
|
||||
gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]
|
||||
gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]
|
||||
gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
|
||||
|
||||
gc.collect()
|
||||
labels = None
|
||||
|
||||
print('str(gpu_dfs) is {0}'.format(str(gpu_dfs)))
|
||||
|
||||
wait(gpu_dfs)
|
||||
t1 = datetime.datetime.now()
|
||||
bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])
|
||||
t2 = datetime.datetime.now()
|
||||
print("Training time ...")
|
||||
print(t2-t1)
|
||||
print('str(bst) is {0}'.format(str(bst)))
|
||||
print('Exiting script')
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user