mirror of
https://github.com/Azure/MachineLearningNotebooks.git
synced 2025-12-19 17:17:04 -05:00
266 lines
9.6 KiB
Plaintext
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Join\n",
    "Copyright (c) Microsoft Corporation. All rights reserved.<br>\n",
    "Licensed under the MIT License.<br>\n",
    "\n",
    "In Data Prep you can easily join two Dataflows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import azureml.dataprep as dprep"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, get the left side of the data into a shape that is ready for the join."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# get the first Dataflow and derive the desired key column\n",
    "dflow_left = dprep.read_csv(path='https://dpreptestfiles.blob.core.windows.net/testfiles/BostonWeather.csv')\n",
    "dflow_left = dflow_left.derive_column_by_example(source_columns='DATE', new_column_name='date_timerange',\n",
    "                                                 example_data=[('11/11/2015 0:54', 'Nov 11, 2015 | 12AM-2AM'),\n",
    "                                                               ('2/1/2015 0:54', 'Feb 1, 2015 | 12AM-2AM'),\n",
    "                                                               ('1/29/2015 20:54', 'Jan 29, 2015 | 8PM-10PM')])\n",
    "dflow_left = dflow_left.drop_columns(['DATE'])\n",
    "\n",
    "# convert types, drop rows that failed conversion, and summarize data\n",
    "dflow_left = dflow_left.set_column_types(type_conversions={'HOURLYDRYBULBTEMPF': dprep.TypeConverter(dprep.FieldType.DECIMAL)})\n",
    "dflow_left = dflow_left.filter(expression=~dflow_left['HOURLYDRYBULBTEMPF'].is_error())\n",
    "dflow_left = dflow_left.summarize(group_by_columns=['date_timerange'], summary_columns=[dprep.SummaryColumnsValue('HOURLYDRYBULBTEMPF', dprep.api.engineapi.typedefinitions.SummaryFunction.MEAN, 'HOURLYDRYBULBTEMPF_Mean')])\n",
    "\n",
    "# cache the result so the steps above are not executed every time we pull on the data\n",
    "import os\n",
    "from pathlib import Path\n",
    "cache_dir = str(Path(os.getcwd(), 'dataflow-cache'))\n",
    "# cache() returns a new Dataflow, so keep the result\n",
    "dflow_left = dflow_left.cache(directory_path=cache_dir)\n",
    "dflow_left.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's prepare the data for the right side of the join."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# get the second Dataflow and derive the desired key column\n",
    "dflow_right = dprep.read_csv(path='https://dpreptestfiles.blob.core.windows.net/bike-share/*-hubway-tripdata.csv')\n",
    "dflow_right = dflow_right.keep_columns(['starttime', 'start station id'])\n",
    "dflow_right = dflow_right.derive_column_by_example(source_columns='starttime', new_column_name='l_date_timerange',\n",
    "                                                   example_data=[('2015-01-01 00:21:44', 'Jan 1, 2015 | 12AM-2AM')])\n",
    "dflow_right = dflow_right.drop_columns(['starttime'])\n",
    "\n",
    "# cache the results; cache() returns a new Dataflow, so keep the result\n",
    "dflow_right = dflow_right.cache(directory_path=cache_dir)\n",
    "dflow_right.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are three ways to join two Dataflows in Data Prep:\n",
    "1. Create a `JoinBuilder` object for interactive join configuration.\n",
    "2. Call `join()` on one of the Dataflows and pass in the other along with all other arguments.\n",
    "3. Call the `Dataflow.join()` method and pass in both Dataflows along with all other arguments.\n",
    "\n",
    "We will explore the builder object first, as it makes it easier to determine the correct arguments."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# construct a builder for joining dflow_left with dflow_right\n",
    "join_builder = dflow_left.builders.join(right_dataflow=dflow_right, left_column_prefix='l', right_column_prefix='r')\n",
    "\n",
    "join_builder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So far, the builder has no properties set other than the default values.\n",
    "From here, you can set each option and preview its effect on the join result, or let Data Prep determine some of them.\n",
    "\n",
    "Let's start by determining appropriate column prefixes for the left and right sides of the join, along with the lists of columns that do not conflict and therefore don't need to be prefixed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "join_builder.detect_column_info()\n",
    "join_builder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can see that Data Prep has performed a pull on both Dataflows to determine their column names. Because `dflow_right` already had a column starting with `l_`, a new prefix was generated that does not collide with any existing column name.\n",
    "Additionally, columns in each Dataflow that won't conflict during the join remain unprefixed.\n",
    "This approach to column naming is crucial for making the join robust to schema changes in the data. Suppose that at some point in the future the data consumed by the left Dataflow also contains an `l_date_timerange` column.\n",
    "Configured as above, the join will still run as expected and the new column will be prefixed with `l2_`, ensuring that if the column `l_date_timerange` is consumed by some later transformation, it remains unaffected.\n",
    "\n",
    "Note: `KEY_generated` is appended to both lists and is reserved for Data Prep use in case Autojoin is performed.\n",
    "\n",
    "### Autojoin\n",
    "Autojoin is a Data Prep feature that determines suitable join arguments given the data on both sides. In some cases, Autojoin can even derive a key column from a number of available columns in the data.\n",
    "Here is how you can use Autojoin:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# generate join suggestions\n",
    "join_builder.generate_suggested_join()\n",
    "\n",
    "# list generated suggestions\n",
    "join_builder.list_join_suggestions()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's select the first suggestion and preview the result of the join."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# apply the first suggestion\n",
    "join_builder.apply_suggestion(0)\n",
    "\n",
    "# preview the first 10 rows of the join result\n",
    "join_builder.preview(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, get the new joined Dataflow from the builder."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dflow_autojoined = join_builder.to_dataflow().drop_columns(['l_date_timerange'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Joining two Dataflows without pulling the data\n",
    "\n",
    "If you don't want to pull on the data and already know what the join should look like, you can use the `join` method on the Dataflow directly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dflow_joined = dprep.Dataflow.join(left_dataflow=dflow_left,\n",
    "                                   right_dataflow=dflow_right,\n",
    "                                   join_key_pairs=[('date_timerange', 'l_date_timerange')],\n",
    "                                   left_column_prefix='l2_',\n",
    "                                   right_column_prefix='r_')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dflow_joined.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# keep only trips that started at station 67, then materialize the result as a pandas DataFrame\n",
    "dflow_joined = dflow_joined.filter(expression=dflow_joined['r_start station id'] == '67')\n",
    "df = dflow_joined.to_pandas_dataframe()\n",
    "df"
   ]
  }
 ],
 "metadata": {
  "authors": [
   {
    "name": "sihhu"
   }
  ],
  "kernelspec": {
   "display_name": "Python 3.6",
   "language": "python",
   "name": "python36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}