Files
2019-11-04 17:57:28 +00:00

136 lines
5.2 KiB
Python

from typing import List, Dict
import copy
import json
import pandas as pd
import re
from azureml.core import RunConfiguration
from azureml.core.compute import ComputeTarget
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData, PipelineParameter, TrainingOutput, StepSequence
from azureml.pipeline.steps import PythonScriptStep
from azureml.train.automl import AutoMLConfig
from azureml.train.automl import AutoMLStep
def _get_groups(data: Dataset, group_column_names: List[str]) -> pd.DataFrame:
    """Collect the distinct value combinations of the grouping columns.

    Args:
        data: Dataset whose underlying dataflow is queried.
        group_column_names: Names of the columns that define a group.

    Returns:
        A pandas DataFrame restricted to ``group_column_names``, produced by
        de-duplicating the dataflow on those columns.
    """
    # NOTE: this relies on the private ``_dataflow`` attribute of the dataset.
    deduplicated = data._dataflow.distinct(columns=group_column_names)
    only_group_columns = deduplicated.keep_columns(columns=group_column_names)
    return only_group_columns.to_pandas_dataframe()
def _get_configs(automlconfig: AutoMLConfig,
                 data: Dataset,
                 target_column: str,
                 compute_target: ComputeTarget,
                 group_column_names: List[str]) -> Dict[str, AutoMLConfig]:
    """Build one AutoML configuration per group found in ``data``.

    For every distinct combination of values in ``group_column_names`` a deep
    copy of ``automlconfig`` is made and its ``user_settings`` are updated
    with the group's training data, the label column and the compute target.

    Args:
        automlconfig: Template configuration; it is copied, never modified.
        data: Dataset containing the rows of all groups.
        target_column: Stored as ``label_column_name`` in each configuration.
        compute_target: Stored as ``compute_target`` in each configuration.
        group_column_names: Columns whose value combinations define a group.

    Returns:
        A dict mapping the sanitized group name to that group's configuration.
        The name is built by joining the group's values with ``#####`` and
        then dropping every character that is not an ASCII letter, a digit or
        ``-`` (this also drops the ``#`` separator itself).

    NOTE(review): two groups whose sanitized names coincide overwrite each
    other in the returned dict.
    """
    # matches every character that is NOT allowed in a group name
    invalid_chars = re.compile('[^a-zA-Z0-9-]')
    groups = _get_groups(data, group_column_names)
    base_dataflow = data._dataflow
    configs = {}
    for _, group in groups.iterrows():
        group_name = "#####".join(str(x) for x in group.values)
        group_name = invalid_chars.sub('', group_name)

        # Narrow the data one grouping column at a time. Each filter must be
        # applied to the result of the previous one; filtering the original
        # dataflow on every pass would keep only the last column's condition
        # and leak rows of other groups into this group's training data.
        single = base_dataflow
        for key in group.index:
            single = single.filter(base_dataflow[key] == group[key])

        group_conf = copy.deepcopy(automlconfig)
        group_conf.user_settings['training_data'] = single
        group_conf.user_settings['label_column_name'] = target_column
        group_conf.user_settings['compute_target'] = compute_target
        configs[group_name] = group_conf
    return configs
def build_pipeline_steps(automlconfig: AutoMLConfig,
                         data: Dataset,
                         target_column: str,
                         compute_target: ComputeTarget,
                         group_column_names: list,
                         time_column_name: str,
                         deploy: bool,
                         service_name: str = 'grouping-demo') -> StepSequence:
    """Create the train/register (and optionally deploy) steps for all groups.

    For each group configuration returned by ``_get_configs`` two steps are
    created: an ``AutoMLStep`` that trains on the group's data and a
    ``PythonScriptStep`` running ``register.py`` (from the ``register``
    directory) that receives the group name and the best-model output.

    Args:
        automlconfig: Template AutoML configuration, specialised per group.
        data: Dataset containing the rows of all groups.
        target_column: Name of the label column.
        compute_target: Compute used by the training, registration and
            deployment steps.
        group_column_names: Columns whose value combinations define a group.
        time_column_name: Passed to ``deploy.py`` as ``--time_column_name``.
        deploy: When true, a final ``deploy.py`` step is added.
        service_name: Default value of the ``service_name`` pipeline parameter.

    Returns:
        When ``deploy`` is false, the plain list of train/register steps (not
        a ``StepSequence``, despite the annotation). When ``deploy`` is true,
        a ``StepSequence`` whose steps are that list followed by the
        deployment step.
    """
    # one AutoML configuration per group
    group_configs = _get_configs(
        automlconfig, data, target_column, compute_target, group_column_names)

    # run configuration shared by every model-registration step
    registration_runconfig = RunConfiguration()
    registration_deps = CondaDependencies()
    registration_deps.add_pip_package('azureml-pipeline')
    registration_runconfig.environment.python.conda_dependencies = registration_deps

    pipeline_steps = []
    registered_names = []
    for index, (group_name, group_config) in enumerate(group_configs.items()):
        # metrics produced by the AutoML run of this group
        metrics_output = PipelineData(
            name='metrics_data_{}'.format(group_name),
            pipeline_output_name='metrics_{}'.format(group_name),
            training_output=TrainingOutput(type='Metrics'))

        # best model of this group, chosen by the configured primary metric
        primary_metric = group_config.user_settings['primary_metric']
        model_output = PipelineData(
            name='model_data_{}'.format(group_name),
            pipeline_output_name='best_model_{}'.format(group_name),
            training_output=TrainingOutput(type='Model', metric=primary_metric))

        training_step = AutoMLStep(
            name='automl_{}'.format(group_name),
            automl_config=group_config,
            outputs=[metrics_output, model_output],
            allow_reuse=True)

        # The group name is handed to register.py as a pipeline parameter and
        # becomes the name of the model registered for this group.
        model_name_param = PipelineParameter(
            "group_name_{}".format(index), default_value=group_name)
        registration_step = PythonScriptStep(
            'register.py',
            name='register_{}'.format(group_name),
            arguments=["--model_name", model_name_param,
                       "--model_path", model_output],
            inputs=[model_output],
            compute_target=compute_target,
            runconfig=registration_runconfig,
            source_directory="register",
            allow_reuse=True
        )

        pipeline_steps.extend([training_step, registration_step])
        registered_names.append(group_name)

    if not deploy:
        return pipeline_steps

    # deployment step: receives the grouping columns (in reversed order), the
    # JSON-encoded list of model names and the service name as parameters
    group_columns_param = PipelineParameter(
        "group_column_names",
        default_value="#####".join(reversed(group_column_names)))
    model_names_param = PipelineParameter(
        "model_names",
        default_value=json.dumps(registered_names))
    service_name_param = PipelineParameter(
        "service_name",
        default_value=service_name)
    deploy_step = PythonScriptStep(
        'deploy.py',
        name='service_deploy',
        arguments=["--group_column_names", group_columns_param,
                   "--model_names", model_names_param,
                   "--service_name", service_name_param,
                   "--time_column_name", time_column_name],
        compute_target=compute_target,
        runconfig=RunConfiguration(),
        source_directory="deploy"
    )
    # NOTE(review): the train/register steps are passed as one nested list;
    # presumably StepSequence finishes that group before running deployment.
    return StepSequence(steps=[pipeline_steps, deploy_step])