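"""Fetch new NOAA ISD weather data and append it as a partition to an
Azure ML Dataset.

If the Dataset named by --ds_name exists, only rows newer than its
last-modified time are pulled; on first run, two weeks of history are
pulled and the Dataset is registered over the partition folder layout.
"""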
import argparse
import os
import traceback
from datetime import datetime

import pandas as pd
from dateutil.relativedelta import relativedelta

from azureml.core import Dataset, Workspace
from azureml.core.run import Run, _OfflineRun
from azureml.opendatasets import NoaaIsdWeather

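# Resolve the workspace: an offline (local) run reads it from config.json,
# while a submitted run takes it from the parent experiment.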
run = Run.get_context()
if isinstance(run, _OfflineRun):
    ws = Workspace.from_config()
else:
    ws = run.experiment.workspace

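# USAF station IDs used to filter the NOAA ISD data down to a fixed set
# of weather stations.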
usaf_list = ['725724', '722149', '723090', '722159', '723910', '720279',
             '725513', '725254', '726430', '720381', '723074', '726682',
             '725486', '727883', '723177', '722075', '723086', '724053',
             '725070', '722073', '726060', '725224', '725260', '724520',
             '720305', '724020', '726510', '725126', '722523', '703333',
             '722249', '722728', '725483', '722972', '724975', '742079',
             '727468', '722193', '725624', '722030', '726380', '720309',
             '722071', '720326', '725415', '724504', '725665', '725424',
             '725066']

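# Pull NOAA ISD weather for [start_time, end_time] and keep only the
# stations in usaf_list.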
def get_noaa_data(start_time, end_time):
    columns = ['usaf', 'wban', 'datetime', 'latitude', 'longitude', 'elevation',
               'windAngle', 'windSpeed', 'temperature', 'stationName', 'p_k']
    isd = NoaaIsdWeather(start_time, end_time, cols=columns)
    noaa_df = isd.to_pandas_dataframe()
    df_filtered = noaa_df[noaa_df["usaf"].isin(usaf_list)]
    # reset_index returns a new frame; the original code dropped the result
    df_filtered = df_filtered.reset_index(drop=True)
    print("Received {0} rows of training data between {1} and {2}".format(
        df_filtered.shape[0], start_time, end_time))
    return df_filtered

print("Check for new data and prepare the data")
|
|
|
|
parser = argparse.ArgumentParser("split")
|
|
parser.add_argument("--ds_name", help="name of the Dataset to update")
|
|
args = parser.parse_args()
|
|
|
|
print("Argument 1(ds_name): %s" % args.ds_name)
|
|
|
|
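# Look up the dataset; if it exists, fetch only data newer than its last
# modification time. Otherwise, register it later and backfill two weeks.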
dstor = ws.get_default_datastore()
register_dataset = False
try:
    ds = Dataset.get_by_name(ws, args.ds_name)
    end_time_last_slice = ds.data_changed_time.replace(tzinfo=None)
    print("Dataset {0} last updated on {1}".format(args.ds_name,
                                                   end_time_last_slice))
except Exception:
    print(traceback.format_exc())
    print("Dataset with name {0} not found, registering new dataset.".format(
        args.ds_name))
    register_dataset = True
    end_time_last_slice = datetime.today() - relativedelta(weeks=2)

end_time = datetime.utcnow()
train_df = get_noaa_data(end_time_last_slice, end_time)

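# Write any new rows to a timestamped partition folder
# (<ds_name>/yyyy/MM/dd/HH/mm/ss/data.csv) and upload it to the datastore.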
if train_df.size > 0:
    print("Received {0} rows of new data after {1}.".format(
        train_df.shape[0], end_time_last_slice))
    folder_name = "{}/{:04d}/{:02d}/{:02d}/{:02d}/{:02d}/{:02d}".format(
        args.ds_name, end_time.year, end_time.month, end_time.day,
        end_time.hour, end_time.minute, end_time.second)
    file_path = "{0}/data.csv".format(folder_name)

    # Add a new partition to the registered dataset
    os.makedirs(folder_name, exist_ok=True)
    train_df.to_csv(file_path, index=False)

    dstor.upload_files(files=[file_path],
                       target_path=folder_name,
                       overwrite=True,
                       show_progress=True)
else:
    print("No new data since {0}.".format(end_time_last_slice))

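# First run only: register a Tabular Dataset over the partition folder
# structure so each timestamped upload becomes a new partition.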
if register_dataset:
    ds = Dataset.Tabular.from_delimited_files(
        dstor.path("{}/**/*.csv".format(args.ds_name)),
        partition_format='/{partition_date:yyyy/MM/dd/HH/mm/ss}/data.csv')
    ds.register(ws, name=args.ds_name)
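# Example invocation (hypothetical script and dataset names; the mirror
# does not show the actual file name):
#     python prepare_data.py --ds_name NOAA_Weather_DS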