# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

"""Pipeline step: cleanse the raw NYC taxi data.

Reads the raw CSV data with azureml.dataprep, replaces/drops rows whose
columns are all null, renames columns using a caller-supplied mapping,
keeps only the useful columns, and writes the cleansed data out as CSV.

Command-line arguments (all strings):
  --input_cleanse   path to the raw taxi data
  --output_cleanse  directory to write the cleansed data to
  --useful_columns  "[...]"-wrapped, backslash-semicolon-separated column list
  --columns         "{...}"-wrapped, backslash-semicolon-separated
                    'old': 'new' rename pairs
"""

import argparse
import os

# Separator the pipeline definition uses to join list/dict arguments.
# Raw string: the separator really is a backslash followed by a semicolon
# (the original non-raw "\;" only worked because \; is not a recognized
# escape; that now raises a SyntaxWarning on modern Python).
SEPARATOR = r"\;"


def get_dict(dict_str):
    """Parse a "{'old': 'new'\\;...}"-style argument string into a dict.

    Pairs are separated by a literal backslash-semicolon; each key and
    value is single-quoted and the two are separated by a colon.
    """
    new_dict = {}
    for pair in dict_str.strip("{}").split(SEPARATOR):
        key, value = pair.strip("\\").split(":")
        new_dict[key.strip().strip("'")] = value.strip().strip("'")
    return new_dict


def main():
    # Imported here rather than at module top: the Azure ML SDK is only
    # needed (and guaranteed installed) when the step actually runs, and
    # deferring it keeps get_dict importable/testable without the SDK.
    import azureml.dataprep as dprep

    print("Cleans the input data")

    parser = argparse.ArgumentParser("cleanse")
    parser.add_argument("--input_cleanse", type=str, help="raw taxi data")
    parser.add_argument("--output_cleanse", type=str, help="cleaned taxi data directory")
    parser.add_argument("--useful_columns", type=str, help="useful columns to keep")
    parser.add_argument("--columns", type=str, help="rename column pattern")

    args = parser.parse_args()

    print("Argument 1(input taxi data path): %s" % args.input_cleanse)
    print("Argument 2(columns to keep): %s" % str(args.useful_columns.strip("[]").split(SEPARATOR)))
    print("Argument 3(columns renaming mapping): %s" % str(args.columns.strip("{}").split(SEPARATOR)))
    print("Argument 4(output cleansed taxi data path): %s" % args.output_cleanse)

    raw_df = dprep.read_csv(path=args.input_cleanse, header=dprep.PromoteHeadersMode.GROUPED)

    # These functions ensure that null data is removed from the data set,
    # which will help increase machine learning model accuracy.
    # Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
    # for more details.
    useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split(SEPARATOR)]
    columns = get_dict(args.columns)

    # Select every column by regex so null replacement/dropping covers
    # the whole row.
    all_columns = dprep.ColumnSelector(term=".*", use_regex=True)
    drop_if_all_null = [all_columns, dprep.ColumnRelationship(dprep.ColumnRelationship.ALL)]

    new_df = (raw_df
              .replace_na(columns=all_columns)
              .drop_nulls(*drop_if_all_null)
              .rename_columns(column_pairs=columns)
              .keep_columns(columns=useful_columns))

    if args.output_cleanse is not None:
        os.makedirs(args.output_cleanse, exist_ok=True)
        print("%s created" % args.output_cleanse)
        write_df = new_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_cleanse))
        write_df.run_local()


if __name__ == "__main__":
    main()
"""Pipeline step: filter out taxi trips located outside New York City.

Converts the pickup/dropoff coordinate columns to decimal, drops rows
with null coordinates, and keeps only trips whose pickup and dropoff
points fall inside the city's lat/long bounding box.

Command-line arguments:
  --input_filter   directory holding the merged taxi data ("part-*" CSVs)
  --output_filter  directory to write the filtered data to
"""

import argparse
import os

import azureml.dataprep as dprep

print("Filters out coordinates for locations that are outside the city border.",
      "Chain the column filter commands within the filter() function",
      "and define the minimum and maximum bounds for each field.")

parser = argparse.ArgumentParser("filter")
parser.add_argument("--input_filter", type=str, help="merged taxi data directory")
parser.add_argument("--output_filter", type=str, help="filter out out of city locations")

args = parser.parse_args()

print("Argument 1(input taxi data path): %s" % args.input_filter)
print("Argument 2(output filtered taxi data path): %s" % args.output_filter)

combined_df = dprep.read_csv(args.input_filter + '/part-*')

# These functions filter out coordinates for locations that are outside
# the city border. Visit
# https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
# for more details.

