# Mirrored from https://github.com/Azure/MachineLearningNotebooks.git
from typing import Any, Dict, Optional

import os

import azureml.train.automl.runtime._hts.hts_runtime_utilities as hru
from azureml._restclient.jasmine_client import JasmineClient
from azureml.contrib.automl.pipeline.steps import utilities
from azureml.core import RunConfiguration
from azureml.core.compute import ComputeTarget
from azureml.core.experiment import Experiment
from azureml.data import LinkTabularOutputDatasetConfig, TabularDataset
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep, PythonScriptStep
from azureml.train.automl.constants import Scenarios
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

PROJECT_FOLDER = "assets"
SETTINGS_FILE = "automl_settings.json"

def get_backtest_pipeline(
    experiment: Experiment,
    dataset: TabularDataset,
    process_per_node: int,
    node_count: int,
    compute_target: ComputeTarget,
    automl_settings: Dict[str, Any],
    step_size: int,
    step_number: int,
    model_name: Optional[str] = None,
    model_uid: Optional[str] = None,
    forecast_quantiles: Optional[list] = None,
) -> Pipeline:
    """
    Build the Azure ML pipeline that backtests a forecasting model.

    :param experiment: The experiment used to run the pipeline.
    :param dataset: The tabular dataset to be used for model training.
    :param process_per_node: The number of processes per node. Generally, it should be the number of cores
                             on the node divided by two.
    :param node_count: The number of nodes to be used.
    :param compute_target: The compute target to be used to run the pipeline.
    :param automl_settings: The dictionary with the AutoML settings.
    :param step_size: The number of periods to step back in backtesting.
    :param step_number: The number of backtesting iterations.
    :param model_name: The name of the model to be backtested.
    :param model_uid: The UID used to mark models from this run of the experiment.
    :param forecast_quantiles: The forecast quantiles required at inference time.
    :return: The pipeline to be used for model retraining.

    **Note:** The output will be uploaded in the pipeline output
    called 'results'.
    """
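    # JasmineClient is an internal AutoML service client; here it is used to
    # resolve the AutoML curated environment for this compute SKU, so every
    # step runs with the same dependencies as an AutoML training run.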
    jasmine_client = JasmineClient(
        service_context=experiment.workspace.service_context,
        experiment_name=experiment.name,
        experiment_id=experiment.id,
    )
    env = jasmine_client.get_curated_environment(
        scenario=Scenarios.AUTOML,
        enable_dnn=False,
        enable_gpu=False,
        compute=compute_target,
        compute_sku=experiment.workspace.compute_targets.get(
            compute_target.name
        ).vm_size,
    )
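    # The final scores are surfaced through this PipelineData as the named
    # pipeline output "results".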
    data_results = PipelineData(
        name="results", datastore=None, pipeline_output_name="results"
    )
    ############################################################
    # Split the data set using a Python script.
    ############################################################
    run_config = RunConfiguration()
    run_config.docker.use_docker = True
    run_config.environment = env

    utilities.set_environment_variables_for_run(run_config)

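    # data_split.py (in the "assets" project folder) is expected to write one
    # backtest fold per iteration to the output folder: each fold ends
    # step_size periods earlier than the previous one, step_number times.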
    split_data = PipelineData(name="split_data_output", datastore=None).as_dataset()
    split_step = PythonScriptStep(
        name="split_data_for_backtest",
        script_name="data_split.py",
        inputs=[dataset.as_named_input("training_data")],
        outputs=[split_data],
        source_directory=PROJECT_FOLDER,
        arguments=[
            "--step-size",
            step_size,
            "--step-number",
            step_number,
            "--time-column-name",
            automl_settings.get("time_column_name"),
            "--time-series-id-column-names",
            automl_settings.get("grain_column_names"),
            "--output-dir",
            split_data,
        ],
        runconfig=run_config,
        compute_target=compute_target,
        allow_reuse=False,
    )
    ############################################################
    # Run the backtest using the parallel run step.
    ############################################################
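    # Persist the AutoML settings next to the entry script; the parallel
    # workers (retrain_models.py) presumably read them back from the
    # snapshotted project folder.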
    settings_path = os.path.join(PROJECT_FOLDER, SETTINGS_FILE)
    hru.dump_object_to_json(automl_settings, settings_path)
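    # mini_batch_size=1 means each parallel invocation receives a single input
    # file, i.e. (assuming data_split.py writes one file per fold) one
    # backtest fold is retrained per invocation.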
    mini_batch_size = PipelineParameter(name="batch_size_param", default_value=str(1))
    back_test_config = ParallelRunConfig(
        source_directory=PROJECT_FOLDER,
        entry_script="retrain_models.py",
        mini_batch_size=mini_batch_size,
        error_threshold=-1,
        output_action="append_row",
        append_row_file_name="outputs.txt",
        compute_target=compute_target,
        environment=env,
        process_count_per_node=process_per_node,
        run_invocation_timeout=3600,
        node_count=node_count,
    )
    utilities.set_environment_variables_for_run(back_test_config)
    forecasts = PipelineData(name="forecasts", datastore=None)
    if model_name:
        parallel_step_name = "{}-backtest".format(model_name.replace("_", "-"))
    else:
        parallel_step_name = "AutoML-backtest"

    prs_args = [
        "--target_column_name",
        automl_settings.get("label_column_name"),
        "--output-dir",
        forecasts,
    ]
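    # Optional arguments are only forwarded when set; forecast quantiles are
    # appended as separate argv entries after the --forecast_quantiles flag.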
    if model_name is not None:
        prs_args.append("--model-name")
        prs_args.append(model_name)
    if model_uid is not None:
        prs_args.append("--model-uid")
        prs_args.append(model_uid)
    if forecast_quantiles:
        prs_args.append("--forecast_quantiles")
        prs_args.extend(forecast_quantiles)
    backtest_prs = ParallelRunStep(
        name=parallel_step_name,
        parallel_run_config=back_test_config,
        arguments=prs_args,
        inputs=[split_data],
        output=forecasts,
        allow_reuse=False,
    )
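    # With output_action="append_row", every worker's result rows are
    # concatenated into a single file (outputs.txt) under the "forecasts"
    # output, which the scoring step below consumes.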
    ############################################################
    # Collect the forecasts and expose them through the
    # 'results' pipeline output.
    ############################################################
    collection_step = PythonScriptStep(
        name="score",
        script_name="score.py",
        inputs=[forecasts.as_mount()],
        outputs=[data_results],
        source_directory=PROJECT_FOLDER,
        arguments=["--forecasts", forecasts, "--output-dir", data_results],
        runconfig=run_config,
        compute_target=compute_target,
        allow_reuse=False,
    )
    # Build and return the pipeline.
    return Pipeline(
        workspace=experiment.workspace,
        steps=[split_step, backtest_prs, collection_step],
    )
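

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal, hypothetical
# driver showing how the pipeline above might be built and submitted. The
# workspace config, dataset name, compute name, and settings values are all
# assumptions; replace them with your own resources before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from azureml.core import Dataset, Workspace

    ws = Workspace.from_config()  # assumes a local config.json
    experiment = Experiment(ws, "backtest-demo")  # hypothetical experiment
    data = Dataset.get_by_name(ws, "my-timeseries")  # hypothetical dataset
    compute = ws.compute_targets["cpu-cluster"]  # hypothetical compute target

    # Minimal forecasting settings; a real run needs the full AutoML config.
    settings = {
        "task": "forecasting",
        "time_column_name": "date",
        "label_column_name": "y",
        "grain_column_names": [],
    }

    pipeline = get_backtest_pipeline(
        experiment=experiment,
        dataset=data,
        process_per_node=2,
        node_count=2,
        compute_target=compute,
        automl_settings=settings,
        step_size=7,  # step one week back per backtest iteration
        step_number=4,  # run four backtest iterations
    )
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)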