# Convert the lat/long fields to decimal so range comparisons work and
# missing or out-of-scope coordinates are easy to evaluate.
decimal_type = dprep.TypeConverter(data_type=dprep.FieldType.DECIMAL)
combined_df = combined_df.set_column_types(type_conversions={
    "pickup_longitude": decimal_type,
    "pickup_latitude": decimal_type,
    "dropoff_longitude": decimal_type,
    "dropoff_latitude": decimal_type
})

# Approximate NYC bounding box used to reject out-of-city coordinates.
MIN_LONGITUDE, MAX_LONGITUDE = -74.09, -73.72
MIN_LATITUDE, MAX_LATITUDE = 40.53, 40.88

# Drop rows missing any coordinate, then keep rows whose pickup and
# dropoff points both lie inside the bounding box.
latlong_filtered_df = (combined_df
                       .drop_nulls(columns=["pickup_longitude",
                                            "pickup_latitude",
                                            "dropoff_longitude",
                                            "dropoff_latitude"],
                                   column_relationship=dprep.ColumnRelationship(dprep.ColumnRelationship.ANY))
                       .filter(dprep.f_and(dprep.col("pickup_longitude") <= MAX_LONGITUDE,
                                           dprep.col("pickup_longitude") >= MIN_LONGITUDE,
                                           dprep.col("pickup_latitude") <= MAX_LATITUDE,
                                           dprep.col("pickup_latitude") >= MIN_LATITUDE,
                                           dprep.col("dropoff_longitude") <= MAX_LONGITUDE,
                                           dprep.col("dropoff_longitude") >= MIN_LONGITUDE,
                                           dprep.col("dropoff_latitude") <= MAX_LATITUDE,
                                           dprep.col("dropoff_latitude") >= MIN_LATITUDE)))

if args.output_filter is not None:
    os.makedirs(args.output_filter, exist_ok=True)
    print("%s created" % args.output_filter)
    write_df = latlong_filtered_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_filter))
    write_df.run_local()
"""Pipeline step: merge the green and yellow taxi datasets.

Reads the cleansed green and yellow taxi data ("part-*" CSVs) and
appends the yellow rows after the green rows into one dataflow.

Command-line arguments:
  --input_green_merge   cleaned green taxi data directory
  --input_yellow_merge  cleaned yellow taxi data directory
  --output_merge        directory to write the merged data to
"""

import argparse
import os

import azureml.dataprep as dprep

print("Merge Green and Yellow taxi data")

parser = argparse.ArgumentParser("merge")
parser.add_argument("--input_green_merge", type=str, help="cleaned green taxi data directory")
parser.add_argument("--input_yellow_merge", type=str, help="cleaned yellow taxi data directory")
parser.add_argument("--output_merge", type=str, help="green and yellow taxi data merged")

args = parser.parse_args()

print("Argument 1(input green taxi data path): %s" % args.input_green_merge)
print("Argument 2(input yellow taxi data path): %s" % args.input_yellow_merge)
print("Argument 3(output merge taxi data path): %s" % args.output_merge)

green_df = dprep.read_csv(args.input_green_merge + '/part-*')
yellow_df = dprep.read_csv(args.input_yellow_merge + '/part-*')

# Append the yellow rows after the green rows; both sides share the
# schema produced by the cleanse step.
combined_df = green_df.append_rows([yellow_df])

if args.output_merge is not None:
    os.makedirs(args.output_merge, exist_ok=True)
    print("%s created" % args.output_merge)
    write_df = combined_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_merge))
    write_df.run_local()
"""Pipeline step: normalize the merged taxi data.

Replaces undefined values with relevant defaults and splits/renames the
datetime columns into meaningful date and time fields.

Command-line arguments:
  --input_normalize   combined and converted taxi data directory
  --output_normalize  directory to write the normalized data to
"""

import argparse
import os

import azureml.dataprep as dprep

print("Replace undefined values to relevant values and rename columns to meaningful names")

parser = argparse.ArgumentParser("normalize")
parser.add_argument("--input_normalize", type=str, help="combined and converted taxi data")
parser.add_argument("--output_normalize", type=str, help="replaced undefined values and renamed columns")

args = parser.parse_args()

print("Argument 1(input taxi data path): %s" % args.input_normalize)
print("Argument 2(output normalized taxi data path): %s" % args.output_normalize)

combined_converted_df = dprep.read_csv(args.input_normalize + '/part-*')

# These functions replace undefined values and rename to use meaningful names.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
# for more details.

# "store_forward": treat "0" and nulls as "N" (not store-and-forward).
replaced_stfor_vals_df = combined_converted_df.replace(columns="store_forward",
                                                       find="0",
                                                       replace_with="N").fill_nulls("store_forward", "N")

# "distance": treat ".00" and nulls as 0, then coerce the column to a number.
replaced_distance_vals_df = replaced_stfor_vals_df.replace(columns="distance",
                                                           find=".00",
                                                           replace_with=0).fill_nulls("distance", 0)

replaced_distance_vals_df = replaced_distance_vals_df.to_number(["distance"])

# Split the pickup and dropoff datetime values into date and time columns.
time_split_df = (replaced_distance_vals_df
                 .split_column_by_example(source_column="pickup_datetime")
                 .split_column_by_example(source_column="dropoff_datetime"))

# Rename the generated "<col>_1"/"<col>_2" split columns to meaningful names.
renamed_col_df = (time_split_df
                  .rename_columns(column_pairs={
                      "pickup_datetime_1": "pickup_date",
                      "pickup_datetime_2": "pickup_time",
                      "dropoff_datetime_1": "dropoff_date",
                      "dropoff_datetime_2": "dropoff_time"}))

if args.output_normalize is not None:
    os.makedirs(args.output_normalize, exist_ok=True)
    print("%s created" % args.output_normalize)
    write_df = renamed_col_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_normalize))
    write_df.run_local()
"""Pipeline step: transform the renamed taxi data into training format.

Derives weekday/month/day features from the date columns, splits the
time columns into hour/minute/second, drops the now-redundant source
columns, infers column types, and filters out records with non-positive
cost or distance.

Command-line arguments:
  --input_transform   renamed taxi data directory
  --output_transform  directory to write the transformed data to
"""

import argparse
import os

import azureml.dataprep as dprep

print("Transforms the renamed taxi data to the required format")

parser = argparse.ArgumentParser("transform")
parser.add_argument("--input_transform", type=str, help="renamed taxi data")
parser.add_argument("--output_transform", type=str, help="transformed taxi data")

args = parser.parse_args()

print("Argument 1(input taxi data path): %s" % args.input_transform)
print("Argument 2(output final transformed taxi data): %s" % args.output_transform)

renamed_df = dprep.read_csv(args.input_transform + '/part-*')

# These functions transform the renamed data to be used finally for training.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
# for more details.

# Split the pickup and dropoff date further into the day of the week,
# day of the month, and month values. derive_column_by_example() takes
# example (input, output) pairs and infers the transformation; the
# split_column_by_example() calls with no example split the time fields.
# The original date/time columns are then dropped in favor of the newly
# generated features, and the generated names are renamed to be meaningful.
transformed_features_df = (renamed_df
                           .derive_column_by_example(
                               source_columns="pickup_date",
                               new_column_name="pickup_weekday",
                               example_data=[("2009-01-04", "Sunday"), ("2013-08-22", "Thursday")])
                           .derive_column_by_example(
                               source_columns="dropoff_date",
                               new_column_name="dropoff_weekday",
                               example_data=[("2013-08-22", "Thursday"), ("2013-11-03", "Sunday")])

                           .split_column_by_example(source_column="pickup_time")
                           .split_column_by_example(source_column="dropoff_time")

                           # Second-level split: hour/minute out of the "HH:MM" half.
                           .split_column_by_example(source_column="pickup_time_1")
                           .split_column_by_example(source_column="dropoff_time_1")
                           .drop_columns(columns=[
                               "pickup_date", "pickup_time", "dropoff_date", "dropoff_time",
                               "pickup_date_1", "dropoff_date_1", "pickup_time_1", "dropoff_time_1"])

                           .rename_columns(column_pairs={
                               "pickup_date_2": "pickup_month",
                               "pickup_date_3": "pickup_monthday",
                               "pickup_time_1_1": "pickup_hour",
                               "pickup_time_1_2": "pickup_minute",
                               "pickup_time_2": "pickup_second",
                               "dropoff_date_2": "dropoff_month",
                               "dropoff_date_3": "dropoff_monthday",
                               "dropoff_time_1_1": "dropoff_hour",
                               "dropoff_time_1_2": "dropoff_minute",
                               "dropoff_time_2": "dropoff_second"}))

# Drop the pickup_datetime and dropoff_datetime columns because they're
# no longer needed (granular time features like hour, minute and second
# are more useful for model training).
processed_df = transformed_features_df.drop_columns(columns=["pickup_datetime", "dropoff_datetime"])

# Use the type inference functionality to automatically check the data
# type of each field.
type_infer = processed_df.builders.set_column_types()
type_infer.learn()

# Apply the inferred type conversions to the dataflow.
type_converted_df = type_infer.to_dataflow()

# Final filters: drop records with non-positive cost or distance — such
# incorrectly captured data points are major outliers that would throw
# off prediction accuracy.
final_df = type_converted_df.filter(dprep.col("distance") > 0)
final_df = final_df.filter(dprep.col("cost") > 0)

# Write the final dataframe for the training steps that follow.
if args.output_transform is not None:
    os.makedirs(args.output_transform, exist_ok=True)
    print("%s created" % args.output_transform)
    write_df = final_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_transform))
    write_df.run_local()
# --- scripts/trainmodel/featurization.py --------------------------------
"""Pipeline step: keep only the feature columns used for training,
plus the get_data() hook AutoML uses to load the training split."""

import argparse
import os

import pandas as pd


def main():
    """Select the useful feature columns from the prepared data."""
    # Imported here rather than at module top: the Azure ML SDK is only
    # needed when the step runs, and deferring it keeps this module (and
    # get_data below) importable on machines without the SDK installed.
    import azureml.dataprep as dprep

    print("Extracts important features from prepared data")

    parser = argparse.ArgumentParser("featurization")
    parser.add_argument("--input_featurization", type=str, help="input featurization")
    parser.add_argument("--useful_columns", type=str, help="columns to use")
    parser.add_argument("--output_featurization", type=str, help="output featurization")

    args = parser.parse_args()

    print("Argument 1(input training data path): %s" % args.input_featurization)
    print("Argument 2(column features to use): %s" % str(args.useful_columns.strip("[]").split(r"\;")))
    print("Argument 3(output featurized training data path): %s" % args.output_featurization)

    dflow_prepared = dprep.read_csv(args.input_featurization + '/part-*')

    # These functions extract useful features for training. Visit
    # https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models
    # for more detail. The column list argument is "[...]"-wrapped and
    # separated by a literal backslash-semicolon.
    useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split(r"\;")]
    dflow = dflow_prepared.keep_columns(useful_columns)

    if args.output_featurization is not None:
        os.makedirs(args.output_featurization, exist_ok=True)
        print("%s created" % args.output_featurization)
        write_df = dflow.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_featurization))
        write_df.run_local()


if __name__ == "__main__":
    main()


# --- scripts/trainmodel/get_data.py -------------------------------------
def get_data():
    """Load the training split for AutoML.

    Reads the train features/labels written by the split step from the
    directories named by the AZUREML_DATAREFERENCE_output_split_train_x/_y
    environment variables and returns them as {"X": 2-D array,
    "y": 1-D array}, the shape AutoML's get_data contract expects.
    """
    print("In get_data")
    print(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'])
    X_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'] + "/part-00000", header=0)
    y_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_y'] + "/part-00000", header=0)

    return {"X": X_train.values, "y": y_train.values.flatten()}
"""Pipeline step: split the taxi features/labels into train and test sets.

Reads the featurized data and labels, performs an 80/20
scikit-learn train_test_split with a fixed seed, and writes each
requested split out as <dir>/part-00000 CSV.
"""

import argparse
import os


def write_output(df, path):
    """Write *df* as CSV to <path>/part-00000, creating the directory."""
    os.makedirs(path, exist_ok=True)
    print("%s created" % path)
    df.to_csv(path + "/part-00000", index=False)


def main():
    # Imported here rather than at module top: the Azure ML SDK and
    # scikit-learn are only needed when the step runs, and deferring
    # them keeps write_output importable/testable without them.
    import azureml.dataprep as dprep
    from sklearn.model_selection import train_test_split

    print("Split the data into train and test")

    parser = argparse.ArgumentParser("split")
    parser.add_argument("--input_split_features", type=str, help="input split features")
    parser.add_argument("--input_split_labels", type=str, help="input split labels")
    parser.add_argument("--output_split_train_x", type=str, help="output split train features")
    parser.add_argument("--output_split_train_y", type=str, help="output split train labels")
    parser.add_argument("--output_split_test_x", type=str, help="output split test features")
    parser.add_argument("--output_split_test_y", type=str, help="output split test labels")

    args = parser.parse_args()

    print("Argument 1(input taxi data features path): %s" % args.input_split_features)
    print("Argument 2(input taxi data labels path): %s" % args.input_split_labels)
    print("Argument 3(output training features split path): %s" % args.output_split_train_x)
    print("Argument 4(output training labels split path): %s" % args.output_split_train_y)
    print("Argument 5(output test features split path): %s" % args.output_split_test_x)
    print("Argument 6(output test labels split path): %s" % args.output_split_test_y)

    x_df = dprep.read_csv(path=args.input_split_features, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()
    y_df = dprep.read_csv(path=args.input_split_labels, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()

    # These functions split the input features and labels into test and
    # train data. Visit
    # https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models
    # for more detail. The fixed random_state keeps the split reproducible.
    x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=223)

    # Bug fix: the original guard `not (a is None and b is None and ...)`
    # wrote all four outputs whenever ANY path was supplied, crashing
    # os.makedirs(None) on the missing ones. Write each split only when
    # its own output path was actually provided.
    for split_df, out_path in ((x_train, args.output_split_train_x),
                               (y_train, args.output_split_train_y),
                               (x_test, args.output_split_test_x),
                               (y_test, args.output_split_test_y)):
        if out_path is not None:
            write_output(split_df, out_path)


if __name__ == "__main__":
    main()