Compare commits

...

8 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| vizhur | a7c3a0fdb8 | update samples from Release-54 as a part of SDK release | 2020-06-02 21:34:10 +00:00 |
| Harneet Virk | 6d11cdfa0a | Merge pull request #984 from Azure/release_update/Release-53 (update samples from Release-53 as a part of SDK release) | 2020-05-26 19:59:58 -07:00 |
| vizhur | 11e8ed2bab | update samples from Release-53 as a part of SDK release | 2020-05-27 02:45:07 +00:00 |
| Harneet Virk | 12c06a4168 | Merge pull request #978 from ahcan76/patch-1 (Fix image paths in tutorial-1st-experiment-sdk-train.ipynb) | 2020-05-18 12:58:21 -07:00 |
| ahcan76 | 1f75dc9725 | Update tutorial-1st-experiment-sdk-train.ipynb (fix the image path) | 2020-05-18 22:40:54 +03:00 |
| Harneet Virk | 1a1a42d525 | Merge pull request #977 from Azure/release_update/Release-52 (update samples from Release-52 as a part of SDK release) | 2020-05-18 12:22:48 -07:00 |
| vizhur | 879a272a8d | update samples from Release-52 as a part of SDK release | 2020-05-18 19:21:05 +00:00 |
| Harneet Virk | bc65bde097 | Merge pull request #971 from Azure/release_update/Release-51 (update samples from Release-51 as a part of SDK release) | 2020-05-13 22:17:45 -07:00 |
93 changed files with 10453 additions and 716 deletions

View File

@@ -40,6 +40,7 @@ The [How to use Azure ML](./how-to-use-azureml) folder contains specific example
- [Deployment](./how-to-use-azureml/deployment) - Examples showing how to deploy and manage machine learning models and solutions
- [Azure Databricks](./how-to-use-azureml/azure-databricks) - Examples showing how to use Azure ML with Azure Databricks
- [Monitor Models](./how-to-use-azureml/monitor-models) - Examples showing how to enable model monitoring services such as DataDrift
- [Reinforcement Learning](./how-to-use-azureml/reinforcement-learning) - Examples showing how to train reinforcement learning agents
---
## Documentation

View File

@@ -103,7 +103,7 @@
"source": [
"import azureml.core\n",
"\n",
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},
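This 1.5.0 to 1.6.0 bump recurs across most notebooks in this compare. A minimal sketch, assuming only that `azureml-core` is installed, of turning the informational print above into a check that flags a mismatch; the expected-version constant and the pip hint are illustrative:

```python
# Warn when the installed SDK does not match the version the notebook targets.
import azureml.core

EXPECTED = "1.6.0"  # target version per this release's diff
if azureml.core.VERSION != EXPECTED:
    print("Notebook targets", EXPECTED, "but found", azureml.core.VERSION)
    print("Consider: pip install --upgrade azureml-sdk")
```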

View File

@@ -144,7 +144,7 @@ jupyter notebook
- Dataset: forecasting for a bike-sharing
- Example of training an automated ML forecasting model on multiple time-series
- [auto-ml-forecasting-function.ipynb](forecasting-high-frequency/auto-ml-forecasting-function.ipynb)
- [auto-ml-forecasting-function.ipynb](forecasting-forecast-function/auto-ml-forecasting-function.ipynb)
- Example of training an automated ML forecasting model on multiple time-series
- [auto-ml-forecasting-beer-remote.ipynb](forecasting-beer-remote/auto-ml-forecasting-beer-remote.ipynb)

View File

@@ -105,7 +105,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -93,7 +93,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -97,7 +97,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -88,7 +88,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -114,7 +114,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -87,7 +87,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},
@@ -510,16 +510,16 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.automl.core.shared import constants, metrics\n",
"from azureml.automl.core.shared import constants\n",
"from azureml.automl.runtime.shared.score import scoring\n",
"from sklearn.metrics import mean_absolute_error, mean_squared_error\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# use automl metrics module\n",
"scores = metrics.compute_metrics_regression(\n",
" df_all['predicted'],\n",
" df_all[target_column_name],\n",
" list(constants.Metric.SCALAR_REGRESSION_SET),\n",
" None, None, None)\n",
"scores = scoring.score_regression(\n",
" y_test=df_all[target_column_name],\n",
" y_pred=df_all['predicted'],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET))\n",
"\n",
"print(\"[Test data scores]\\n\")\n",
"for key, value in scores.items(): \n",

View File

@@ -97,7 +97,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},
@@ -465,7 +465,7 @@
"metadata": {},
"source": [
"### Forecast Function\n",
"For forecasting, we will use the forecast function instead of the predict function. Using the predict method would result in getting predictions for EVERY horizon the forecaster can predict at. This is useful when training and evaluating the performance of the forecaster at various horizons, but the level of detail is excessive for normal use. Forecast function also can handle more complicated scenarios, see notebook on [high frequency forecasting](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/automated-machine-learning/forecasting-high-frequency/auto-ml-forecasting-function.ipynb)."
"For forecasting, we will use the forecast function instead of the predict function. Using the predict method would result in getting predictions for EVERY horizon the forecaster can predict at. This is useful when training and evaluating the performance of the forecaster at various horizons, but the level of detail is excessive for normal use. Forecast function also can handle more complicated scenarios, see the [forecast function notebook](../forecasting-forecast-function/auto-ml-forecasting-function.ipynb)."
]
},
{
@@ -507,15 +507,15 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.automl.core.shared import constants, metrics\n",
"from azureml.automl.core.shared import constants\n",
"from azureml.automl.runtime.shared.score import scoring\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# use automl metrics module\n",
"scores = metrics.compute_metrics_regression(\n",
" df_all['predicted'],\n",
" df_all[target_column_name],\n",
" list(constants.Metric.SCALAR_REGRESSION_SET),\n",
" None, None, None)\n",
"scores = scoring.score_regression(\n",
" y_test=df_all[target_column_name],\n",
" y_pred=df_all['predicted'],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET))\n",
"\n",
"print(\"[Test data scores]\\n\")\n",
"for key, value in scores.items(): \n",
@@ -667,15 +667,15 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.automl.core.shared import constants, metrics\n",
"from azureml.automl.core.shared import constants\n",
"from azureml.automl.runtime.shared.score import scoring\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# use automl metrics module\n",
"scores = metrics.compute_metrics_regression(\n",
" df_all['predicted'],\n",
" df_all[target_column_name],\n",
" list(constants.Metric.SCALAR_REGRESSION_SET),\n",
" None, None, None)\n",
"scores = scoring.score_regression(\n",
" y_test=df_all[target_column_name],\n",
" y_pred=df_all['predicted'],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET))\n",
"\n",
"print(\"[Test data scores]\\n\")\n",
"for key, value in scores.items(): \n",

View File

@@ -35,7 +35,6 @@
"Terminology:\n",
"* forecast origin: the last period when the target value is known\n",
"* forecast periods(s): the period(s) for which the value of the target is desired.\n",
"* forecast horizon: the number of forecast periods\n",
"* lookback: how many past periods (before forecast origin) the model function depends on. The larger of number of lags and length of rolling window.\n",
"* prediction context: `lookback` periods immediately preceding the forecast origin\n",
"\n",
@@ -95,7 +94,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},
@@ -720,6 +719,90 @@
"X_show[['date', 'grain', 'ext_predictor', '_automl_target_col']]\n",
"# prediction is in _automl_target_col"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Forecasting farther than the maximum horizon <a id=\"recursive forecasting\"></a>\n",
"When the forecast destination, or the latest date in the prediction data frame, is farther into the future than the specified maximum horizon, the `forecast()` function will still make point predictions out to the later date using a recursive operation mode. Internally, the method recursively applies the regular forecaster to generate context so that we can forecast further into the future. \n",
"\n",
"To illustrate the use-case and operation of recursive forecasting, we'll consider an example with a single time-series where the forecasting period directly follows the training period and is twice as long as the maximum horizon given at training time.\n",
"\n",
"![Recursive_forecast_overview](recursive_forecast_overview_small.png)\n",
"\n",
"Internally, we apply the forecaster in an iterative manner and finish the forecast task in two interations. In the first iteration, we apply the forecaster and get the prediction for the first max-horizon periods (y_pred1). In the second iteraction, y_pred1 is used as the context to produce the prediction for the next max-horizon periods (y_pred2). The combination of (y_pred1 and y_pred2) gives the results for the total forecast periods. \n",
"\n",
"A caveat: forecast accuracy will likely be worse the farther we predict into the future since errors are compounded with recursive application of the forecaster.\n",
"\n",
"![Recursive_forecast_iter1](recursive_forecast_iter1.png)\n",
"![Recursive_forecast_iter2](recursive_forecast_iter2.png)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# generate the same kind of test data we trained on, but with a single grain/time-series and test period twice as long as the max_horizon\n",
"_, _, X_test_long, y_test_long = get_timeseries(train_len=n_train_periods,\n",
" test_len=max_horizon*2,\n",
" time_column_name=TIME_COLUMN_NAME,\n",
" target_column_name=TARGET_COLUMN_NAME,\n",
" grain_column_name=GRAIN_COLUMN_NAME,\n",
" grains=1)\n",
"\n",
"print(X_test_long.groupby(GRAIN_COLUMN_NAME)[TIME_COLUMN_NAME].min())\n",
"print(X_test_long.groupby(GRAIN_COLUMN_NAME)[TIME_COLUMN_NAME].max())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# forecast() function will invoke the recursive forecast method internally.\n",
"y_pred_long, X_trans_long = fitted_model.forecast(X_test_long)\n",
"y_pred_long"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# What forecast() function does in this case is equivalent to iterating it twice over the test set as the following. \n",
"y_pred1, _ = fitted_model.forecast(X_test_long[:max_horizon])\n",
"y_pred_all, _ = fitted_model.forecast(X_test_long, np.concatenate((y_pred1, np.full(max_horizon, np.nan))))\n",
"np.array_equal(y_pred_all, y_pred_long)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Confidence interval and distributional forecasts\n",
"AutoML cannot currently estimate forecast errors beyond the maximum horizon set during training, so the `forecast_quantiles()` function will return missing values for quantiles not equal to 0.5 beyond the maximum horizon. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fitted_model.forecast_quantiles(X_test_long)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Similarly with the simple senarios illustrated above, forecasting farther than the max horizon in other senarios like 'multiple grain', 'Destination-date forecast', and 'forecast away from the training data' are also automatically handled by the `forecast()` function. "
]
}
],
"metadata": {

Binary file added (image, 26 KiB; not shown)

Binary file added (image, 30 KiB; not shown)

View File

@@ -82,7 +82,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},
@@ -545,7 +545,7 @@
"source": [
"If you are used to scikit pipelines, perhaps you expected `predict(X_test)`. However, forecasting requires a more general interface that also supplies the past target `y` values. Please use `forecast(X,y)` as `predict(X)` is reserved for internal purposes on forecasting models.\n",
"\n",
"The [forecast function notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/automated-machine-learning/forecasting-high-frequency/auto-ml-forecasting-function.ipynb) demonstrates the use of the forecast function for a variety of use cases. Also, please see the [API documentation for the forecast function](https://docs.microsoft.com/en-us/python/api/azureml-automl-runtime/azureml.automl.runtime.shared.model_wrappers.forecastingpipelinewrapper?view=azure-ml-py#forecast-x-pred--typing-union-pandas-core-frame-dataframe--nonetype----none--y-pred--typing-union-pandas-core-frame-dataframe--numpy-ndarray--nonetype----none--forecast-destination--typing-union-pandas--libs-tslibs-timestamps-timestamp--nonetype----none--ignore-data-errors--bool---false-----typing-tuple-numpy-ndarray--pandas-core-frame-dataframe-)."
"The [forecast function notebook](../forecasting-forecast-function/auto-ml-forecasting-function.ipynb)."
]
},
{
@@ -576,15 +576,15 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.automl.core.shared import constants, metrics\n",
"from azureml.automl.core.shared import constants\n",
"from azureml.automl.runtime.shared.score import scoring\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# use automl metrics module\n",
"scores = metrics.compute_metrics_regression(\n",
" df_all['predicted'],\n",
" df_all[target_column_name],\n",
" list(constants.Metric.SCALAR_REGRESSION_SET),\n",
" None, None, None)\n",
"# use automl scoring module\n",
"scores = scoring.score_regression(\n",
" y_test=df_all[target_column_name],\n",
" y_pred=df_all['predicted'],\n",
" metrics=list(constants.Metric.SCALAR_REGRESSION_SET))\n",
"\n",
"print(\"[Test data scores]\\n\")\n",
"for key, value in scores.items(): \n",

View File

@@ -95,7 +95,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -98,7 +98,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -7,7 +7,7 @@ import azureml.train.automl
import azureml.explain.model
from azureml.train.automl.runtime.automl_explain_utilities import AutoMLExplainerSetupClass, \
automl_setup_model_explanations
from sklearn.externals import joblib
import joblib
from azureml.core.model import Model
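Several hunks in this compare replace `from sklearn.externals import joblib` with a plain `import joblib`: scikit-learn deprecated its vendored copy in 0.21 and removed it in 0.23. A compatibility sketch for scripts that may run on either side of that change:

```python
# Prefer the standalone joblib package; fall back to the vendored copy only
# on very old scikit-learn installs (< 0.21 era).
try:
    import joblib
except ImportError:
    from sklearn.externals import joblib

# Round-trip a hypothetical object to show the dump/load API is unchanged.
joblib.dump({"alpha": 1.0}, "obj.pkl")
print(joblib.load("obj.pkl"))
```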

View File

@@ -4,15 +4,14 @@ import os
from azureml.core.run import Run
from azureml.core.experiment import Experiment
from sklearn.externals import joblib
from azureml.core.dataset import Dataset
from azureml.train.automl.runtime.automl_explain_utilities import AutoMLExplainerSetupClass, \
automl_setup_model_explanations, automl_check_model_if_explainable
from azureml.explain.model.mimic.models.lightgbm_model import LGBMExplainableModel
from azureml.explain.model.mimic_wrapper import MimicWrapper
from azureml.automl.core.shared.constants import MODEL_PATH
from azureml.explain.model.scoring.scoring_explainer import TreeScoringExplainer, save
from azureml.explain.model.scoring.scoring_explainer import TreeScoringExplainer
import joblib
OUTPUT_DIR = './outputs/'
os.makedirs(OUTPUT_DIR, exist_ok=True)
@@ -74,7 +73,8 @@ print("Engineered and raw explanations computed successfully")
scoring_explainer = TreeScoringExplainer(explainer.explainer, feature_maps=[automl_explainer_setup_obj.feature_map])
# Pickle scoring explainer locally
save(scoring_explainer, exist_ok=True)
with open('scoring_explainer.pkl', 'wb') as stream:
    joblib.dump(scoring_explainer, stream)
# Upload the scoring explainer to the automl run
automl_run.upload_file('outputs/scoring_explainer.pkl', 'scoring_explainer.pkl')

View File

@@ -92,7 +92,7 @@
"metadata": {},
"outputs": [],
"source": [
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -3,6 +3,6 @@ dependencies:
- python=3.6.2
- pip:
- azureml-defaults
- scikit-learn
- scikit-learn==0.19.1
- numpy
- inference-schema[numpy-support]
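The pin above, and the matching one added to the notebook's pip list below, ties the scoring environment to a specific scikit-learn version, presumably the one the sample model was pickled with; unpickling across scikit-learn versions is not guaranteed to work. A small guard one might add to a scoring script, with the expected version taken from this diff:

```python
# Fail fast if the serving environment drifted from the training environment.
import sklearn

EXPECTED_SKLEARN = "0.19.1"  # per the pinned conda dependency above
assert sklearn.__version__ == EXPECTED_SKLEARN, (
    "Model trained with scikit-learn %s, found %s"
    % (EXPECTED_SKLEARN, sklearn.__version__))
```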

View File

@@ -233,7 +233,8 @@
" 'inference-schema[numpy-support]',\n",
" 'joblib',\n",
" 'numpy',\n",
" 'scikit-learn'\n",
" 'scikit-learn==0.19.1',\n",
" 'scipy'\n",
"])\n",
"inference_config = InferenceConfig(entry_script='score.py', environment=environment)\n",
"# if cpu and memory_in_gb parameters are not provided\n",

View File

@@ -5,7 +5,7 @@
"metadata": {},
"source": [
"# Enabling App Insights for Services in Production\n",
"With this notebook, you can learn how to enable App Insights for standard service monitoring, plus, we provide examples for doing custom logging within a scoring files in a model. \n",
"With this notebook, you can learn how to enable App Insights for standard service monitoring, plus, we provide examples for doing custom logging within a scoring files in a model.\n",
"\n",
"\n",
"## What does Application Insights monitor?\n",
@@ -45,11 +45,13 @@
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"import json\n",
"\n",
"from azureml.core import Workspace\n",
"from azureml.core.compute import AksCompute, ComputeTarget\n",
"from azureml.core.webservice import AksWebservice\n",
"import azureml.core\n",
"import json\n",
"\n",
"print(azureml.core.VERSION)"
]
},
@@ -67,7 +69,7 @@
"outputs": [],
"source": [
"ws = Workspace.from_config()\n",
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')"
]
},
{
@@ -84,13 +86,13 @@
"metadata": {},
"outputs": [],
"source": [
"#Register the model\n",
"from azureml.core.model import Model\n",
"model = Model.register(model_path = \"sklearn_regression_model.pkl\", # this points to a local file\n",
" model_name = \"sklearn_regression_model.pkl\", # this is the name the model is registered as\n",
" tags = {'area': \"diabetes\", 'type': \"regression\"},\n",
" description = \"Ridge regression model to predict diabetes\",\n",
" workspace = ws)\n",
"from azureml.core import Model\n",
"\n",
"model = Model.register(model_path=\"sklearn_regression_model.pkl\", # This points to a local file.\n",
" model_name=\"sklearn_regression_model.pkl\", # This is the name the model is registered as.\n",
" tags={'area': \"diabetes\", 'type': \"regression\"},\n",
" description=\"Ridge regression model to predict diabetes\",\n",
" workspace=ws)\n",
"\n",
"print(model.name, model.description, model.version)"
]
@@ -120,7 +122,7 @@
"import os\n",
"import pickle\n",
"import json\n",
"import numpy \n",
"import numpy\n",
"from sklearn.externals import joblib\n",
"from sklearn.linear_model import Ridge\n",
"import time\n",
@@ -129,15 +131,15 @@
" global model\n",
" #Print statement for appinsights custom traces:\n",
" print (\"model initialized\" + time.strftime(\"%H:%M:%S\"))\n",
" \n",
"\n",
" # AZUREML_MODEL_DIR is an environment variable created during deployment.\n",
" # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)\n",
" # For multiple models, it points to the folder containing all deployed models (./azureml-models)\n",
" model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'sklearn_regression_model.pkl')\n",
" \n",
"\n",
" # deserialize the model file back into a sklearn model\n",
" model = joblib.load(model_path)\n",
" \n",
"\n",
"\n",
"# note you can pass in multiple rows for scoring\n",
"def run(raw_data):\n",
@@ -168,7 +170,7 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.conda_dependencies import CondaDependencies \n",
"from azureml.core.conda_dependencies import CondaDependencies\n",
"\n",
"myenv = CondaDependencies.create(conda_packages=['numpy','scikit-learn'],\n",
" pip_packages=['azureml-defaults'])\n",
@@ -190,9 +192,8 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.model import InferenceConfig\n",
"from azureml.core.environment import Environment\n",
"\n",
"from azureml.core.model import InferenceConfig\n",
"\n",
"myenv = Environment.from_conda_specification(name=\"myenv\", file_path=\"myenv.yml\")\n",
"inference_config = InferenceConfig(entry_script=\"score.py\", environment=myenv)"
@@ -213,11 +214,11 @@
"source": [
"from azureml.core.webservice import AciWebservice\n",
"\n",
"aci_deployment_config = AciWebservice.deploy_configuration(cpu_cores = 1, \n",
" memory_gb = 1, \n",
" tags = {'area': \"diabetes\", 'type': \"regression\"}, \n",
" description = 'Predict diabetes using regression model',\n",
" enable_app_insights = True)"
"aci_deployment_config = AciWebservice.deploy_configuration(cpu_cores=1,\n",
" memory_gb=1,\n",
" tags={'area': \"diabetes\", 'type': \"regression\"},\n",
" description=\"Predict diabetes using regression model\",\n",
" enable_app_insights=True)"
]
},
{
@@ -226,29 +227,14 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.webservice import Webservice\n",
"aci_service_name = \"aci-service-appinsights\"\n",
"\n",
"aci_service = Model.deploy(ws, aci_service_name, [model], inference_config, aci_deployment_config, overwrite=True)\n",
"aci_service.wait_for_deployment(show_output=True)\n",
"\n",
"aci_service_name = 'my-aci-service-4'\n",
"aci_service = Model.deploy(ws, aci_service_name, [model], inference_config, aci_deployment_config)\n",
"aci_service.wait_for_deployment(True)\n",
"print(aci_service.state)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"test_sample = json.dumps({'data': [\n",
" [1,28,13,45,54,6,57,8,8,10], \n",
" [101,9,8,37,6,45,4,3,2,41]\n",
"]})\n",
"test_sample = bytes(test_sample,encoding='utf8')"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -256,7 +242,15 @@
"outputs": [],
"source": [
"if aci_service.state == \"Healthy\":\n",
" prediction = aci_service.run(input_data=test_sample)\n",
" test_sample = json.dumps({\n",
" \"data\": [\n",
" [1,28,13,45,54,6,57,8,8,10],\n",
" [101,9,8,37,6,45,4,3,2,41]\n",
" ]\n",
" })\n",
"\n",
" prediction = aci_service.run(test_sample)\n",
"\n",
" print(prediction)\n",
"else:\n",
" raise ValueError(\"Service deployment isn't healthy, can't call the service. Error: \", aci_service.error)"
@@ -282,14 +276,21 @@
"metadata": {},
"outputs": [],
"source": [
"# Use the default configuration (can also provide parameters to customize)\n",
"prov_config = AksCompute.provisioning_configuration()\n",
"from azureml.exceptions import ComputeTargetException\n",
"\n",
"aks_name = 'my-aks-test3' \n",
"# Create the cluster\n",
"aks_target = ComputeTarget.create(workspace = ws, \n",
" name = aks_name, \n",
" provisioning_configuration = prov_config)"
"aks_name = \"my-aks\"\n",
"\n",
"try:\n",
" aks_target = ComputeTarget(ws, aks_name)\n",
" print(\"Using existing AKS cluster {}.\".format(aks_name))\n",
"except ComputeTargetException:\n",
" print(\"Creating a new AKS cluster {}.\".format(aks_name))\n",
"\n",
" # Use the default configuration (can also provide parameters to customize).\n",
" prov_config = AksCompute.provisioning_configuration()\n",
" aks_target = ComputeTarget.create(workspace=ws,\n",
" name=aks_name,\n",
" provisioning_configuration=prov_config)"
]
},
{
@@ -299,7 +300,8 @@
"outputs": [],
"source": [
"%%time\n",
"aks_target.wait_for_completion(show_output = True)"
"if aks_target.provisioning_state != \"Succeeded\":\n",
" aks_target.wait_for_completion(show_output=True)"
]
},
{
@@ -323,13 +325,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"```python \n",
"```python\n",
"%%time\n",
"resource_id = '/subscriptions/<subscriptionid>/resourcegroups/<resourcegroupname>/providers/Microsoft.ContainerService/managedClusters/<aksservername>'\n",
"create_name= 'myaks4'\n",
"attach_config = AksCompute.attach_configuration(resource_id=resource_id)\n",
"aks_target = ComputeTarget.attach(workspace = ws, \n",
" name = create_name, \n",
"aks_target = ComputeTarget.attach(workspace=ws,\n",
" name=create_name,\n",
" attach_configuration=attach_config)\n",
"## Wait for the operation to complete\n",
"aks_target.wait_for_provisioning(True)```"
@@ -349,7 +351,7 @@
"metadata": {},
"outputs": [],
"source": [
"#Set the web service configuration\n",
"# Set the web service configuration.\n",
"aks_deployment_config = AksWebservice.deploy_configuration(enable_app_insights=True)"
]
},
@@ -366,15 +368,16 @@
"metadata": {},
"outputs": [],
"source": [
"if aks_target.provisioning_state== \"Succeeded\": \n",
" aks_service_name ='aks-w-dc5'\n",
"if aks_target.provisioning_state == \"Succeeded\":\n",
" aks_service_name = \"aks-service-appinsights\"\n",
" aks_service = Model.deploy(ws,\n",
" aks_service_name, \n",
" [model], \n",
" inference_config, \n",
" aks_deployment_config, \n",
" deployment_target = aks_target) \n",
" aks_service.wait_for_deployment(show_output = True)\n",
" aks_service_name,\n",
" [model],\n",
" inference_config,\n",
" aks_deployment_config,\n",
" deployment_target=aks_target,\n",
" overwrite=True)\n",
" aks_service.wait_for_deployment(show_output=True)\n",
" print(aks_service.state)\n",
"else:\n",
" raise ValueError(\"AKS provisioning failed. Error: \", aks_service.error)"
@@ -395,13 +398,14 @@
"source": [
"%%time\n",
"\n",
"test_sample = json.dumps({'data': [\n",
" [1,28,13,45,54,6,57,8,8,10], \n",
" [101,9,8,37,6,45,4,3,2,41]\n",
"]})\n",
"test_sample = bytes(test_sample,encoding='utf8')\n",
"\n",
"if aks_service.state == \"Healthy\":\n",
" test_sample = json.dumps({\n",
" \"data\": [\n",
" [1,28,13,45,54,6,57,8,8,10],\n",
" [101,9,8,37,6,45,4,3,2,41]\n",
" ]\n",
" })\n",
"\n",
" prediction = aks_service.run(input_data=test_sample)\n",
" print(prediction)\n",
"else:\n",
@@ -435,7 +439,7 @@
"outputs": [],
"source": [
"aks_service.update(enable_app_insights=False)\n",
"aks_service.wait_for_deployment(show_output = True)"
"aks_service.wait_for_deployment(show_output=True)"
]
},
{
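The App Insights notebook diff above replaces unconditional AKS creation with a try/except lookup so reruns reuse the cluster. That pattern generalizes; a sketch of a reusable helper built from the same calls, assuming a workspace `ws` (the cluster name is illustrative):

```python
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException

def get_or_create_aks(ws, name):
    """Return an existing AKS target, provisioning a default one if absent."""
    try:
        return ComputeTarget(ws, name)  # reuse if it already exists
    except ComputeTargetException:
        config = AksCompute.provisioning_configuration()
        target = ComputeTarget.create(ws, name, config)
        target.wait_for_completion(show_output=True)
        return target

aks_target = get_or_create_aks(ws, "my-aks")
```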

View File

@@ -115,6 +115,11 @@
"# Convert from CoreML into ONNX\n",
"onnx_model = onnxmltools.convert_coreml(coreml_model, 'TinyYOLOv2')\n",
"\n",
"# Fix the preprocessor bias in the ImageScaler\n",
"for init in onnx_model.graph.initializer:\n",
" if init.name == 'scalerPreprocessor_bias':\n",
" init.dims[1] = 1\n",
"\n",
"# Save ONNX model\n",
"onnxmltools.utils.save_model(onnx_model, 'tinyyolov2.onnx')\n",
"\n",
@@ -255,7 +260,7 @@
"source": [
"from azureml.core.conda_dependencies import CondaDependencies \n",
"\n",
"myenv = CondaDependencies.create(pip_packages=[\"numpy\", \"onnxruntime==0.4.0\", \"azureml-core\", \"azureml-defaults\"])\n",
"myenv = CondaDependencies.create(pip_packages=[\"numpy\", \"onnxruntime\", \"azureml-core\", \"azureml-defaults\"])\n",
"\n",
"with open(\"myenv.yml\",\"w\") as f:\n",
" f.write(myenv.serialize_to_string())"
@@ -316,7 +321,7 @@
"metadata": {},
"outputs": [],
"source": [
"aci_service_name = 'my-aci-service-15ad'\n",
"aci_service_name = 'my-aci-service-tiny-yolo'\n",
"print(\"Service\", aci_service_name)\n",
"aci_service = Model.deploy(ws, aci_service_name, [model], inference_config, aciconfig)\n",
"aci_service.wait_for_deployment(True)\n",

View File

@@ -4,4 +4,5 @@ dependencies:
- azureml-sdk
- numpy
- git+https://github.com/apple/coremltools@v2.1
- onnx<1.7.0
- onnxmltools

View File

@@ -5,5 +5,5 @@ dependencies:
- azureml-widgets
- matplotlib
- numpy
- onnx
- onnx<1.7.0
- opencv-python-headless

View File

@@ -5,5 +5,5 @@ dependencies:
- azureml-widgets
- matplotlib
- numpy
- onnx
- onnx<1.7.0
- opencv-python-headless

File diff suppressed because one or more lines are too long

View File

@@ -1,260 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/deployment/tensorflow/tensorflow-model-register-and-deploy.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Register TensorFlow SavedModel and deploy as webservice\n",
"\n",
"Following this notebook, you will:\n",
"\n",
" - Learn how to register a TF SavedModel in your Azure Machine Learning Workspace.\n",
" - Deploy your model as a web service in an Azure Container Instance."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration notebook](../../../configuration.ipynb) to install the Azure Machine Learning Python SDK and create a workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"\n",
"# Check core SDK version number.\n",
"print('SDK version:', azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize workspace\n",
"\n",
"Create a [Workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace%28class%29?view=azure-ml-py) object from your persisted configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"create workspace"
]
},
"outputs": [],
"source": [
"from azureml.core import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download the Model\n",
"\n",
"Download and extract the model from https://amlsamplenotebooksdata.blob.core.windows.net/data/flowers_model.tar.gz to \"models\" directory"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tarfile\n",
"import urllib.request\n",
"\n",
"# create directory for model\n",
"model_dir = 'models'\n",
"if not os.path.isdir(model_dir):\n",
" os.mkdir(model_dir)\n",
"\n",
"url=\"https://amlsamplenotebooksdata.blob.core.windows.net/data/flowers_model.tar.gz\"\n",
"response = urllib.request.urlretrieve(url, model_dir + \"/flowers_model.tar.gz\")\n",
"tar = tarfile.open(model_dir + \"/flowers_model.tar.gz\", \"r:gz\")\n",
"tar.extractall(model_dir)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Register model\n",
"\n",
"Register a file or folder as a model by calling [Model.register()](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.model.model?view=azure-ml-py#register-workspace--model-path--model-name--tags-none--properties-none--description-none--datasets-none--model-framework-none--model-framework-version-none--child-paths-none-). For this example, we have provided a TensorFlow SavedModel (`flowers_model` in the notebook's directory).\n",
"\n",
"In addition to the content of the model file itself, your registered model will also store model metadata -- model description, tags, and framework information -- that will be useful when managing and deploying models in your workspace. Using tags, for instance, you can categorize your models and apply filters when listing models in your workspace. Also, marking this model with the scikit-learn framework will simplify deploying it as a web service, as we'll see later."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"register model from file"
]
},
"outputs": [],
"source": [
"from azureml.core import Model\n",
"\n",
"model = Model.register(workspace=ws,\n",
" model_name='flowers', # Name of the registered model in your workspace.\n",
" model_path= model_dir + '/flowers_model', # Local Tensorflow SavedModel folder to upload and register as a model.\n",
" model_framework=Model.Framework.TENSORFLOW, # Framework used to create the model.\n",
" model_framework_version='1.14.0', # Version of Tensorflow used to create the model.\n",
" description='Flowers model')\n",
"\n",
"print('Name:', model.name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deploy model\n",
"\n",
"Deploy your model as a web service using [Model.deploy()](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.model.model?view=azure-ml-py#deploy-workspace--name--models--inference-config--deployment-config-none--deployment-target-none-). Web services take one or more models, load them in an environment, and run them on one of several supported deployment targets.\n",
"\n",
"For this example, we will deploy your TensorFlow SavedModel to an Azure Container Instance (ACI)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use a default environment (for supported models)\n",
"\n",
"The Azure Machine Learning service provides a default environment for supported model frameworks, including TensorFlow, based on the metadata you provided when registering your model. This is the easiest way to deploy your model.\n",
"\n",
"**Note**: This step can take several minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Webservice\n",
"from azureml.exceptions import WebserviceException\n",
"\n",
"service_name = 'tensorflow-flower-service'\n",
"\n",
"# Remove any existing service under the same name.\n",
"try:\n",
" Webservice(ws, service_name).delete()\n",
"except WebserviceException:\n",
" pass\n",
"\n",
"service = Model.deploy(ws, service_name, [model])\n",
"service.wait_for_deployment(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After your model is deployed, perform a call to the web service."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"headers = {'Content-Type': 'application/json'}\n",
"\n",
"if service.auth_enabled:\n",
" headers['Authorization'] = 'Bearer '+ service.get_keys()[0]\n",
"elif service.token_auth_enabled:\n",
" headers['Authorization'] = 'Bearer '+ service.get_token()[0]\n",
"\n",
"scoring_uri = service.scoring_uri # If you have a SavedModel with classify and regress, \n",
" # you can change the scoring_uri from 'uri:predict' to 'uri:classify' or 'uri:regress'.\n",
"print(scoring_uri)\n",
"\n",
"with open('tensorflow-flower-predict-input.json', 'rb') as data_file:\n",
" response = requests.post(\n",
" scoring_uri, data=data_file, headers=headers)\n",
"print(response.status_code)\n",
"print(response.elapsed)\n",
"print(response.json())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When you are finished testing your service, clean up the deployment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"service.delete()"
]
}
],
"metadata": {
"authors": [
{
"name": "vaidyas"
}
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -1,4 +0,0 @@
name: tensorflow-model-register-and-deploy
dependencies:
- pip:
- azureml-sdk

View File

@@ -58,7 +58,7 @@
"\n",
"Problem: Boston Housing Price Prediction with scikit-learn (train a model and run an explainer remotely via AMLCompute, and download and visualize the remotely-calculated explanations.)\n",
"\n",
"| ![explanations-run-history](./img/explanations-run-history.PNG) |\n",
"| ![explanations-run-history](./img/explanations-run-history.png) |\n",
"|:--:|\n"
]
},
@@ -672,7 +672,7 @@
"source": [
"# retrieve model for visualization and deployment\n",
"from azureml.core.model import Model\n",
"from sklearn.externals import joblib\n",
"import joblib\n",
"original_model = Model(ws, 'model_explain_model_on_amlcomp')\n",
"model_path = original_model.download(exist_ok=True)\n",
"original_model = joblib.load(model_path)"
@@ -692,7 +692,7 @@
"outputs": [],
"source": [
"# retrieve x_test for visualization\n",
"from sklearn.externals import joblib\n",
"import joblib\n",
"x_test_path = './x_test_boston_housing.pkl'\n",
"run.download_file('x_test_boston_housing.pkl', output_file_path=x_test_path)"
]

View File

@@ -7,7 +7,7 @@ from interpret.ext.blackbox import TabularExplainer
from azureml.contrib.interpret.explanation.explanation_client import ExplanationClient
from sklearn.model_selection import train_test_split
from azureml.core.run import Run
from sklearn.externals import joblib
import joblib
import os
import numpy as np

View File

@@ -3,7 +3,7 @@ import numpy as np
import pandas as pd
import os
import pickle
from sklearn.externals import joblib
import joblib
from sklearn.linear_model import LogisticRegression
from azureml.core.model import Model

View File

@@ -3,7 +3,7 @@ import numpy as np
import pandas as pd
import os
import pickle
from sklearn.externals import joblib
import joblib
from sklearn.linear_model import LogisticRegression
from azureml.core.model import Model

View File

@@ -165,7 +165,7 @@
"outputs": [],
"source": [
"from sklearn.model_selection import train_test_split\n",
"from sklearn.externals import joblib\n",
"import joblib\n",
"from sklearn.preprocessing import StandardScaler, OneHotEncoder\n",
"from sklearn.impute import SimpleImputer\n",
"from sklearn.pipeline import Pipeline\n",

View File

@@ -63,7 +63,7 @@
"7.\tCreate an image and register it in the image registry.\n",
"8.\tDeploy the image as a web service in Azure.\n",
"\n",
"| ![azure-machine-learning-cycle](./img/azure-machine-learning-cycle.PNG) |\n",
"| ![azure-machine-learning-cycle](./img/azure-machine-learning-cycle.png) |\n",
"|:--:|"
]
},
@@ -325,7 +325,7 @@
"source": [
"# retrieve model for visualization and deployment\n",
"from azureml.core.model import Model\n",
"from sklearn.externals import joblib\n",
"import joblib\n",
"original_model = Model(ws, 'amlcompute_deploy_model')\n",
"model_path = original_model.download(exist_ok=True)\n",
"original_svm_model = joblib.load(model_path)"
@@ -352,7 +352,7 @@
"outputs": [],
"source": [
"# retrieve x_test for visualization\n",
"from sklearn.externals import joblib\n",
"import joblib\n",
"x_test_path = './x_test.pkl'\n",
"run.download_file('x_test_ibm.pkl', output_file_path=x_test_path)\n",
"x_test = joblib.load(x_test_path)"

View File

@@ -6,7 +6,7 @@ import os
import pandas as pd
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.externals import joblib
import joblib
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

View File

@@ -252,7 +252,7 @@
"source": [
"binaries_folder = \"azurebatch/job_binaries\"\n",
"if not os.path.isdir(binaries_folder):\n",
" os.mkdir(binaries_folder)\n",
" os.makedirs(binaries_folder)\n",
"\n",
"file_name=\"azurebatch.cmd\"\n",
"with open(path.join(binaries_folder, file_name), 'w') as f:\n",

View File

@@ -0,0 +1,510 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved. \n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Showcasing Dataset and PipelineParameter\n",
"\n",
"This notebook demonstrates how a [**FileDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) or [**TabularDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) can be parametrized with [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in an AML [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py). By parametrizing datasets, you can dynamically run pipeline experiments with different datasets without any code change.\n",
"\n",
"A common use case is building a training pipeline with a sample of your training data for quick iterative development. When you're ready to test and deploy your pipeline at scale, you can pass in your full training dataset to the pipeline experiment without making any changes to your training script. \n",
" \n",
"To see more about how parameters work between steps, please refer [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).\n",
"\n",
"* [How to create a Pipeline with a Dataset PipelineParameter](#index1)\n",
"* [How to submit a Pipeline with a Dataset PipelineParameter](#index2)\n",
"* [How to submit a Pipeline and change the Dataset PipelineParameter value from the sdk](#index3)\n",
"* [How to submit a Pipeline and change the Dataset PipelineParameter value using a REST call](#index4)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Azure Machine Learning and Pipeline SDK-specific imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"from azureml.core import Workspace, Experiment, Dataset\n",
"from azureml.core.compute import ComputeTarget, AmlCompute\n",
"from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n",
"from azureml.widgets import RunDetails\n",
"\n",
"from azureml.pipeline.core import PipelineParameter\n",
"from azureml.pipeline.core import Pipeline, PipelineRun\n",
"from azureml.pipeline.steps import PythonScriptStep\n",
"\n",
"# Check core SDK version number\n",
"print(\"SDK version:\", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize Workspace\n",
"\n",
"Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at .\\config.json\n",
"\n",
"If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.\n",
"\n",
"This sets you up with a working config file that has information on your workspace, subscription id, etc."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ws = Workspace.from_config()\n",
"print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create an Azure ML experiment\n",
"\n",
"Let's create an experiment named \"showcasing-dataset\" and a folder to hold the training scripts. The script runs will be recorded under the experiment in Azure."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Choose a name for the run history container in the workspace.\n",
"experiment_name = 'showcasing-dataset'\n",
"source_directory = '.'\n",
"\n",
"experiment = Experiment(ws, experiment_name)\n",
"experiment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create or Attach an AmlCompute cluster\n",
"You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your AutoML run. In this tutorial, you get the default `AmlCompute` as your training compute resource."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Choose a name for your cluster.\n",
"amlcompute_cluster_name = \"cpu-cluster\"\n",
"\n",
"found = False\n",
"# Check if this compute target already exists in the workspace.\n",
"cts = ws.compute_targets\n",
"if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':\n",
" found = True\n",
" print('Found existing compute target.')\n",
" compute_target = cts[amlcompute_cluster_name]\n",
" \n",
"if not found:\n",
" print('Creating a new compute target...')\n",
" provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\", # for GPU, use \"STANDARD_NC6\"\n",
" #vm_priority = 'lowpriority', # optional\n",
" max_nodes = 4)\n",
"\n",
" # Create the cluster.\n",
" compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)\n",
" \n",
" # Can poll for a minimum number of nodes and for a specific timeout.\n",
" # If no min_node_count is provided, it will use the scale settings for the cluster.\n",
" compute_target.wait_for_completion(show_output = True, timeout_in_minutes = 10)\n",
" \n",
" # For a more detailed view of current AmlCompute status, use get_status()."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dataset Configuration\n",
"\n",
"The following steps detail how to create a [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) and [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) from an external CSV file, and configure them to be used by a [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py):\n",
"\n",
"1. Create a dataset from a csv file\n",
"2. Create a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) object and set the `default_value` to the dataset. [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) objects enabled arguments to be passed into Pipelines when they are resubmitted after creation. The `name` is referenced later on when we submit additional pipeline runs with different input datasets. \n",
"3. Create a [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object from the [PiepelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py). The [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object specifies how the dataset should be used by the remote compute where the pipeline is run. **NOTE** only [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects built on [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) can be set `as_mount()` or `as_download()` on the remote compute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"datapath-remarks-sample"
]
},
"outputs": [],
"source": [
"file_dataset = Dataset.File.from_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')\n",
"file_pipeline_param = PipelineParameter(name=\"file_ds_param\", default_value=file_dataset)\n",
"file_ds_consumption = DatasetConsumptionConfig(\"file_dataset\", file_pipeline_param).as_mount()\n",
"\n",
"tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')\n",
"tabular_pipeline_param = PipelineParameter(name=\"tabular_ds_param\", default_value=tabular_dataset)\n",
"tabular_ds_consumption = DatasetConsumptionConfig(\"tabular_dataset\", tabular_pipeline_param)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will setup a training script to ingest our passed-in datasets and print their contents. **NOTE** the names of the datasets referenced inside the training script correspond to the `name` of their respective [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects we defined above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train_with_dataset.py\n",
"from azureml.core import Run\n",
"\n",
"input_file_ds_path = Run.get_context().input_datasets['file_dataset']\n",
"with open(input_file_ds_path, 'r') as f:\n",
" content = f.read()\n",
" print(content)\n",
"\n",
"input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']\n",
"tabular_df = input_tabular_ds.to_pandas_dataframe()\n",
"print(tabular_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a id='index1'></a>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a Pipeline with a Dataset PipelineParameter\n",
"\n",
"Note that the ```file_ds_consumption``` and ```tabular_ds_consumption``` are specified as both arguments and inputs to create a step."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_step = PythonScriptStep(\n",
" name=\"train_step\",\n",
" script_name=\"train_with_dataset.py\",\n",
" arguments=[\"--param1\", file_ds_consumption, \"--param2\", tabular_ds_consumption],\n",
" inputs=[file_ds_consumption, tabular_ds_consumption],\n",
" compute_target=compute_target,\n",
" source_directory=source_directory)\n",
"\n",
"print(\"train_step created\")\n",
"\n",
"pipeline = Pipeline(workspace=ws, steps=[train_step])\n",
"print(\"pipeline with the train_step created\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a id='index2'></a>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit a Pipeline with a Dataset PipelineParameter\n",
"\n",
"Pipelines can be submitted with default values of PipelineParameters by not specifying any parameters."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pipeline will run with default file_ds and tabular_ds\n",
"pipeline_run = experiment.submit(pipeline)\n",
"print(\"Pipeline is submitted for execution\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"RunDetails(pipeline_run).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run.wait_for_completion()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a id='index3'></a>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit a Pipeline with a different Dataset PipelineParameter value from the SDK\n",
"\n",
"The training pipeline can be reused with different input datasets by passing them in as PipelineParameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"iris_file_ds = Dataset.File.from_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'\n",
" '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'\n",
" 'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')\n",
"\n",
"iris_tabular_ds = Dataset.Tabular.from_delimited_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'\n",
" '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'\n",
" 'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run_with_params = experiment.submit(pipeline, pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds}) "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"RunDetails(pipeline_run_with_params).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run_with_params.wait_for_completion()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a id='index4'></a>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dynamically Set the Dataset PipelineParameter Values using a REST Call\n",
"\n",
"Let's publish the pipeline we created previously, so we can generate a pipeline endpoint. We can then submit the iris datasets to the pipeline REST endpoint by passing in their IDs. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"published_pipeline = pipeline.publish(name=\"Dataset_Pipeline\", description=\"Pipeline to test Dataset PipelineParameter\", continue_on_step_failure=True)\n",
"published_pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"published_pipeline.submit(ws, experiment_name=\"publishedexperiment\", pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.authentication import InteractiveLoginAuthentication\n",
"import requests\n",
"\n",
"auth = InteractiveLoginAuthentication()\n",
"aad_token = auth.get_authentication_header()\n",
"\n",
"rest_endpoint = published_pipeline.endpoint\n",
"\n",
"print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# specify the param when running the pipeline\n",
"response = requests.post(rest_endpoint, \n",
" headers=aad_token, \n",
" json={\"ExperimentName\": \"MyRestPipeline\",\n",
" \"RunSource\": \"SDK\",\n",
" \"DataSetDefinitionValueAssignments\": {\"file_ds_param\": {\"SavedDataSetReference\": {\"Id\": iris_file_ds.id}},\n",
" \"tabular_ds_param\": {\"SavedDataSetReference\": {\"Id\": iris_tabular_ds.id}}}\n",
" }\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" response.raise_for_status()\n",
"except Exception: \n",
" raise Exception('Received bad response from the endpoint: {}\\n'\n",
" 'Response Code: {}\\n'\n",
" 'Headers: {}\\n'\n",
" 'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
"\n",
"run_id = response.json().get('Id')\n",
"print('Submitted pipeline run: ', run_id)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"published_pipeline_run_via_rest = PipelineRun(ws.experiments[\"MyRestPipeline\"], run_id)\n",
"RunDetails(published_pipeline_run_via_rest).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"published_pipeline_run_via_rest.wait_for_completion()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a id='index5'></a>"
]
}
],
"metadata": {
"authors": [
{
"name": "rafarmah"
}
],
"category": "tutorial",
"compute": [
"AML Compute"
],
"datasets": [
"Custom"
],
"deployment": [
"None"
],
"exclude_from_index": false,
"framework": [
"Azure ML"
],
"friendly_name": "How to use Dataset as a PipelineParameter",
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
},
"order_index": 13,
"star_tag": [
"featured"
],
"tags": [
"None"
],
"task": "Demonstrates the use of Dataset as a PipelineParameter"
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,5 @@
name: aml-pipelines-showcasing-dataset-and-pipelineparameter
dependencies:
- pip:
- azureml-sdk
- azureml-widgets

View File

@@ -510,7 +510,7 @@
" inputs=[step_1_input],\n",
" num_workers=1,\n",
" python_script_path=python_script_path,\n",
" python_script_params={'arg1', pipeline_param, 'arg2},\n",
" python_script_params={'arg1', pipeline_param, 'arg2'},\n",
" run_name='DB_Python_demo',\n",
" compute_target=databricks_compute,\n",
" allow_reuse=True\n",

View File

@@ -279,8 +279,7 @@
"# Specify CondaDependencies obj, add necessary packages\n",
"aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(\n",
" conda_packages=['pandas','scikit-learn'], \n",
" pip_packages=['azureml-sdk[automl,explain]', 'pyarrow'], \n",
" pin_sdk_version=False)\n",
" pip_packages=['azureml-sdk[automl,explain]', 'pyarrow'])\n",
"\n",
"print (\"Run configuration created.\")"
]
@@ -692,7 +691,6 @@
" debug_log = 'automated_ml_errors.log',\n",
" path = train_model_folder,\n",
" compute_target = aml_compute,\n",
" run_configuration = aml_run_config,\n",
" featurization = 'auto',\n",
" training_data = training_dataset,\n",
" label_column_name = 'cost',\n",

View File

@@ -2,18 +2,16 @@
Azure Machine Learning Batch Inference targets large inference jobs that are not time-sensitive. Batch Inference provides cost-effective inference compute scaling, with unparalleled throughput for asynchronous applications. It is optimized for high-throughput, fire-and-forget inference over large collections of data.
# Getting Started with Batch Inference Public Preview
# Getting Started with Batch Inference
Batch inference public preview offers a platform in which to do large inference or generic parallel map-style operations. Below introduces the major steps to use this new functionality. For a quick try, please follow the prerequisites and simply run the sample notebooks provided in this directory.
Batch inference offers a platform in which to do large inference or generic parallel map-style operations. The sections below introduce the major steps for using this functionality. For a quick try, follow the prerequisites and simply run the sample notebooks provided in this directory.
## Prerequisites
### Python package installation
Following the convention of most AzureML Public Preview features, Batch Inference SDK is currently available as a contrib package.
If you're unfamiliar with creating a new Python environment, you may follow this example for [creating a conda environment](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment#local). The Batch Inference package can be installed with the following pip command.
```
pip install azureml-contrib-pipeline-steps
pip install azureml-pipeline-steps
```
### Creation of Azure Machine Learning Workspace
@@ -66,9 +64,8 @@ base_image_registry.password = "password"
## Create a batch inference job
**ParallelRunStep** is a newly added step in the azureml.contrib.pipeline.steps package. You will use it to add a step to create a batch inference job with your Azure machine learning pipeline. (Use batch inference without an Azure machine learning pipeline is not supported yet). ParallelRunStep has all the following parameters:
**ParallelRunStep** is a newly added step in the azureml.pipeline.steps package. You will use it to add a step that creates a batch inference job in your Azure Machine Learning pipeline. (Using batch inference without an Azure Machine Learning pipeline is not yet supported.) ParallelRunStep takes the following parameters; a minimal sketch assembling them follows the list:
- **name**: the name used to register the batch inference service; it must be unique, 3-32 characters long, and match the regex ^\[a-z\]([-a-z0-9]*[a-z0-9])?$
- **models**: zero or more model names already registered in Azure Machine Learning model registry.
- **parallel_run_config**: ParallelRunConfig as defined above.
- **inputs**: one or more Dataset objects.
- **output**: this should be a PipelineData object encapsulating an Azure BLOB container path.
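A minimal sketch wiring these parameters together, assuming a workspace with a registered tabular dataset, a curated environment, and a `cpu-cluster` compute target (all names below are illustrative, not part of the API):
```python
from azureml.core import Dataset, Environment, Experiment, Workspace
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep

ws = Workspace.from_config()
input_ds = Dataset.get_by_name(ws, "batch_input_ds")    # assumed registered dataset
batch_env = Environment.get(ws, "AzureML-Minimal")      # any environment with your deps
compute_target = ws.compute_targets["cpu-cluster"]      # assumed AmlCompute target

parallel_run_config = ParallelRunConfig(
    source_directory="scripts",      # folder containing the entry script
    entry_script="batch_score.py",   # its run(mini_batch) is invoked per mini-batch
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=compute_target,
    node_count=2)

output_dir = PipelineData(name="inferences", datastore=ws.get_default_datastore())

parallelrun_step = ParallelRunStep(
    name="batch-inference-demo",     # must satisfy the naming rules above
    parallel_run_config=parallel_run_config,
    inputs=[input_ds.as_named_input("batch_input")],
    output=output_dir,
    allow_reuse=True)

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
run = Experiment(ws, "batch-inference-sketch").submit(pipeline)
```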

View File

@@ -23,11 +23,6 @@
"\n",
"In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using the ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
"\n",
"> **Note**\n",
"This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook. Pandas is used to display job results.\n",
"```\n",
"pip install azureml-contrib-pipeline-steps pandas\n",
"```\n",
"> **Tip**\n",
"If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction.\n",
"\n",
@@ -86,7 +81,6 @@
"source": [
"import os\n",
"from azureml.core.compute import AmlCompute, ComputeTarget\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"# choose a name for your cluster\n",
"compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
@@ -184,9 +178,20 @@
"mnist_ds_name = 'mnist_sample_data'\n",
"\n",
"path_on_datastore = mnist_data.path('mnist')\n",
"input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)\n",
"registered_mnist_ds = input_mnist_ds.register(ws, mnist_ds_name, create_new_version=True)\n",
"named_mnist_ds = registered_mnist_ds.as_named_input(mnist_ds_name)"
"input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n",
"from azureml.pipeline.core import PipelineParameter\n",
"\n",
"pipeline_param = PipelineParameter(name=\"mnist_param\", default_value=input_mnist_ds)\n",
"input_mnist_ds_consumption = DatasetConsumptionConfig(\"minist_param_config\", pipeline_param).as_mount()"
]
},
{
@@ -306,8 +311,6 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"scripts_folder = \"Code\"\n",
"script_file = \"digit_identification.py\"\n",
"\n",
@@ -341,8 +344,8 @@
"from azureml.core import Environment\n",
"from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
"\n",
"batch_conda_deps = CondaDependencies.create(pip_packages=[\"tensorflow==1.15.2\", \"pillow\"])\n",
"\n",
"batch_conda_deps = CondaDependencies.create(pip_packages=[\"tensorflow==1.15.2\", \"pillow\", \n",
" \"azureml-core\", \"azureml-dataprep[fuse]\"])\n",
"batch_env = Environment(name=\"batch_environment\")\n",
"batch_env.python.conda_dependencies = batch_conda_deps\n",
"batch_env.docker.enabled = True\n",
@@ -362,17 +365,21 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
"from azureml.pipeline.core import PipelineParameter\n",
"from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
"\n",
"parallel_run_config = ParallelRunConfig(\n",
" source_directory=scripts_folder,\n",
" entry_script=script_file,\n",
" mini_batch_size=\"5\",\n",
" mini_batch_size=PipelineParameter(name=\"batch_size_param\", default_value=\"5\"),\n",
" error_threshold=10,\n",
" output_action=\"append_row\",\n",
" append_row_file_name=\"mnist_outputs.txt\",\n",
" environment=batch_env,\n",
" compute_target=compute_target,\n",
" node_count=2)"
" process_count_per_node=PipelineParameter(name=\"process_count_param\", default_value=2),\n",
" node_count=2\n",
")"
]
},
{
@@ -392,10 +399,8 @@
"parallelrun_step = ParallelRunStep(\n",
" name=\"predict-digits-mnist\",\n",
" parallel_run_config=parallel_run_config,\n",
" inputs=[ named_mnist_ds ],\n",
" inputs=[ input_mnist_ds_consumption ],\n",
" output=output_dir,\n",
" models=[ model ],\n",
" arguments=[ ],\n",
" allow_reuse=True\n",
")"
]
@@ -454,6 +459,47 @@
"pipeline_run.wait_for_completion(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Resubmit a with different dataset\n",
"Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"path_on_datastore = mnist_data.path('mnist/0.png')\n",
"single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)\n",
"single_image_ds._ensure_saved(ws)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run_2 = experiment.submit(pipeline, \n",
" pipeline_parameters={\"mnist_param\": single_image_ds, \n",
" \"batch_size_param\": \"1\",\n",
" \"process_count_param\": 1}\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_run_2.wait_for_completion(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -480,7 +526,7 @@
"\n",
"for root, dirs, files in os.walk(\"mnist_results\"):\n",
" for file in files:\n",
" if file.endswith('parallel_run_step.txt'):\n",
" if file.endswith('mnist_outputs.txt'):\n",
" result_file = os.path.join(root,file)\n",
"\n",
"df = pd.read_csv(result_file, delimiter=\":\", header=None)\n",

View File

@@ -2,6 +2,6 @@ name: file-dataset-image-inference-mnist
dependencies:
- pip:
- azureml-sdk
- azureml-contrib-pipeline-steps
- azureml-pipeline-steps
- azureml-widgets
- pandas

View File

@@ -23,11 +23,6 @@
"\n",
"In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using the ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
"\n",
"> **Note**\n",
"This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook. Pandas is used to display job results.\n",
"```\n",
"pip install azureml-contrib-pipeline-steps pandas\n",
"```\n",
"> **Tip**\n",
"If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction.\n",
"\n",
@@ -84,7 +79,6 @@
"source": [
"import os\n",
"from azureml.core.compute import AmlCompute, ComputeTarget\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"# choose a name for your cluster\n",
"compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
@@ -304,7 +298,8 @@
"from azureml.core import Environment\n",
"from azureml.core.runconfig import CondaDependencies\n",
"\n",
"predict_conda_deps = CondaDependencies.create(pip_packages=[ \"scikit-learn==0.20.3\" ])\n",
"predict_conda_deps = CondaDependencies.create(pip_packages=[\"scikit-learn==0.20.3\",\n",
" \"azureml-core\", \"azureml-dataprep[pandas,fuse]\"])\n",
"\n",
"predict_env = Environment(name=\"predict_environment\")\n",
"predict_env.python.conda_dependencies = predict_conda_deps\n",
@@ -325,19 +320,21 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
"from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
"\n",
"# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.\n",
"parallel_run_config = ParallelRunConfig(\n",
" source_directory=scripts_folder,\n",
" entry_script=script_file, # the user script to run against each input\n",
" mini_batch_size='5MB',\n",
" error_threshold=5,\n",
" output_action='append_row',\n",
" environment=predict_env,\n",
" compute_target=compute_target, \n",
" node_count=3,\n",
" run_invocation_timeout=600)"
" source_directory=scripts_folder,\n",
" entry_script=script_file, # the user script to run against each input\n",
" mini_batch_size='5MB',\n",
" error_threshold=5,\n",
" output_action='append_row',\n",
" append_row_file_name=\"iris_outputs.txt\",\n",
" environment=predict_env,\n",
" compute_target=compute_target, \n",
" node_count=3,\n",
" run_invocation_timeout=600\n",
")"
]
},
{
@@ -359,7 +356,6 @@
" inputs=[named_iris_ds],\n",
" output=output_folder,\n",
" parallel_run_config=parallel_run_config,\n",
" models=[model],\n",
" arguments=['--model_name', 'iris'],\n",
" allow_reuse=True\n",
")"
@@ -453,7 +449,7 @@
"\n",
"for root, dirs, files in os.walk(\"iris_results\"):\n",
" for file in files:\n",
" if file.endswith('parallel_run_step.txt'):\n",
" if file.endswith('iris_outputs.txt'):\n",
" result_file = os.path.join(root,file)\n",
"\n",
"# cleanup output format\n",

View File

@@ -2,6 +2,6 @@ name: tabular-dataset-inference-iris
dependencies:
- pip:
- azureml-sdk
- azureml-contrib-pipeline-steps
- azureml-pipeline-steps
- azureml-widgets
- pandas

View File

@@ -26,11 +26,8 @@
"2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n",
"3. Stitch the image back into a video.\n",
"\n",
"> **Note**\n",
"This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook.\n",
"```\n",
"pip install azureml-contrib-pipeline-steps\n",
"```"
"> **Tip**\n",
"If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-consume-web-service) instead of batch prediction."
]
},
{
@@ -356,7 +353,9 @@
"source": [
"from azureml.pipeline.core.graph import PipelineParameter\n",
"# create a parameter for style (one of \"candy\", \"mosaic\") to transfer the images to\n",
"style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")"
"style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")\n",
"# create a parameter for the number of nodes to use in step no. 2 (style transfer)\n",
"nodecount_param = PipelineParameter(name=\"nodecount\", default_value=2)"
]
},
{
@@ -415,6 +414,8 @@
"parallel_cd.add_conda_package(\"pytorch\")\n",
"parallel_cd.add_conda_package(\"torchvision\")\n",
"parallel_cd.add_conda_package(\"pillow<7\") # needed for torchvision==0.4.0\n",
"parallel_cd.add_pip_package(\"azureml-core\")\n",
"parallel_cd.add_pip_package(\"azureml-dataprep[fuse]\")\n",
"\n",
"styleenvironment = Environment(name=\"styleenvironment\")\n",
"styleenvironment.python.conda_dependencies=parallel_cd\n",
@@ -427,17 +428,20 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunConfig\n",
"from azureml.pipeline.core import PipelineParameter\n",
"from azureml.pipeline.steps import ParallelRunConfig\n",
"\n",
"parallel_run_config = ParallelRunConfig(\n",
" environment=styleenvironment,\n",
" entry_script='transform.py',\n",
" output_action='summary_only',\n",
" mini_batch_size=\"1\",\n",
" error_threshold=1,\n",
" source_directory=scripts_folder,\n",
" compute_target=gpu_cluster, \n",
" node_count=3)"
" environment=styleenvironment,\n",
" entry_script='transform.py',\n",
" output_action='summary_only',\n",
" mini_batch_size=\"1\",\n",
" error_threshold=1,\n",
" source_directory=scripts_folder,\n",
" compute_target=gpu_cluster, \n",
" node_count=nodecount_param,\n",
" process_count_per_node=2\n",
")"
]
},
{
@@ -446,7 +450,7 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunStep\n",
"from azureml.pipeline.steps import ParallelRunStep\n",
"from datetime import datetime\n",
"\n",
"parallel_step_name = 'styletransfer-' + datetime.now().strftime('%Y%m%d%H%M')\n",
@@ -455,9 +459,6 @@
" name=parallel_step_name,\n",
" inputs=[ffmpeg_images_file_dataset], # Input file share/blob container/file dataset\n",
" output=processed_images, # Output file share/blob container\n",
" models=[mosaic_model, candy_model],\n",
" tags = {'scenario': \"batch inference\", 'type': \"demo\"},\n",
" properties = {'area': \"style transfer\"},\n",
" arguments=[\"--style\", style_param],\n",
" parallel_run_config=parallel_run_config,\n",
" allow_reuse=True #[optional - default value True]\n",
@@ -666,7 +667,8 @@
"response = requests.post(rest_endpoint, \n",
" headers=aad_token,\n",
" json={\"ExperimentName\": experiment_name,\n",
" \"ParameterAssignments\": {\"style\": \"candy\", \"aml_node_count\": 2}})\n",
" \"ParameterAssignments\": {\"style\": \"candy\", \"NodeCount\": 3}})\n",
"\n",
"run_id = response.json()[\"Id\"]\n",
"\n",
"from azureml.pipeline.core.run import PipelineRun\n",

View File

@@ -2,7 +2,6 @@ name: pipeline-style-transfer
dependencies:
- pip:
- azureml-sdk
- azureml-contrib-pipeline-steps
- azureml-pipeline-steps
- azureml-widgets
- requests

View File

@@ -20,11 +20,11 @@ Using these samples, you will be able to do the following.
| File/folder | Description |
|-------------------|--------------------------------------------|
| [README.md](README.md) | This README file. |
| [devenv_setup.ipynb](setup/devenv_setup.ipynb) | Notebook to set up the development environment for Azure ML RL |
| [cartpole_ci.ipynb](cartpole-on-compute-instance/cartpole_ci.ipynb) | Notebook to train a Cartpole playing agent on an Azure ML Compute Instance |
| [cartpole_cc.ipynb](cartpole-on-single-compute/cartpole_cc.ipynb) | Notebook to train a Cartpole playing agent on an Azure ML Compute Cluster (single node) |
| [cartpole_sc.ipynb](cartpole-on-single-compute/cartpole_sc.ipynb) | Notebook to train a Cartpole playing agent on an Azure ML Compute Cluster (single node) |
| [pong_rllib.ipynb](atari-on-distributed-compute/pong_rllib.ipynb) | Notebook to train Pong agent using RLlib on multiple compute targets |
| [minecraft.ipynb](minecraft-on-distributed-compute/minecraft.ipynb) | Notebook to train an agent to navigate through a lava maze in the Minecraft game |
## Prerequisites
@@ -111,7 +111,7 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
For more on SDK concepts, please refer to [notebooks](https://github.com/Azure/MachineLearningNotebooks).
**Please let us know your feedback.**
**Please let us know your [feedback](https://github.com/Azure/MachineLearningNotebooks/labels/Reinforcement%20Learning).**

View File

@@ -23,17 +23,18 @@ if __name__ == "__main__":
ray.init(address=args.ray_address)
tune.run(run_or_experiment=args.run,
config={
"env": args.env,
"num_gpus": args.config["num_gpus"],
"num_workers": args.config["num_workers"],
"callbacks": {"on_train_result": callbacks.on_train_result},
"sample_batch_size": 50,
"train_batch_size": 1000,
"num_sgd_iter": 2,
"num_data_loader_buffers": 2,
"model": {"dim": 42},
},
stop=args.stop,
local_dir='./logs')
tune.run(
run_or_experiment=args.run,
config={
"env": args.env,
"num_gpus": args.config["num_gpus"],
"num_workers": args.config["num_workers"],
"callbacks": {"on_train_result": callbacks.on_train_result},
"sample_batch_size": 50,
"train_batch_size": 1000,
"num_sgd_iter": 2,
"num_data_loader_buffers": 2,
"model": {"dim": 42},
},
stop=args.stop,
local_dir='./logs')

View File

@@ -20,8 +20,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Azure ML Reinforcement Learning Sample - Pong problem\n",
"Azure ML Reinforcement Learning (Azure ML RL) is a managed service for running distributed RL (reinforcement learning) simulation and training using the Ray framework.\n",
"# Reinforcement Learning in Azure Machine Learning - Pong problem\n",
"Reinforcement Learning in Azure Machine Learning is a managed service for running distributed reinforcement learning training and simulation using the open source Ray framework.\n",
"This example uses Ray RLlib to train a Pong playing agent on a multi-node cluster.\n",
"\n",
"## Pong problem\n",
@@ -48,7 +48,7 @@
"source": [
"The goal here is to train an agent to win an episode of Pong game against opponent with the score of at least 18 points. An episode in Pong runs until one of the players reaches a score of 21. Episodes are a terminology that is used across all the [OpenAI gym](https://gym.openai.com/envs/Pong-v0/) environments that contains a strictly defined task.\n",
"\n",
"Training a Pong agent is a CPU intensive task and this example demonstrates the use of Azure ML RL service to train an agent faster in a distributed, parallel environment. You'll learn more about using the head and the worker compute targets to train an agent in this notebook below."
"Training a Pong agent is a compute-intensive task and this example demonstrates the use of Reinforcement Learning in Azure Machine Learning service to train an agent faster in a distributed, parallel environment. You'll learn more about using the head and the worker compute targets to train an agent in this notebook below."
]
},
{
@@ -57,7 +57,7 @@
"source": [
"## Prerequisite\n",
"\n",
"The user should have completed the [Azure ML Reinforcement Learning Sample - Setting Up Development Environment](../setup/devenv_setup.ipynb) to setup a virtual network. This virtual network will be used here for head and worker compute targets. It is highly recommended that the user should go through the [Azure ML Reinforcement Learning Sample - Cartpole Problem](../cartpole-on-single-compute/cartpole_cc.ipynb) to understand the basics of Azure ML RL and Ray RLlib used in this notebook."
"The user should have completed the [Reinforcement Learning in Azure Machine Learning - Setting Up Development Environment](../setup/devenv_setup.ipynb) to setup a virtual network. This virtual network will be used here for head and worker compute targets. It is highly recommended that the user should go through the [Reinforcement Learning in Azure Machine Learning - Cartpole Problem on Single Compute](../cartpole-on-single-compute/cartpole_sc.ipynb) to understand the basics of Reinforcement Learning in Azure Machine Learning and Ray RLlib used in this notebook."
]
},
{
@@ -69,7 +69,7 @@
"\n",
"* Connecting to a workspace to enable communication between your local machine and remote resources\n",
"* Creating an experiment to track all your runs\n",
"* Creating a remote head and worker compute target on a vnet to use for training"
"* Creating remote head and worker compute target on a virtual network to use for training"
]
},
{
@@ -88,19 +88,19 @@
"source": [
"%matplotlib inline\n",
"\n",
"# Azure ML core imports\n",
"# Azure Machine Learning core imports\n",
"import azureml.core\n",
"\n",
"# Check core SDK version number\n",
"print(\"Azure ML SDK Version: \", azureml.core.VERSION)"
"print(\"Azure Machine Learning SDK Version: \", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get Azure ML workspace\n",
"Get a reference to an existing Azure ML workspace."
"### Get Azure Machine Learning workspace\n",
"Get a reference to an existing Azure Machine Learning workspace."
]
},
{
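Getting the workspace handle is typically a single call; this sketch assumes a `config.json` for the workspace sits next to the notebook:
```python
from azureml.core import Workspace

# Loads subscription, resource group and workspace name from config.json
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, sep="\n")
```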
@@ -119,7 +119,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Azure ML experiment\n",
"### Create Azure Machine Learning experiment\n",
"Create an experiment to track the runs in your workspace."
]
},
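Creating the experiment is likewise one call; the experiment name here is illustrative:
```python
from azureml.core import Experiment

# Track all runs of this notebook under one experiment (assumed name)
experiment = Experiment(workspace=ws, name="rllib-multi-node-pong")
```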
@@ -140,9 +140,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Specify the name of your vnet\n",
"### Specify the name of your virtual network\n",
"\n",
"The resource group you use must contain a vnet. Specify the name of the vnet here created in the [Azure ML Reinforcement Learning Sample - Setting Up Development Environment](../setup/devenv_setup.ipynb)."
"The resource group you use must contain a virtual network. Specify the name of the virtual network here created in the [Azure Machine Learning Reinforcement Learning Sample - Setting Up Development Environment](../setup/devenv_setup.ipynb)."
]
},
{
@@ -159,9 +159,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create head computing cluster\n",
"### Create head compute target\n",
"\n",
"In this example, we show how to set up separate compute clusters for the Ray head and Ray worker nodes. First we define the head cluster with GPU for the Ray head node. One CPU of the head node will be used for the Ray head process and the rest of the CPUs will be used by the Ray worker processes."
"In this example, we show how to set up separate compute targets for the Ray head and Ray worker nodes. First we define the head cluster with GPU for the Ray head node. One CPU of the head node will be used for the Ray head process and the rest of the CPUs will be used by the Ray worker processes."
]
},
{
@@ -186,15 +186,17 @@
" if head_compute_target.provisioning_state == 'Succeeded':\n",
" print('found head compute target. just use it', head_compute_name)\n",
" else: \n",
" raise Exception('found head compute target but it is in state', head_compute_target.provisioning_state)\n",
" raise Exception(\n",
" 'found head compute target but it is in state', head_compute_target.provisioning_state)\n",
"else:\n",
" print('creating a new head compute target...')\n",
" provisioning_config = AmlCompute.provisioning_configuration(vm_size=head_vm_size,\n",
" min_nodes=head_compute_min_nodes, \n",
" max_nodes=head_compute_max_nodes,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name='default')\n",
" provisioning_config = AmlCompute.provisioning_configuration(\n",
" vm_size=head_vm_size,\n",
" min_nodes=head_compute_min_nodes, \n",
" max_nodes=head_compute_max_nodes,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name='default')\n",
"\n",
" # Create the cluster\n",
" head_compute_target = ComputeTarget.create(ws, head_compute_name, provisioning_config)\n",
@@ -203,7 +205,7 @@
" # If no min node count is provided it will use the scale settings for the cluster\n",
" head_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
" \n",
" # For a more detailed view of current AmlCompute status, use get_status()\n",
" # For a more detailed view of current AmlCompute status, use get_status()\n",
" print(head_compute_target.get_status().serialize())"
]
},
@@ -211,9 +213,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create worker computing cluster\n",
"### Create worker compute target\n",
"\n",
"Now we create a compute cluster with CPUs for the additional Ray worker nodes. CPUs in these worker nodes are used by Ray worker processes. Each Ray worker node may have multiple Ray worker processes depending on CPUs on the worker node. Ray can distribute multiple worker tasks on each worker node."
"Now we create a compute target with CPUs for the additional Ray worker nodes. CPUs in these worker nodes are used by Ray worker processes. Each Ray worker node, depending on the CPUs on the node, may have multiple Ray worker processes. There can be multiple worker tasks on each worker process (core)."
]
},
{
@@ -222,7 +224,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Choose a name for your Ray worker cluster\n",
"# Choose a name for your Ray worker compute target\n",
"worker_compute_name = 'worker-cpu'\n",
"worker_compute_min_nodes = 0 \n",
"worker_compute_max_nodes = 4\n",
@@ -237,24 +239,26 @@
" if worker_compute_target.provisioning_state == 'Succeeded':\n",
" print('found worker compute target. just use it', worker_compute_name)\n",
" else: \n",
" raise Exception('found worker compute target but it is in state', head_compute_target.provisioning_state)\n",
" raise Exception(\n",
" 'found worker compute target but it is in state', head_compute_target.provisioning_state)\n",
"else:\n",
" print('creating a new worker compute target...')\n",
" provisioning_config = AmlCompute.provisioning_configuration(vm_size=worker_vm_size,\n",
" min_nodes=worker_compute_min_nodes, \n",
" max_nodes=worker_compute_max_nodes,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name='default')\n",
" provisioning_config = AmlCompute.provisioning_configuration(\n",
" vm_size=worker_vm_size,\n",
" min_nodes=worker_compute_min_nodes,\n",
" max_nodes=worker_compute_max_nodes,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name='default')\n",
"\n",
" # Create the cluster\n",
" # Create the compute target\n",
" worker_compute_target = ComputeTarget.create(ws, worker_compute_name, provisioning_config)\n",
" \n",
" # Can poll for a minimum number of nodes and for a specific timeout. \n",
" # If no min node count is provided it will use the scale settings for the cluster\n",
" worker_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
" \n",
" # For a more detailed view of current AmlCompute status, use get_status()\n",
" # For a more detailed view of current AmlCompute status, use get_status()\n",
" print(worker_compute_target.get_status().serialize())"
]
},
@@ -262,12 +266,12 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train Pong Agent Using Azure ML RL\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct RL run configurations for the underlying RL framework. Azure ML RL initially supports the [Ray framework](https://ray.io/) and its highly customizable [RLLib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLLib framework to train a Pong playing agent.\n",
"## Train Pong Agent\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct reinforcement learning run configurations for the underlying reinforcement learning framework. Reinforcement Learning in Azure Machine Learning supports the open source [Ray framework](https://ray.io/) and its highly customizable [RLLib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLLib framework to train a Pong playing agent.\n",
"\n",
"\n",
"### Define worker configuration\n",
"Define a `WorkerConfiguration` using your worker compute target. We also specify the number of nodes in the worker compute target to be used for training and additional PIP packages to install on those nodes as a part of setup.\n",
"Define a `WorkerConfiguration` using your worker compute target. We specify the number of nodes in the worker compute target to be used for training and additional PIP packages to install on those nodes as a part of setup.\n",
"In this case, we define the PIP packages as dependencies for both head and worker nodes. With this setup, the game simulations will run directly on the worker compute nodes."
]
},
@@ -285,7 +289,7 @@
"# Specify the Ray worker configuration\n",
"worker_conf = WorkerConfiguration(\n",
" \n",
" # Azure ML compute cluster to run Ray workers\n",
" # Azure Machine Learning compute target to run Ray workers\n",
" compute_target=worker_compute_target, \n",
" \n",
" # Number of worker nodes\n",
@@ -305,7 +309,7 @@
"source": [
"### Create reinforcement learning estimator\n",
"\n",
"The `ReinforcementLearningEstimator` is used to submit a job to Azure Machine Learning to start the Ray experiment run. We define the training script parameters here that will be passed to estimator. \n",
"The `ReinforcementLearningEstimator` is used to submit a job to Azure Machine Learning to start the Ray experiment run. We define the training script parameters here that will be passed to the estimator. \n",
"\n",
"We specify `episode_reward_mean` to 18 as we want to stop the training as soon as the trained agent reaches an average win margin of at least 18 point over opponent over all episodes in the training epoch.\n",
"Number of Ray worker processes are defined by parameter `num_workers`. We set it to 13 as we have 13 CPUs available in our compute targets. Multiple Ray worker processes parallelizes agent training and helps in achieving our goal faster. \n",
@@ -348,7 +352,7 @@
" \"--stop\": '\\'{\"episode_reward_mean\": 18, \"time_total_s\": 3600}\\'',\n",
"}\n",
"\n",
"# RL estimator\n",
"# Reinforcement learning estimator\n",
"rl_estimator = ReinforcementLearningEstimator(\n",
" \n",
" # Location of source files\n",
@@ -361,7 +365,7 @@
" # Defined above.\n",
" script_params=script_params,\n",
" \n",
" # The Azure ML compute target set up for Ray head nodes\n",
" # The Azure Machine Learning compute target set up for Ray head nodes\n",
" compute_target=head_compute_target,\n",
" \n",
" # Pip packages\n",
@@ -370,7 +374,7 @@
" # GPU usage\n",
" use_gpu=True,\n",
" \n",
" # RL framework. Currently must be Ray.\n",
" # Reinforcement learning framework. Currently must be Ray.\n",
" rl_framework=Ray(),\n",
" \n",
" # Ray worker configuration defined above.\n",
@@ -394,23 +398,24 @@
"metadata": {},
"source": [
"### Training script\n",
"As recommended in [RLLib](https://ray.readthedocs.io/en/latest/rllib.html) documentations, we use Ray [Tune](https://ray.readthedocs.io/en/latest/tune.html) API to run training algorithm. All the RLLib built-in trainers are compatible with the Tune API. Here we use tune.run() to execute a built-in training algorithm. For convenience, down below you can see part of the entry script where we make this call.\n",
"As recommended in [RLlib](https://ray.readthedocs.io/en/latest/rllib.html) documentations, we use Ray [Tune](https://ray.readthedocs.io/en/latest/tune.html) API to run the training algorithm. All the RLlib built-in trainers are compatible with the Tune API. Here we use tune.run() to execute a built-in training algorithm. For convenience, down below you can see part of the entry script where we make this call.\n",
"\n",
"```python\n",
" tune.run(run_or_experiment=args.run,\n",
" config={\n",
" \"env\": args.env,\n",
" \"num_gpus\": args.config[\"num_gpus\"],\n",
" \"num_workers\": args.config[\"num_workers\"],\n",
" \"callbacks\": {\"on_train_result\": callbacks.on_train_result},\n",
" \"sample_batch_size\": 50,\n",
" \"train_batch_size\": 1000,\n",
" \"num_sgd_iter\": 2,\n",
" \"num_data_loader_buffers\": 2,\n",
" \"model\": {\"dim\": 42},\n",
" },\n",
" stop=args.stop,\n",
" local_dir='./logs')\n",
" tune.run(\n",
" run_or_experiment=args.run,\n",
" config={\n",
" \"env\": args.env,\n",
" \"num_gpus\": args.config[\"num_gpus\"],\n",
" \"num_workers\": args.config[\"num_workers\"],\n",
" \"callbacks\": {\"on_train_result\": callbacks.on_train_result},\n",
" \"sample_batch_size\": 50,\n",
" \"train_batch_size\": 1000,\n",
" \"num_sgd_iter\": 2,\n",
" \"num_data_loader_buffers\": 2,\n",
" \"model\": {\"dim\": 42},\n",
" },\n",
" stop=args.stop,\n",
" local_dir='./logs')\n",
"```"
]
},
@@ -437,7 +442,7 @@
"source": [
"### Monitor the run\n",
"\n",
"Azure ML provides a Jupyter widget to show the real-time status of an experiment run. You could use this widget to monitor the status of runs. The widget shows the list of two child runs, one for head compute target run and one for worker compute target run, as well. You can click on the link under Status to see the details of the child run."
"Azure Machine Learning provides a Jupyter widget to show the status of an experiment run. You could use this widget to monitor the status of the runs. The widget shows the list of two child runs, one for head compute target run and one for worker compute target run. You can click on the link under **Status** to see the details of the child run. It will also show the metrics being logged."
]
},
{
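The monitoring cell itself is the usual widget call, assuming `run` is the submitted reinforcement learning run:
```python
from azureml.widgets import RunDetails

RunDetails(run).show()
```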
@@ -455,9 +460,29 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Stop the run\n",
"\n",
"To stop the run, call `run.cancel()`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment line below to cancel the run\n",
"# run.cancel()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Wait for completion\n",
"Wait for the run to complete before proceeding. If you want to stop the run, you may skip this and move to next section below. \n",
"\n",
"**Note: the run may take anywhere from 30 minutes to 45 minutes to complete.**"
"**Note: The run may take anywhere from 30 minutes to 45 minutes to complete.**"
]
},
{
@@ -469,24 +494,6 @@
"run.wait_for_completion()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Stop the run\n",
"\n",
"To cancel the run, call run.cancel()."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# run.cancel()"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -539,8 +546,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"We observe that during the training over multiple episodes, the agent learn to win the Pong game against opponent with our target of 18 points in each episode of 21 points.\n",
"**Congratulations!! You have trained your Pong agent to win a game marvelously.**"
"We observe that during the training over multiple episodes, the agent learns to win the Pong game against opponent with our target of 18 points in each episode of 21 points.\n",
"**Congratulations!! You have trained your Pong agent to win a game.**"
]
},
{
@@ -570,7 +577,7 @@
"metadata": {},
"source": [
"## Next\n",
"In this example, you learnt how to solve distributed RL training problems using head and worker compute targets. This is currently the last introductory tutorial for Azure Machine Learning service's Reinforcement Learning offering. We would love to hear your feedback to build the features you need!"
"In this example, you learned how to solve distributed reinforcement learning training problems using head and worker compute targets. This was an introductory tutorial on Reinforement Learning in Azure Machine Learning service offering. We would love to hear your feedback to build the features you need!"
]
}
],
@@ -595,7 +602,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
"version": "3.6.9"
},
"notice": "Copyright (c) Microsoft Corporation. All rights reserved.\u00e2\u20ac\u00afLicensed under the MIT License.\u00e2\u20ac\u00af "
},

View File

@@ -20,11 +20,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Azure ML Reinforcement Learning Sample - Cartpole Problem on Compute Instance\n",
"# Reinforcement Learning in Azure Machine Learning - Cartpole Problem on Compute Instance\n",
"\n",
"Azure ML Reinforcement Learning (Azure ML RL) is a managed service for running reinforcement learning training and simulation. With Azure MLRL, data scientists can start developing RL systems on one machine, and scale to compute clusters with 100\u00e2\u20ac\u2122s of nodes if needed.\n",
"Reinforcement Learning in Azure Machine Learning is a managed service for running reinforcement learning training and simulation. With Reinforcement Learning in Azure Machine Learning, data scientists can start developing reinforcement learning systems on one machine, and scale to compute targets with 100\u00e2\u20ac\u2122s of nodes if needed.\n",
"\n",
"This example shows how to use Azure ML RL to train a Cartpole playing agent on a compute instance."
"This example shows how to use Reinforcement Learning in Azure Machine Learning to train a Cartpole playing agent on a compute instance."
]
},
{
@@ -56,7 +56,7 @@
"metadata": {},
"source": [
"### Prerequisite\n",
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription id, a resource group and a workspace. All datastores and datasets you use should be associated with your workspace."
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription ID, a resource group, and an Azure Machine Learning workspace. All datastores and datasets you use should be associated with your workspace."
]
},
{
@@ -75,8 +75,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Azure ML SDK \n",
"Display the Azure ML SDK version."
"### Azure Machine Learning SDK \n",
"Display the Azure Machine Learning SDK version."
]
},
{
@@ -86,15 +86,15 @@
"outputs": [],
"source": [
"import azureml.core\n",
"print(\"Azure ML SDK Version: \", azureml.core.VERSION)"
"print(\"Azure Machine Learning SDK Version: \", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get Azure ML workspace\n",
"Get a reference to an existing Azure ML workspace."
"### Get Azure Machine Learning workspace\n",
"Get a reference to an existing Azure Machine Learning workspace."
]
},
{
@@ -163,18 +163,22 @@
"source": [
"# Load current compute instance info\n",
"current_compute_instance = load_nbvm()\n",
"print(\"Current compute instance:\", current_compute_instance)\n",
"\n",
"# For this demo, let's use the current compute instance as the compute target, if available\n",
"if current_compute_instance:\n",
" print(\"Current compute instance:\", current_compute_instance)\n",
" instance_name = current_compute_instance['instance']\n",
"else:\n",
" instance_name = next(iter(ws.compute_targets))\n",
" print(\"Instance name:\", instance_name)\n",
"\n",
"compute_target = ws.compute_targets[instance_name]\n",
"\n",
"print(\"Compute target status:\")\n",
"print(compute_target.get_status().serialize())\n",
"try:\n",
" print(compute_target.get_status().serialize())\n",
"except:\n",
" print(compute_target.get_status())\n",
"\n",
"print(\"Compute target size:\")\n",
"print(compute_target.size(ws))"
@@ -184,7 +188,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Azure ML experiment\n",
"### Create Azure Machine Learning experiment\n",
"Create an experiment to track the runs in your workspace. "
]
},
@@ -204,8 +208,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train Cartpole Agent Using Azure ML RL\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct RL run configurations for the underlying RL framework. Azure ML RL initially supports the [Ray framework](https://ray.io/) and its highly customizable [RLlib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLlib framework to train a cartpole playing agent. "
"## Train Cartpole Agent\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct reinforcement learning run configurations for the underlying reinforcement learning framework. Reinforcement Learning in Azure Machine Learning supports the open source [Ray framework](https://ray.io/) and its highly customizable [RLlib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLlib framework to train a cartpole playing agent. "
]
},
{
@@ -222,7 +226,7 @@
"- `entry_script`, path to your entry script relative to the source directory,\n",
"- `script_params`, constant parameters to be passed to each run of training script,\n",
"- `compute_target`, reference to the compute target in which the trainer and worker(s) jobs will be executed,\n",
"- `rl_framework`, the RL framework to be used (currently must be Ray).\n",
"- `rl_framework`, the reinforcement learning framework to be used (currently must be Ray).\n",
"\n",
"We use the `script_params` parameter to pass in general and algorithm-specific parameters to the training script.\n"
]
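A minimal sketch assembling these pieces; the entry script name, algorithm, and stopping criterion below are illustrative stand-ins patterned on the Pong example above, not the notebook's actual values:
```python
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray

# General and algorithm-specific parameters for the training script (assumed values)
script_params = {
    "--run": "PPO",                                # RLlib algorithm to run
    "--env": "CartPole-v0",                        # Gym environment id
    "--stop": '\'{"episode_reward_mean": 200}\'',  # stop once the pole stays up
}

rl_estimator = ReinforcementLearningEstimator(
    source_directory="files",                # folder holding the entry script (assumed)
    entry_script="cartpole_training.py",     # illustrative script name
    script_params=script_params,
    compute_target=compute_target,           # the compute target obtained above
    rl_framework=Ray(),                      # currently must be Ray
)
```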
@@ -273,10 +277,10 @@
" # A dictionary of arguments to pass to the training script specified in ``entry_script``\n",
" script_params=script_params,\n",
" \n",
" # The Azure ML compute target set up for Ray head nodes\n",
" # The Azure Machine Learning compute target set up for Ray head nodes\n",
" compute_target=compute_target,\n",
" \n",
" # RL framework. Currently must be Ray.\n",
" # Reinforcement learning framework. Currently must be Ray.\n",
" rl_framework=Ray()\n",
")"
]
@@ -345,11 +349,11 @@
"metadata": {},
"source": [
"### Monitor experiment\n",
"Azure ML provides a Jupyter widget to show the real-time status of an experiment run. You could use this widget to monitor status of the runs.\n",
"Azure Machine Learning provides a Jupyter widget to show the status of an experiment run. You could use this widget to monitor the status of the runs.\n",
"\n",
"Note that _ReinforcementLearningEstimator_ creates at least two runs: (a) A parent run, i.e. the run returned above, and (b) a collection of child runs. The number of the child runs depends on the configuration of the reinforcement learning estimator. In our simple scenario, configured above, only one child run will be created.\n",
"\n",
"The widget will show a list of the child runs as well. You can click on the link under **Status** to see the details of a child run."
"The widget will show a list of the child runs as well. You can click on the link under **Status** to see the details of a child run. It will also show the metrics being logged."
]
},
{
@@ -369,7 +373,7 @@
"source": [
"### Stop the run\n",
"\n",
"To cancel the run, call `training_run.cancel()`."
"To stop the run, call `training_run.cancel()`."
]
},
{
@@ -577,10 +581,10 @@
" training_artifacts_ds.as_named_input('artifacts_dataset'),\n",
" training_artifacts_ds.as_named_input('artifacts_path').as_mount()],\n",
" \n",
" # The Azure ML compute target\n",
" # The Azure Machine Learning compute target\n",
" compute_target=compute_target,\n",
" \n",
" # RL framework. Currently must be Ray.\n",
" # Reinforcement learning framework. Currently must be Ray.\n",
" rl_framework=Ray(),\n",
" \n",
" # Additional pip packages to install\n",
@@ -662,7 +666,7 @@
"metadata": {},
"source": [
"## Next\n",
"This example was about running Azure ML RL (Ray/RLlib Framework) on compute instance. Please see [Cartpole problem](../cartpole-on-single-compute/cartpole_cc.ipynb)\n",
"This example was about running Reinforcement Learning in Azure Machine Learning (Ray/RLlib Framework) on a compute instance. Please see [Cartpole Problem on Single Compute](../cartpole-on-single-compute/cartpole_sc.ipynb)\n",
"example which uses Ray RLlib to train a Cartpole playing agent on a single node remote compute.\n"
]
}

View File

@@ -13,18 +13,18 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/tutorials/how-to-use-azureml/reinforcement-learning/cartpole_on_single_compute/cartpole_cc.png)"
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/tutorials/how-to-use-azureml/reinforcement-learning/cartpole_on_single_compute/cartpole_sc.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Azure ML Reinforcement Learning Sample - Cartpole Problem\n",
"# Reinforcement Learning in Azure Machine Learning - Cartpole Problem on Single Compute\n",
"\n",
"Azure ML Reinforcement Learning (Azure ML RL) is a managed service for running reinforcement learning training and simulation. With Azure MLRL, data scientists can start developing RL systems on one machine, and scale to compute clusters with 100\u00e2\u20ac\u2122s of nodes if needed.\n",
"Reinforcement Learning in Azure Machine Learning is a managed service for running reinforcement learning training and simulation. With Reinforcement Learning in Azure Machine Learning, data scientists can start developing reinforcement learning systems on one machine, and scale to compute targets with 100\u00e2\u20ac\u2122s of nodes if needed.\n",
"\n",
"This example shows how to use Azure ML RL to train a Cartpole playing agent on a single machine. "
"This example shows how to use Reinforcement Learning in Azure Machine Learning to train a Cartpole playing agent on a single compute. "
]
},
{
@@ -56,7 +56,7 @@
"metadata": {},
"source": [
"### Prerequisite\n",
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription id, a resource group and a workspace. All datastores and datasets you use should be associated with your workspace."
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription ID, a resource group, and an Azure Machine Learning workspace. All datastores and datasets you use should be associated with your workspace."
]
},
{
@@ -75,8 +75,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Azure ML SDK \n",
"Display the Azure ML SDK version."
"### Azure Machine Learning SDK \n",
"Display the Azure Machine Learning SDK version."
]
},
{
@@ -87,15 +87,15 @@
"source": [
"import azureml.core\n",
"\n",
"print(\"Azure ML SDK Version: \", azureml.core.VERSION)"
"print(\"Azure Machine Learning SDK Version: \", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get Azure ML workspace\n",
"Get a reference to an existing Azure ML workspace."
"### Get Azure Machine Learning workspace\n",
"Get a reference to an existing Azure Machine Learning workspace."
]
},
{
@@ -118,7 +118,7 @@
"\n",
"A compute target is a designated compute resource where you run your training and simulation scripts. This location may be your local machine or a cloud-based compute resource. The code below shows how to create a cloud-based compute target. For more information see [What are compute targets in Azure Machine Learning?](https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target)\n",
"\n",
"**Note: Creation of a compute resource can take several minutes**"
"**Note: Creation of a compute resource can take several minutes**. Please make sure to change `STANDARD_D2_V2` to a [size available in your region](https://azure.microsoft.com/en-us/global-infrastructure/services/?products=virtual-machines)."
]
},
{
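A sketch of the provisioning pattern the note refers to, following the same try/except idiom used for the head and worker clusters above (the cluster name and node counts are assumptions):
```python
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

compute_name = "cpu-cluster-d2"  # assumed cluster name

try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
    print("Found existing compute target:", compute_name)
except ComputeTargetException:
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",  # change to a size available in your region
        min_nodes=0,
        max_nodes=2)
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=20)
```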
@@ -158,7 +158,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Azure ML experiment\n",
"### Create Azure Machine Learning experiment\n",
"Create an experiment to track the runs in your workspace. "
]
},
@@ -178,8 +178,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train Cartpole Agent Using Azure ML RL\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct RL run configurations for the underlying RL framework. Azure ML RL initially supports the [Ray framework](https://ray.io/) and its highly customizable [RLlib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLlib framework to train a cartpole playing agent. "
"## Train Cartpole Agent\n",
"To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the _ReinforcementLearningEstimator_ class, which allows users to easily construct reinforcement learning run configurations for the underlying reinforcement learning framework. Reinforcement Learning in Azure Machine Learning supports the open source [Ray framework](https://ray.io/) and its highly customizable [RLlib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning). In this section we show how to use _ReinforcementLearningEstimator_ and Ray/RLlib framework to train a cartpole playing agent. "
]
},
{
@@ -196,7 +196,7 @@
"- `entry_script`, path to your entry script relative to the source directory,\n",
"- `script_params`, constant parameters to be passed to each run of training script,\n",
"- `compute_target`, reference to the compute target in which the trainer and worker(s) jobs will be executed,\n",
"- `rl_framework`, the RL framework to be used (currently must be Ray).\n",
"- `rl_framework`, the reinforcement learning framework to be used (currently must be Ray).\n",
"\n",
"We use the `script_params` parameter to pass in general and algorithm-specific parameters to the training script.\n"
]
@@ -249,7 +249,7 @@
" # There are two parts to this:\n",
" # 1. Use a custom docker file with proper instructions to install xvfb, ffmpeg, python-opengl\n",
" # and other dependencies. \n",
" # TODO: Add these instructions to default rl base image and drop this docker file.\n",
" # TODO: Add these instructions to default reinforcement learning base image and drop this docker file.\n",
" \n",
" with open(\"files/docker/Dockerfile\", \"r\") as f:\n",
" dockerfile=f.read()\n",
@@ -274,10 +274,10 @@
" # A dictionary of arguments to pass to the training script specified in ``entry_script``\n",
" script_params=script_params,\n",
" \n",
" # The Azure ML compute target set up for Ray head nodes\n",
" # The Azure Machine Learning compute target set up for Ray head nodes\n",
" compute_target=compute_target,\n",
" \n",
" # RL framework. Currently must be Ray.\n",
" # Reinforcement learning framework. Currently must be Ray.\n",
" rl_framework=Ray(),\n",
" \n",
" # Custom environmnet for Xvfb\n",
@@ -350,11 +350,11 @@
"source": [
"### Monitor experiment\n",
"\n",
"Azure ML provides a Jupyter widget to show the real-time status of an experiment run. You could use this widget to monitor status of the runs.\n",
"Azure Machine Learning provides a Jupyter widget to show the status of an experiment run. You could use this widget to monitor the status of the runs.\n",
"\n",
"Note that _ReinforcementLearningEstimator_ creates at least two runs: (a) A parent run, i.e. the run returned above, and (b) a collection of child runs. The number of the child runs depends on the configuration of the reinforcement learning estimator. In our simple scenario, configured above, only one child run will be created.\n",
"\n",
"The widget will show a list of the child runs as well. You can click on the link under **Status** to see the details of a child run."
"The widget will show a list of the child runs as well. You can click on the link under **Status** to see the details of a child run. It will also show the metrics being logged."
]
},
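The widget referenced here ships in the `azureml-widgets` package; a minimal usage sketch:

    from azureml.widgets import RunDetails

    # Renders a live view of the parent run and its child runs inside the notebook.
    RunDetails(training_run).show()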
{
@@ -373,7 +373,7 @@
"metadata": {},
"source": [
"### Stop the run\n",
"To cancel the run, call `training_run.cancel()`."
"To stop the run, call `training_run.cancel()`."
]
},
{
@@ -393,7 +393,7 @@
"### Wait for completion\n",
"Wait for the run to complete before proceeding.\n",
"\n",
"**Note: The length of the run depends on the provisioning time of the compute target and may take several minutes to complete.**"
"**Note: The length of the run depends on the provisioning time of the compute target and it may take several minutes to complete.**"
]
},
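A sketch of the blocking wait, using the standard `Run` API (`show_output=True` streams the run log into the notebook):

    # Block until the parent run reaches a terminal state.
    training_run.wait_for_completion(show_output=True)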
{
@@ -560,18 +560,20 @@
" dir_util.mkpath(destination)\n",
" \n",
" try:\n",
" # Mount dataset and copy movies\n",
" pirnt(\"Trying mounting dataset and copying movies.\")\n",
" # Note: We assume movie paths start with '\\'\n",
" mount_context = artifacts_ds.mount()\n",
" mount_context.start()\n",
" print('Download started.')\n",
" for movie in movies:\n",
" print('Copying {} ...'.format(movie))\n",
" shutil.copy2(path.join(mount_context.mount_point, movie[1:]), destination)\n",
" mount_context.stop()\n",
" except:\n",
" print(\"Mounting error! Downloading all artifacts ...\")\n",
" artifacts_ds.download(target_path=destination, overwrite=True)\n",
" print(\"Mounting failed! Going with dataset download.\")\n",
" for i, file in enumerate(artifacts_ds.to_path()):\n",
" if file in movies:\n",
" print('Downloading {} ...'.format(file))\n",
" artifacts_ds.skip(i).take(1).download(target_path=destination, overwrite=True)\n",
" \n",
" print('Downloading movies completed!')\n",
"\n",
@@ -625,7 +627,7 @@
"print(\"Last movie:\", last_movie)\n",
"\n",
"# Download movies\n",
"training_movies_path = \"training\"\n",
"training_movies_path = path.join(\"training\", \"videos\")\n",
"download_movies(training_artifacts_ds, [first_movie, last_movie], training_movies_path)"
]
},
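To preview a downloaded movie inline, one option is IPython's video display; a sketch, assuming forward-slash separators in the blob-relative movie path:

    from os import path
    from IPython.display import Video

    # The copy above keeps only the base name of the movie path.
    Video(path.join(training_movies_path, path.basename(first_movie)), embed=True)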
@@ -781,7 +783,7 @@
"# 1. Use a custom docker file with proper instructions to install xvfb, ffmpeg, python-opengl\n",
"# and other dependencies.\n",
"# Note: Even when the rendering is off pyhton-opengl is needed.\n",
"# TODO: Add these instructions to default rl base image and drop this docker file.\n",
"# TODO: Add these instructions to default reinforcement learning base image and drop this docker file.\n",
"\n",
"with open(\"files/docker/Dockerfile\", \"r\") as f:\n",
" dockerfile=f.read()\n",
@@ -811,10 +813,10 @@
" training_artifacts_ds.as_named_input('artifacts_dataset'),\n",
" training_artifacts_ds.as_named_input('artifacts_path').as_mount()],\n",
" \n",
" # The Azure ML compute target set up for Ray head nodes\n",
" # The Azure Machine Learning compute target set up for Ray head nodes\n",
" compute_target=compute_target,\n",
" \n",
" # RL framework. Currently must be Ray.\n",
" # Reinforcement learning framework. Currently must be Ray.\n",
" rl_framework=Ray(),\n",
" \n",
" # Custom environmnet for Xvfb\n",
@@ -928,7 +930,7 @@
"print(\"Last movie:\", last_movie)\n",
"\n",
"# Download last movie\n",
"rollout_movies_path = \"rollout\"\n",
"rollout_movies_path = path.join(\"rollout\", \"videos\")\n",
"download_movies(rollout_artifacts_ds, [last_movie], rollout_movies_path)\n",
"\n",
"# Look for the downloaded movie in local directory\n",
@@ -996,7 +998,7 @@
"metadata": {},
"source": [
"## Next\n",
"This example was about running Azure ML RL (Ray/RLlib Framework) on a single node. Please see [Pong problem](../atari-on-distributed-compute/pong_rllib.ipynb)\n",
"This example was about running Reinforcement Learning in Azure Machine Learning (Ray/RLlib Framework) on a single compute. Please see [Pong Problem](../atari-on-distributed-compute/pong_rllib.ipynb)\n",
"example which uses Ray RLlib to train a Pong playing agent on a multi-node cluster."
]
}

View File

@@ -0,0 +1,70 @@
FROM mcr.microsoft.com/azureml/base:openmpi3.1.2-ubuntu18.04
# Install some basic utilities
RUN apt-get update && apt-get install -y \
curl \
ca-certificates \
sudo \
cpio \
git \
bzip2 \
libx11-6 \
tmux \
htop \
gcc \
xvfb \
python-opengl \
x11-xserver-utils \
ffmpeg \
mesa-utils \
nano \
vim \
rsync \
&& rm -rf /var/lib/apt/lists/*
# Create a working directory
RUN mkdir /app
WORKDIR /app
# Install Minecraft needed libraries
RUN mkdir -p /usr/share/man/man1 && \
sudo apt-get update && \
sudo apt-get install -y \
openjdk-8-jre-headless=8u162-b12-1 \
openjdk-8-jdk-headless=8u162-b12-1 \
openjdk-8-jre=8u162-b12-1 \
openjdk-8-jdk=8u162-b12-1
# Create a Python 3.7 environment
RUN conda install conda-build \
&& conda create -y --name py37 python=3.7.3 \
&& conda clean -ya
ENV CONDA_DEFAULT_ENV=py37
# Install minerl
RUN pip install --upgrade --user minerl
RUN pip install \
pandas \
matplotlib \
numpy \
scipy \
azureml-defaults \
tensorboardX \
tensorflow==1.15rc2 \
tabulate \
dm_tree \
lz4 \
ray==0.8.3 \
ray[rllib]==0.8.3 \
ray[tune]==0.8.3
COPY patch_files/* /root/.local/lib/python3.7/site-packages/minerl/env/Malmo/Minecraft/src/main/java/com/microsoft/Malmo/Client/
# Start minerl to pre-fetch minerl files (saves time when starting minerl during training)
RUN xvfb-run -a -s "-screen 0 1400x900x24" python -c "import gym; import minerl; env = gym.make('MineRLTreechop-v0'); env.close();"
RUN pip install --index-url https://test.pypi.org/simple/ malmo && \
python -c "import malmo.minecraftbootstrap; malmo.minecraftbootstrap.download();"
ENV MALMO_XSD_PATH="/app/MalmoPlatform/Schemas"

View File

@@ -0,0 +1,939 @@
// --------------------------------------------------------------------------------------------------
// Copyright (c) 2016 Microsoft Corporation
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
// associated documentation files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute,
// sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or
// substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
// NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// --------------------------------------------------------------------------------------------------
package com.microsoft.Malmo.Client;
import com.microsoft.Malmo.MalmoMod;
import com.microsoft.Malmo.MissionHandlerInterfaces.IWantToQuit;
import com.microsoft.Malmo.Schemas.MissionInit;
import com.microsoft.Malmo.Utils.TCPUtils;
import net.minecraft.profiler.Profiler;
import com.microsoft.Malmo.Utils.TimeHelper;
import net.minecraftforge.common.config.Configuration;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Hashtable;
import com.microsoft.Malmo.Utils.TCPInputPoller;
import java.util.logging.Level;
import java.util.LinkedList;
import java.util.List;
/**
* MalmoEnvServer - service supporting OpenAI gym "environment" for multi-agent Malmo missions.
*/
public class MalmoEnvServer implements IWantToQuit {
private static Profiler profiler = new Profiler();
private static int nsteps = 0;
private static boolean debug = false;
private static String hello = "<MalmoEnv";
private class EnvState {
// Mission parameters:
String missionInit = null;
String token = null;
String experimentId = null;
int agentCount = 0;
int reset = 0;
boolean quit = false;
boolean synchronous = false;
Long seed = null;
// OpenAI gym state:
boolean done = false;
double reward = 0.0;
byte[] obs = null;
String info = "";
LinkedList<String> commands = new LinkedList<String>();
}
private static boolean envPolicy = false; // Are we configured by config policy?
// Synchronize on EnvState
private Lock lock = new ReentrantLock();
private Condition cond = lock.newCondition();
private EnvState envState = new EnvState();
private Hashtable<String, Integer> initTokens = new Hashtable<String, Integer>();
static final long COND_WAIT_SECONDS = 3; // Max wait in seconds before timing out (and replying to RPC).
static final int BYTES_INT = 4;
static final int BYTES_DOUBLE = 8;
private static final Charset utf8 = Charset.forName("UTF-8");
// Service uses a single per-environment client connection - initiated by the remote environment.
private int port;
private TCPInputPoller missionPoller; // Used for command parsing and not actual communication.
private String version;
// AOG: From running experiments, I've found that MineRL can get stuck resetting the
// environment, which causes huge delays while we wait for the Python side to time
// out and restart the Minecraft instance. Minecraft itself is normally in a recoverable
// state, but the MalmoEnvServer instance will be blocked in a tight spin loop trying
// to handle a Peek request from the Python client. To unstick things, I've added this
// flag that can be set when we know things are in a bad state to abort the peek request.
// WARNING: THIS IS ONLY TREATING THE SYMPTOM AND NOT THE ROOT CAUSE
// The reason things are getting stuck is because the player is either dying or we're
// receiving a quit request while an episode reset is in progress.
private boolean abortRequest;
public void abort() {
System.out.println("AOG: MalmoEnvServer.abort");
abortRequest = true;
}
/***
* Malmo "Env" service.
* @param port the port the service listens on.
* @param missionPoller for plugging into existing comms handling.
*/
public MalmoEnvServer(String version, int port, TCPInputPoller missionPoller) {
this.version = version;
this.missionPoller = missionPoller;
this.port = port;
// AOG - Assume we don't want to be aborting in the first place
this.abortRequest = false;
}
/** Initialize malmo env configuration. For now either on or "legacy" AgentHost protocol.*/
static public void update(Configuration configs) {
envPolicy = configs.get(MalmoMod.ENV_CONFIGS, "env", "false").getBoolean();
}
public static boolean isEnv() {
return envPolicy;
}
/**
* Start servicing the MalmoEnv protocol.
* @throws IOException
*/
public void serve() throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
serverSocket.setPerformancePreferences(0,2,1);
while (true) {
try {
final Socket socket = serverSocket.accept();
socket.setTcpNoDelay(true);
Thread thread = new Thread("EnvServerSocketHandler") {
public void run() {
boolean running = false;
try {
checkHello(socket);
while (true) {
DataInputStream din = new DataInputStream(socket.getInputStream());
int hdr = din.readInt();
byte[] data = new byte[hdr];
din.readFully(data);
String command = new String(data, utf8);
if (command.startsWith("<Step")) {
profiler.startSection("root");
long start = System.nanoTime();
step(command, socket, din);
profiler.endSection();
if (nsteps % 100 == 0 && debug){
List<Profiler.Result> dat = profiler.getProfilingData("root");
for(int qq = 0; qq < dat.size(); qq++){
Profiler.Result res = dat.get(qq);
System.out.println(res.profilerName + " " + res.totalUsePercentage + " "+ res.usePercentage);
}
}
} else if (command.startsWith("<Peek")) {
peek(command, socket, din);
} else if (command.startsWith("<Init")) {
init(command, socket);
} else if (command.startsWith("<Find")) {
find(command, socket);
} else if (command.startsWith("<MissionInit")) {
if (missionInit(din, command, socket))
{
running = true;
}
} else if (command.startsWith("<Quit")) {
quit(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Exit")) {
exit(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Close")) {
close(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Status")) {
status(command, socket);
} else if (command.startsWith("<Echo")) {
command = "<Echo>" + command + "</Echo>";
data = command.getBytes(utf8);
hdr = data.length;
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(hdr);
dout.write(data, 0, hdr);
dout.flush();
} else {
throw new IOException("Unknown env service command");
}
}
} catch (IOException ioe) {
// ioe.printStackTrace();
TCPUtils.Log(Level.SEVERE, "MalmoEnv socket error: " + ioe + " (can be on disconnect)");
// System.out.println("[ERROR] " + "MalmoEnv socket error: " + ioe + " (can be on disconnect)");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] MalmoEnv socket error");
try {
if (running) {
TCPUtils.Log(Level.INFO,"Want to quit on disconnect.");
System.out.println("[LOGTOPY] " + "Want to quit on disconnect.");
setWantToQuit();
}
socket.close();
} catch (IOException ioe2) {
}
}
}
};
thread.start();
} catch (IOException ioe) {
TCPUtils.Log(Level.SEVERE, "MalmoEnv service exits on " + ioe);
}
}
}
private void checkHello(Socket socket) throws IOException {
DataInputStream din = new DataInputStream(socket.getInputStream());
int hdr = din.readInt();
if (hdr <= 0 || hdr > hello.length() + 8) // Version number may be somewhat longer in future.
throw new IOException("Invalid MalmoEnv hello header length");
byte[] data = new byte[hdr];
din.readFully(data);
if (!new String(data).startsWith(hello + version))
throw new IOException("MalmoEnv invalid protocol or version - expected " + hello + version);
}
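// Framing note: every message on this socket is a 4-byte big-endian length header
// (the DataInputStream/DataOutputStream default) followed by the payload bytes; the
// hello payload is "<MalmoEnv" concatenated with the protocol version string.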
// Handler for <MissionInit> messages.
private boolean missionInit(DataInputStream din, String command, Socket socket) throws IOException {
String ipOriginator = socket.getInetAddress().getHostName();
int hdr;
byte[] data;
hdr = din.readInt();
data = new byte[hdr];
din.readFully(data);
String id = new String(data, utf8);
TCPUtils.Log(Level.INFO,"Mission Init" + id);
String[] token = id.split(":");
String experimentId = token[0];
int role = Integer.parseInt(token[1]);
int reset = Integer.parseInt(token[2]);
int agentCount = Integer.parseInt(token[3]);
Boolean isSynchronous = Boolean.parseBoolean(token[4]);
Long seed = null;
if(token.length > 5)
seed = Long.parseLong(token[5]);
if(isSynchronous && agentCount > 1){
throw new IOException("Synchronous mode currently does not support multiple agents.");
}
port = -1;
boolean allTokensConsumed = true;
boolean started = false;
lock.lock();
try {
if (role == 0) {
String previousToken = experimentId + ":0:" + (reset - 1);
initTokens.remove(previousToken);
String myToken = experimentId + ":0:" + reset;
if (!initTokens.containsKey(myToken)) {
TCPUtils.Log(Level.INFO,"(Pre)Start " + role + " reset " + reset);
started = startUp(command, ipOriginator, experimentId, reset, agentCount, myToken, seed, isSynchronous);
if (started)
initTokens.put(myToken, 0);
} else {
started = true; // Pre-started previously.
}
// Check that all previous tokens have been consumed. If not don't proceed to mission.
allTokensConsumed = areAllTokensConsumed(experimentId, reset, agentCount);
if (!allTokensConsumed) {
try {
cond.await(COND_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
}
allTokensConsumed = areAllTokensConsumed(experimentId, reset, agentCount);
}
} else {
TCPUtils.Log(Level.INFO, "Start " + role + " reset " + reset);
started = startUp(command, ipOriginator, experimentId, reset, agentCount, experimentId + ":" + role + ":" + reset, seed, isSynchronous);
}
} finally {
lock.unlock();
}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(allTokensConsumed && started ? 1 : 0);
dout.flush();
dout.flush();
return allTokensConsumed && started;
}
private boolean areAllTokensConsumed(String experimentId, int reset, int agentCount) {
boolean allTokensConsumed = true;
for (int i = 1; i < agentCount; i++) {
String tokenForAgent = experimentId + ":" + i + ":" + (reset - 1);
if (initTokens.containsKey(tokenForAgent)) {
TCPUtils.Log(Level.FINE,"Mission init - unconsumed " + tokenForAgent);
allTokensConsumed = false;
}
}
return allTokensConsumed;
}
private boolean startUp(String command, String ipOriginator, String experimentId, int reset, int agentCount, String myToken, Long seed, Boolean isSynchronous) throws IOException {
// Clear out mission state
envState.reward = 0.0;
envState.commands.clear();
envState.obs = null;
envState.info = "";
envState.missionInit = command;
envState.done = false;
envState.quit = false;
envState.token = myToken;
envState.experimentId = experimentId;
envState.agentCount = agentCount;
envState.reset = reset;
envState.synchronous = isSynchronous;
envState.seed = seed;
return startUpMission(command, ipOriginator);
}
private boolean startUpMission(String command, String ipOriginator) throws IOException {
if (missionPoller == null)
return false;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
missionPoller.commandReceived(command, ipOriginator, dos);
dos.flush();
byte[] reply = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(reply);
DataInputStream dis = new DataInputStream(bais);
int hdr = dis.readInt();
byte[] replyBytes = new byte[hdr];
dis.readFully(replyBytes);
String replyStr = new String(replyBytes);
if (replyStr.equals("MALMOOK")) {
TCPUtils.Log(Level.INFO, "MalmoEnvServer Mission starting ...");
return true;
} else if (replyStr.equals("MALMOBUSY")) {
TCPUtils.Log(Level.INFO, "MalmoEnvServer Busy - I want to quit");
this.envState.quit = true;
}
return false;
}
private static final int stepTagLength = "<Step_>".length(); // Step with option code.
private synchronized void stepSync(String command, Socket socket, DataInputStream din) throws IOException
{
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Entering synchronous step.");
nsteps += 1;
profiler.startSection("commandProcessing");
String actions = command.substring(stepTagLength, command.length() - (stepTagLength + 2));
int options = Character.getNumericValue(command.charAt(stepTagLength - 2));
boolean withInfo = options == 0 || options == 2;
// Prepare to write data to the client.
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
double reward = 0.0;
boolean done;
byte[] obs;
String info = "";
boolean sent = false;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Acquiring lock for synchronous step.");
lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Lock is acquired.");
done = envState.done;
// TODO Handle when the environment is done.
// Process the actions.
if (actions.contains("\n")) {
String[] cmds = actions.split("\\n");
for(String cmd : cmds) {
envState.commands.add(cmd);
}
} else {
if (!actions.isEmpty())
envState.commands.add(actions);
}
sent = true;
profiler.endSection(); //cmd
profiler.startSection("requestTick");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Received: " + actions);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Requesting tick.");
// Now wait to run a tick
// If synchronous mode is off then we should see if want to quit is true.
while(!TimeHelper.SyncManager.requestTick() && !done ){Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Tick request granted.");
profiler.endSection();
profiler.startSection("waitForTick");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Waiting for tick.");
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted() && !done ){ Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> TICK DONE. Getting observation.");
profiler.endSection();
profiler.startSection("getObservation");
// After which, get the observations.
obs = getObservation(done);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Observation received. Getting info.");
profiler.endSection();
profiler.startSection("getInfo");
// Pick up rewards.
reward = envState.reward;
if (withInfo) {
info = envState.info;
// if(info == null)
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> FILLING INFO: NULL");
// else
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> FILLING " + info.toString());
}
done = envState.done;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> STATUS " + Boolean.toString(done));
envState.info = null;
envState.obs = null;
envState.reward = 0.0;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Info received..");
profiler.endSection();
} finally {
lock.unlock();
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Lock released. Writing observation, info, done.");
profiler.startSection("writeObs");
dout.writeInt(obs.length);
dout.write(obs);
dout.writeInt(BYTES_DOUBLE + 2);
dout.writeDouble(reward);
dout.writeByte(done ? 1 : 0);
dout.writeByte(sent ? 1 : 0);
if (withInfo) {
byte[] infoBytes = info.getBytes(utf8);
dout.writeInt(infoBytes.length);
dout.write(infoBytes);
}
profiler.endSection(); //write obs
profiler.startSection("flush");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Packets written. Flushing.");
dout.flush();
profiler.endSection(); // flush
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Done with step.");
}
// Handler for <Step_> messages. Single digit option code after _ specifies if turnkey and info are included in message.
private void step(String command, Socket socket, DataInputStream din) throws IOException {
if(envState.synchronous){
stepSync(command, socket, din);
}
else{
System.out.println("[ERROR] Asynchronous stepping is not supported in MineRL.");
}
}
// Handler for <Peek> messages.
private void peek(String command, Socket socket, DataInputStream din) throws IOException {
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
byte[] obs;
boolean done;
String info = "";
// AOG - As we've only seen issues with the peek request, I've focused my changes to just
// this function. Initially we want to be optimistic and assume we're not going to abort
// the request, and my observations of event timings indicate that there is plenty of time
// between the peek request being received and the reset failing, so a race condition is
// unlikely.
abortRequest = false;
lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Waiting for pistol to fire.");
while(!TimeHelper.SyncManager.hasServerFiredPistol() && !abortRequest){
// Now wait to run a tick
while(!TimeHelper.SyncManager.requestTick() && !abortRequest){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted() && !abortRequest){ Thread.yield();}
Thread.yield();
}
if (abortRequest) {
System.out.println("AOG: Aborting peek request");
// AOG - We detect the lack of observation within our Python wrapper and throw a slightly
// different exception that bypasses MineRL's automatic cleanup code. If we were to report
// 'done', MineRL would detect this as a runtime error and kill the Minecraft process,
// triggering a lengthy restart. So far, from testing, Minecraft itself is fine and we can
// retry the reset; it's only the tight loops above that were causing things to stall and
// time out.
// No observation
dout.writeInt(0);
// No info
dout.writeInt(0);
// Done
dout.writeInt(1);
dout.writeByte(0);
dout.flush();
return;
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Pistol fired!.");
// Wait two ticks for the first observation from server to be propagated.
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Getting observation.");
obs = getObservation(false);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Observation acquired.");
done = envState.done;
info = envState.info;
} finally {
lock.unlock();
}
dout.writeInt(obs.length);
dout.write(obs);
byte[] infoBytes = info.getBytes(utf8);
dout.writeInt(infoBytes.length);
dout.write(infoBytes);
dout.writeInt(1);
dout.writeByte(done ? 1 : 0);
dout.flush();
}
// Get the current observation. Logs an error if no video frame is available.
public byte[] getObservation(boolean done) {
byte[] obs = envState.obs;
if (obs == null){
System.out.println("[ERROR] Video observation is null; please notify the developer.");
}
return obs;
}
// Handler for <Find> messages - used by non-zero roles to discover integrated server port from primary (role 0) service.
private final static int findTagLength = "<Find>".length();
private void find(String command, Socket socket) throws IOException {
Integer port;
lock.lock();
try {
String token = command.substring(findTagLength, command.length() - (findTagLength + 1));
TCPUtils.Log(Level.INFO, "Find token? " + token);
// Purge previous token.
String[] tokenSplits = token.split(":");
String experimentId = tokenSplits[0];
int role = Integer.parseInt(tokenSplits[1]);
int reset = Integer.parseInt(tokenSplits[2]);
String previousToken = experimentId + ":" + role + ":" + (reset - 1);
initTokens.remove(previousToken);
cond.signalAll();
// Check for next token. Wait for a short time if not already produced.
port = initTokens.get(token);
if (port == null) {
try {
cond.await(COND_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
}
port = initTokens.get(token);
if (port == null) {
port = 0;
TCPUtils.Log(Level.INFO,"Role " + role + " reset " + reset + " waiting for token.");
}
}
} finally {
lock.unlock();
}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(port);
dout.flush();
}
public boolean isSynchronous(){
return envState.synchronous;
}
// Handler for <Init> messages. These reset the service so use with care!
private void init(String command, Socket socket) throws IOException {
lock.lock();
try {
initTokens = new Hashtable<String, Integer>();
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Quit> (quit mission) messages.
private void quit(String command, Socket socket) throws IOException {
lock.lock();
try {
if (!envState.done){
envState.quit = true;
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Pistol fired!.");
// Wait two ticks for the first observation from server to be propagated.
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(envState.done ? 1 : 0);
dout.flush();
} finally {
lock.unlock();
}
}
private final static int closeTagLength = "<Close>".length();
// Handler for <Close> messages.
private void close(String command, Socket socket) throws IOException {
lock.lock();
try {
String token = command.substring(closeTagLength, command.length() - (closeTagLength + 1));
initTokens.remove(token);
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Status> messages.
private void status(String command, Socket socket) throws IOException {
lock.lock();
try {
String status = "{}"; // TODO Possibly have something more interesting to report.
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
byte[] statusBytes = status.getBytes(utf8);
dout.writeInt(statusBytes.length);
dout.write(statusBytes);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Exit> messages. These "kill the service" temporarily so use with care!
private void exit(String command, Socket socket) throws IOException {
// lock.lock();
try {
// We may exit before we get a chance to reply.
TimeHelper.SyncManager.setSynchronous(false);
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
ClientStateMachine.exitJava();
} finally {
// lock.unlock();
}
}
// Malmo client state machine interface methods:
public String getCommand() {
try {
String command = envState.commands.poll();
if (command == null)
return "";
else
return command;
} finally {
}
}
public void endMission() {
// lock.lock();
try {
// AOG - If the mission is ending, we always want to abort requests and they won't
// be able to progress to completion and will stall.
System.out.println("AOG: MalmoEnvServer.endMission");
abort();
envState.done = true;
envState.quit = false;
envState.missionInit = null;
if (envState.token != null) {
initTokens.remove(envState.token);
envState.token = null;
envState.experimentId = null;
envState.agentCount = 0;
envState.reset = 0;
// cond.signalAll();
}
// lock.unlock();
} finally {
}
}
// Record a Malmo "observation" JSON as the env info, since an environment "obs" is a video frame.
public void observation(String info) {
// Parsing obs as JSON would be slower but less fragile than extracting the turn_key using string search.
// lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <OBSERVATION> Inserting: " + info);
envState.info = info;
// cond.signalAll();
} finally {
// lock.unlock();
}
}
public void addRewards(double rewards) {
// lock.lock();
try {
envState.reward += rewards;
} finally {
// lock.unlock();
}
}
public void addFrame(byte[] frame) {
// lock.lock();
try {
envState.obs = frame; // Replaces current.
// cond.signalAll();
} finally {
// lock.unlock();
}
}
public void notifyIntegrationServerStarted(int integrationServerPort) {
lock.lock();
try {
if (envState.token != null) {
TCPUtils.Log(Level.INFO,"Integration server start up - token: " + envState.token);
addTokens(integrationServerPort, envState.token, envState.experimentId, envState.agentCount, envState.reset);
cond.signalAll();
} else {
TCPUtils.Log(Level.WARNING,"No mission token on integration server start up!");
}
} finally {
lock.unlock();
}
}
private void addTokens(int integratedServerPort, String myToken, String experimentId, int agentCount, int reset) {
initTokens.put(myToken, integratedServerPort);
// Place tokens for other agents to find.
for (int i = 1; i < agentCount; i++) {
String tokenForAgent = experimentId + ":" + i + ":" + reset;
initTokens.put(tokenForAgent, integratedServerPort);
}
}
// IWantToQuit implementation.
@Override
public boolean doIWantToQuit(MissionInit missionInit) {
// lock.lock();
try {
return envState.quit;
} finally {
// lock.unlock();
}
}
public Long getSeed(){
return envState.seed;
}
private void setWantToQuit() {
// lock.lock();
try {
envState.quit = true;
} finally {
if(TimeHelper.SyncManager.isSynchronous()){
// We want to desynchronize everything.
TimeHelper.SyncManager.setSynchronous(false);
}
// lock.unlock();
}
}
@Override
public void prepare(MissionInit missionInit) {
}
@Override
public void cleanup() {
}
@Override
public String getOutcome() {
return "Env quit";
}
}

View File

@@ -0,0 +1,78 @@
FROM mcr.microsoft.com/azureml/base-gpu:openmpi3.1.2-cuda10.0-cudnn7-ubuntu18.04
# Install some basic utilities
RUN apt-get update && apt-get install -y \
curl \
ca-certificates \
sudo \
cpio \
git \
bzip2 \
libx11-6 \
tmux \
htop \
gcc \
xvfb \
python-opengl \
x11-xserver-utils \
ffmpeg \
mesa-utils \
nano \
vim \
rsync \
&& rm -rf /var/lib/apt/lists/*
# Create a working directory
RUN mkdir /app
WORKDIR /app
# Create a Python 3.7 environment
RUN conda install conda-build \
&& conda create -y --name py37 python=3.7.3 \
&& conda clean -ya
ENV CONDA_DEFAULT_ENV=py37
# Install Minecraft needed libraries
RUN mkdir -p /usr/share/man/man1 && \
sudo apt-get update && \
sudo apt-get install -y \
openjdk-8-jre-headless=8u162-b12-1 \
openjdk-8-jdk-headless=8u162-b12-1 \
openjdk-8-jre=8u162-b12-1 \
openjdk-8-jdk=8u162-b12-1
RUN pip install --upgrade --user minerl
# PyTorch with CUDA 10 installation
RUN conda install -y -c pytorch \
cuda100=1.0 \
magma-cuda100=2.4.0 \
"pytorch=1.1.0=py3.7_cuda10.0.130_cudnn7.5.1_0" \
torchvision=0.3.0 \
&& conda clean -ya
RUN pip install \
pandas \
matplotlib \
numpy \
scipy \
azureml-defaults \
tensorboardX \
tensorflow-gpu==1.15rc2 \
GPUtil \
tabulate \
dm_tree \
lz4 \
ray==0.8.3 \
ray[rllib]==0.8.3 \
ray[tune]==0.8.3
COPY patch_files/* /root/.local/lib/python3.7/site-packages/minerl/env/Malmo/Minecraft/src/main/java/com/microsoft/Malmo/Client/
# Start minerl to pre-fetch minerl files (saves time when starting minerl during training)
RUN xvfb-run -a -s "-screen 0 1400x900x24" python -c "import gym; import minerl; env = gym.make('MineRLTreechop-v0'); env.close();"
RUN pip install --index-url https://test.pypi.org/simple/ malmo && \
python -c "import malmo.minecraftbootstrap; malmo.minecraftbootstrap.download();"
ENV MALMO_XSD_PATH="/app/MalmoPlatform/Schemas"

View File

@@ -0,0 +1,939 @@
// --------------------------------------------------------------------------------------------------
// Copyright (c) 2016 Microsoft Corporation
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
// associated documentation files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute,
// sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or
// substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
// NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// --------------------------------------------------------------------------------------------------
package com.microsoft.Malmo.Client;
import com.microsoft.Malmo.MalmoMod;
import com.microsoft.Malmo.MissionHandlerInterfaces.IWantToQuit;
import com.microsoft.Malmo.Schemas.MissionInit;
import com.microsoft.Malmo.Utils.TCPUtils;
import net.minecraft.profiler.Profiler;
import com.microsoft.Malmo.Utils.TimeHelper;
import net.minecraftforge.common.config.Configuration;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Hashtable;
import com.microsoft.Malmo.Utils.TCPInputPoller;
import java.util.logging.Level;
import java.util.LinkedList;
import java.util.List;
/**
* MalmoEnvServer - service supporting OpenAI gym "environment" for multi-agent Malmo missions.
*/
public class MalmoEnvServer implements IWantToQuit {
private static Profiler profiler = new Profiler();
private static int nsteps = 0;
private static boolean debug = false;
private static String hello = "<MalmoEnv";
private class EnvState {
// Mission parameters:
String missionInit = null;
String token = null;
String experimentId = null;
int agentCount = 0;
int reset = 0;
boolean quit = false;
boolean synchronous = false;
Long seed = null;
// OpenAI gym state:
boolean done = false;
double reward = 0.0;
byte[] obs = null;
String info = "";
LinkedList<String> commands = new LinkedList<String>();
}
private static boolean envPolicy = false; // Are we configured by config policy?
// Synchronize on EnvState
private Lock lock = new ReentrantLock();
private Condition cond = lock.newCondition();
private EnvState envState = new EnvState();
private Hashtable<String, Integer> initTokens = new Hashtable<String, Integer>();
static final long COND_WAIT_SECONDS = 3; // Max wait in seconds before timing out (and replying to RPC).
static final int BYTES_INT = 4;
static final int BYTES_DOUBLE = 8;
private static final Charset utf8 = Charset.forName("UTF-8");
// Service uses a single per-environment client connection - initiated by the remote environment.
private int port;
private TCPInputPoller missionPoller; // Used for command parsing and not actual communication.
private String version;
// AOG: From running experiments, I've found that MineRL can get stuck resetting the
// environment, which causes huge delays while we wait for the Python side to time
// out and restart the Minecraft instance. Minecraft itself is normally in a recoverable
// state, but the MalmoEnvServer instance will be blocked in a tight spin loop trying
// to handle a Peek request from the Python client. To unstick things, I've added this
// flag that can be set when we know things are in a bad state to abort the peek request.
// WARNING: THIS IS ONLY TREATING THE SYMPTOM AND NOT THE ROOT CAUSE
// The reason things are getting stuck is because the player is either dying or we're
// receiving a quit request while an episode reset is in progress.
private boolean abortRequest;
public void abort() {
System.out.println("AOG: MalmoEnvServer.abort");
abortRequest = true;
}
/***
* Malmo "Env" service.
* @param port the port the service listens on.
* @param missionPoller for plugging into existing comms handling.
*/
public MalmoEnvServer(String version, int port, TCPInputPoller missionPoller) {
this.version = version;
this.missionPoller = missionPoller;
this.port = port;
// AOG - Assume we don't want to be aborting in the first place
this.abortRequest = false;
}
/** Initialize malmo env configuration. For now either on or "legacy" AgentHost protocol.*/
static public void update(Configuration configs) {
envPolicy = configs.get(MalmoMod.ENV_CONFIGS, "env", "false").getBoolean();
}
public static boolean isEnv() {
return envPolicy;
}
/**
* Start servicing the MalmoEnv protocol.
* @throws IOException
*/
public void serve() throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
serverSocket.setPerformancePreferences(0,2,1);
while (true) {
try {
final Socket socket = serverSocket.accept();
socket.setTcpNoDelay(true);
Thread thread = new Thread("EnvServerSocketHandler") {
public void run() {
boolean running = false;
try {
checkHello(socket);
while (true) {
DataInputStream din = new DataInputStream(socket.getInputStream());
int hdr = din.readInt();
byte[] data = new byte[hdr];
din.readFully(data);
String command = new String(data, utf8);
if (command.startsWith("<Step")) {
profiler.startSection("root");
long start = System.nanoTime();
step(command, socket, din);
profiler.endSection();
if (nsteps % 100 == 0 && debug){
List<Profiler.Result> dat = profiler.getProfilingData("root");
for(int qq = 0; qq < dat.size(); qq++){
Profiler.Result res = dat.get(qq);
System.out.println(res.profilerName + " " + res.totalUsePercentage + " "+ res.usePercentage);
}
}
} else if (command.startsWith("<Peek")) {
peek(command, socket, din);
} else if (command.startsWith("<Init")) {
init(command, socket);
} else if (command.startsWith("<Find")) {
find(command, socket);
} else if (command.startsWith("<MissionInit")) {
if (missionInit(din, command, socket))
{
running = true;
}
} else if (command.startsWith("<Quit")) {
quit(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Exit")) {
exit(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Close")) {
close(command, socket);
profiler.profilingEnabled = false;
} else if (command.startsWith("<Status")) {
status(command, socket);
} else if (command.startsWith("<Echo")) {
command = "<Echo>" + command + "</Echo>";
data = command.getBytes(utf8);
hdr = data.length;
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(hdr);
dout.write(data, 0, hdr);
dout.flush();
} else {
throw new IOException("Unknown env service command");
}
}
} catch (IOException ioe) {
// ioe.printStackTrace();
TCPUtils.Log(Level.SEVERE, "MalmoEnv socket error: " + ioe + " (can be on disconnect)");
// System.out.println("[ERROR] " + "MalmoEnv socket error: " + ioe + " (can be on disconnect)");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] MalmoEnv socket error");
try {
if (running) {
TCPUtils.Log(Level.INFO,"Want to quit on disconnect.");
System.out.println("[LOGTOPY] " + "Want to quit on disconnect.");
setWantToQuit();
}
socket.close();
} catch (IOException ioe2) {
}
}
}
};
thread.start();
} catch (IOException ioe) {
TCPUtils.Log(Level.SEVERE, "MalmoEnv service exits on " + ioe);
}
}
}
private void checkHello(Socket socket) throws IOException {
DataInputStream din = new DataInputStream(socket.getInputStream());
int hdr = din.readInt();
if (hdr <= 0 || hdr > hello.length() + 8) // Version number may be somewhat longer in future.
throw new IOException("Invalid MalmoEnv hello header length");
byte[] data = new byte[hdr];
din.readFully(data);
if (!new String(data).startsWith(hello + version))
throw new IOException("MalmoEnv invalid protocol or version - expected " + hello + version);
}
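// Framing note: every message on this socket is a 4-byte big-endian length header
// (the DataInputStream/DataOutputStream default) followed by the payload bytes; the
// hello payload is "<MalmoEnv" concatenated with the protocol version string.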
// Handler for <MissionInit> messages.
private boolean missionInit(DataInputStream din, String command, Socket socket) throws IOException {
String ipOriginator = socket.getInetAddress().getHostName();
int hdr;
byte[] data;
hdr = din.readInt();
data = new byte[hdr];
din.readFully(data);
String id = new String(data, utf8);
TCPUtils.Log(Level.INFO,"Mission Init" + id);
String[] token = id.split(":");
String experimentId = token[0];
int role = Integer.parseInt(token[1]);
int reset = Integer.parseInt(token[2]);
int agentCount = Integer.parseInt(token[3]);
Boolean isSynchronous = Boolean.parseBoolean(token[4]);
Long seed = null;
if(token.length > 5)
seed = Long.parseLong(token[5]);
if(isSynchronous && agentCount > 1){
throw new IOException("Synchronous mode currently does not support multiple agents.");
}
port = -1;
boolean allTokensConsumed = true;
boolean started = false;
lock.lock();
try {
if (role == 0) {
String previousToken = experimentId + ":0:" + (reset - 1);
initTokens.remove(previousToken);
String myToken = experimentId + ":0:" + reset;
if (!initTokens.containsKey(myToken)) {
TCPUtils.Log(Level.INFO,"(Pre)Start " + role + " reset " + reset);
started = startUp(command, ipOriginator, experimentId, reset, agentCount, myToken, seed, isSynchronous);
if (started)
initTokens.put(myToken, 0);
} else {
started = true; // Pre-started previously.
}
// Check that all previous tokens have been consumed. If not don't proceed to mission.
allTokensConsumed = areAllTokensConsumed(experimentId, reset, agentCount);
if (!allTokensConsumed) {
try {
cond.await(COND_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
}
allTokensConsumed = areAllTokensConsumed(experimentId, reset, agentCount);
}
} else {
TCPUtils.Log(Level.INFO, "Start " + role + " reset " + reset);
started = startUp(command, ipOriginator, experimentId, reset, agentCount, experimentId + ":" + role + ":" + reset, seed, isSynchronous);
}
} finally {
lock.unlock();
}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(allTokensConsumed && started ? 1 : 0);
dout.flush();
dout.flush();
return allTokensConsumed && started;
}
private boolean areAllTokensConsumed(String experimentId, int reset, int agentCount) {
boolean allTokensConsumed = true;
for (int i = 1; i < agentCount; i++) {
String tokenForAgent = experimentId + ":" + i + ":" + (reset - 1);
if (initTokens.containsKey(tokenForAgent)) {
TCPUtils.Log(Level.FINE,"Mission init - unconsumed " + tokenForAgent);
allTokensConsumed = false;
}
}
return allTokensConsumed;
}
private boolean startUp(String command, String ipOriginator, String experimentId, int reset, int agentCount, String myToken, Long seed, Boolean isSynchronous) throws IOException {
// Clear out mission state
envState.reward = 0.0;
envState.commands.clear();
envState.obs = null;
envState.info = "";
envState.missionInit = command;
envState.done = false;
envState.quit = false;
envState.token = myToken;
envState.experimentId = experimentId;
envState.agentCount = agentCount;
envState.reset = reset;
envState.synchronous = isSynchronous;
envState.seed = seed;
return startUpMission(command, ipOriginator);
}
private boolean startUpMission(String command, String ipOriginator) throws IOException {
if (missionPoller == null)
return false;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
missionPoller.commandReceived(command, ipOriginator, dos);
dos.flush();
byte[] reply = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(reply);
DataInputStream dis = new DataInputStream(bais);
int hdr = dis.readInt();
byte[] replyBytes = new byte[hdr];
dis.readFully(replyBytes);
String replyStr = new String(replyBytes);
if (replyStr.equals("MALMOOK")) {
TCPUtils.Log(Level.INFO, "MalmoEnvServer Mission starting ...");
return true;
} else if (replyStr.equals("MALMOBUSY")) {
TCPUtils.Log(Level.INFO, "MalmoEnvServer Busy - I want to quit");
this.envState.quit = true;
}
return false;
}
private static final int stepTagLength = "<Step_>".length(); // Step with option code.
private synchronized void stepSync(String command, Socket socket, DataInputStream din) throws IOException
{
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Entering synchronous step.");
nsteps += 1;
profiler.startSection("commandProcessing");
String actions = command.substring(stepTagLength, command.length() - (stepTagLength + 2));
int options = Character.getNumericValue(command.charAt(stepTagLength - 2));
boolean withInfo = options == 0 || options == 2;
// Prepare to write data to the client.
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
double reward = 0.0;
boolean done;
byte[] obs;
String info = "";
boolean sent = false;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Acquiring lock for synchronous step.");
lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Lock is acquired.");
done = envState.done;
// TODO Handle when the environment is done.
// Process the actions.
if (actions.contains("\n")) {
String[] cmds = actions.split("\\n");
for(String cmd : cmds) {
envState.commands.add(cmd);
}
} else {
if (!actions.isEmpty())
envState.commands.add(actions);
}
sent = true;
profiler.endSection(); //cmd
profiler.startSection("requestTick");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Received: " + actions);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Requesting tick.");
// Now wait to run a tick
// If synchronous mode is off then we should see if want to quit is true.
while(!TimeHelper.SyncManager.requestTick() && !done ){Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Tick request granted.");
profiler.endSection();
profiler.startSection("waitForTick");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Waiting for tick.");
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted() && !done ){ Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> TICK DONE. Getting observation.");
profiler.endSection();
profiler.startSection("getObservation");
// After which, get the observations.
obs = getObservation(done);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Observation received. Getting info.");
profiler.endSection();
profiler.startSection("getInfo");
// Pick up rewards.
reward = envState.reward;
if (withInfo) {
info = envState.info;
// if(info == null)
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> FILLING INFO: NULL");
// else
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> FILLING " + info.toString());
}
done = envState.done;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> STATUS " + Boolean.toString(done));
envState.info = null;
envState.obs = null;
envState.reward = 0.0;
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Info received..");
profiler.endSection();
} finally {
lock.unlock();
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Lock released. Writing observation, info, done.");
profiler.startSection("writeObs");
dout.writeInt(obs.length);
dout.write(obs);
dout.writeInt(BYTES_DOUBLE + 2);
dout.writeDouble(reward);
dout.writeByte(done ? 1 : 0);
dout.writeByte(sent ? 1 : 0);
if (withInfo) {
byte[] infoBytes = info.getBytes(utf8);
dout.writeInt(infoBytes.length);
dout.write(infoBytes);
}
profiler.endSection(); //write obs
profiler.startSection("flush");
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Packets written. Flushing.");
dout.flush();
profiler.endSection(); // flush
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <STEP> Done with step.");
}
// Handler for <Step_> messages. Single digit option code after _ specifies if turnkey and info are included in message.
private void step(String command, Socket socket, DataInputStream din) throws IOException {
if(envState.synchronous){
stepSync(command, socket, din);
}
else{
System.out.println("[ERROR] Asynchronous stepping is not supported in MineRL.");
}
}
// Handler for <Peek> messages.
private void peek(String command, Socket socket, DataInputStream din) throws IOException {
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
byte[] obs;
boolean done;
String info = "";
// AOG - As we've only seen issues with the peek request, I've focused my changes to just
// this function. Initially we want to be optimistic and assume we're not going to abort
// the request, and my observations of event timings indicate that there is plenty of time
// between the peek request being received and the reset failing, so a race condition is
// unlikely.
abortRequest = false;
lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Waiting for pistol to fire.");
while(!TimeHelper.SyncManager.hasServerFiredPistol() && !abortRequest){
// Now wait to run a tick
while(!TimeHelper.SyncManager.requestTick() && !abortRequest){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted() && !abortRequest){ Thread.yield();}
Thread.yield();
}
if (abortRequest) {
System.out.println("AOG: Aborting peek request");
// AOG - We detect the lack of observation within our Python wrapper and throw a slightly
// different exception that bypasses MineRL's automatic cleanup code. If we were to report
// 'done', MineRL would detect this as a runtime error and kill the Minecraft process,
// triggering a lengthy restart. So far, from testing, Minecraft itself is fine and we can
// retry the reset; it's only the tight loops above that were causing things to stall and
// time out.
// No observation
dout.writeInt(0);
// No info
dout.writeInt(0);
// Done
dout.writeInt(1);
dout.writeByte(0);
dout.flush();
return;
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Pistol fired!.");
// Wait two ticks for the first observation from server to be propagated.
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Getting observation.");
obs = getObservation(false);
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Observation acquired.");
done = envState.done;
info = envState.info;
} finally {
lock.unlock();
}
dout.writeInt(obs.length);
dout.write(obs);
byte[] infoBytes = info.getBytes(utf8);
dout.writeInt(infoBytes.length);
dout.write(infoBytes);
dout.writeInt(1);
dout.writeByte(done ? 1 : 0);
dout.flush();
}
// Get the current observation. Logs an error if no video frame is available.
public byte[] getObservation(boolean done) {
byte[] obs = envState.obs;
if (obs == null){
System.out.println("[ERROR] Video observation is null; please notify the developer.");
}
return obs;
}
// Handler for <Find> messages - used by non-zero roles to discover integrated server port from primary (role 0) service.
private final static int findTagLength = "<Find>".length();
private void find(String command, Socket socket) throws IOException {
Integer port;
lock.lock();
try {
String token = command.substring(findTagLength, command.length() - (findTagLength + 1));
TCPUtils.Log(Level.INFO, "Find token? " + token);
// Purge previous token.
String[] tokenSplits = token.split(":");
String experimentId = tokenSplits[0];
int role = Integer.parseInt(tokenSplits[1]);
int reset = Integer.parseInt(tokenSplits[2]);
String previousToken = experimentId + ":" + role + ":" + (reset - 1);
initTokens.remove(previousToken);
cond.signalAll();
// Check for next token. Wait for a short time if not already produced.
port = initTokens.get(token);
if (port == null) {
try {
cond.await(COND_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
}
port = initTokens.get(token);
if (port == null) {
port = 0;
TCPUtils.Log(Level.INFO,"Role " + role + " reset " + reset + " waiting for token.");
}
}
} finally {
lock.unlock();
}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(port);
dout.flush();
}
public boolean isSynchronous(){
return envState.synchronous;
}
// Handler for <Init> messages. These reset the service so use with care!
private void init(String command, Socket socket) throws IOException {
lock.lock();
try {
initTokens = new Hashtable<String, Integer>();
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Quit> (quit mission) messages.
private void quit(String command, Socket socket) throws IOException {
lock.lock();
try {
if (!envState.done){
envState.quit = true;
}
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <PEEK> Pistol fired!.");
// Wait two ticks for the first observation from server to be propagated.
while(!TimeHelper.SyncManager.requestTick() ){Thread.yield();}
// Then wait until the tick is finished
while(!TimeHelper.SyncManager.isTickCompleted()){ Thread.yield();}
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(envState.done ? 1 : 0);
dout.flush();
} finally {
lock.unlock();
}
}
private final static int closeTagLength = "<Close>".length();
// Handler for <Close> messages.
private void close(String command, Socket socket) throws IOException {
lock.lock();
try {
String token = command.substring(closeTagLength, command.length() - (closeTagLength + 1));
initTokens.remove(token);
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Status> messages.
private void status(String command, Socket socket) throws IOException {
lock.lock();
try {
String status = "{}"; // TODO Possibly have something more interesting to report.
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
byte[] statusBytes = status.getBytes(utf8);
dout.writeInt(statusBytes.length);
dout.write(statusBytes);
dout.flush();
} finally {
lock.unlock();
}
}
// Handler for <Exit> messages. These "kill the service" temporarily so use with care!
private void exit(String command, Socket socket) throws IOException {
// lock.lock();
try {
// We may exit before we get a chance to reply.
TimeHelper.SyncManager.setSynchronous(false);
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
dout.writeInt(BYTES_INT);
dout.writeInt(1);
dout.flush();
ClientStateMachine.exitJava();
} finally {
// lock.unlock();
}
}
// Malmo client state machine interface methods:
public String getCommand() {
try {
String command = envState.commands.poll();
if (command == null)
return "";
else
return command;
} finally {
}
}
public void endMission() {
// lock.lock();
try {
        // AOG - If the mission is ending, always abort outstanding requests:
        // they can no longer progress to completion and would otherwise stall.
System.out.println("AOG: MalmoEnvServer.endMission");
abort();
envState.done = true;
envState.quit = false;
envState.missionInit = null;
if (envState.token != null) {
initTokens.remove(envState.token);
envState.token = null;
envState.experimentId = null;
envState.agentCount = 0;
envState.reset = 0;
// cond.signalAll();
}
// lock.unlock();
} finally {
}
}
// Record a Malmo "observation" json - as the env info since an environment "obs" is a video frame.
public void observation(String info) {
// Parsing obs as JSON would be slower but less fragile than extracting the turn_key using string search.
// lock.lock();
try {
// TimeHelper.SyncManager.debugLog("[MALMO_ENV_SERVER] <OBSERVATION> Inserting: " + info);
envState.info = info;
// cond.signalAll();
} finally {
// lock.unlock();
}
}
public void addRewards(double rewards) {
// lock.lock();
try {
envState.reward += rewards;
} finally {
// lock.unlock();
}
}
public void addFrame(byte[] frame) {
// lock.lock();
try {
envState.obs = frame; // Replaces current.
// cond.signalAll();
} finally {
// lock.unlock();
}
}
public void notifyIntegrationServerStarted(int integrationServerPort) {
lock.lock();
try {
if (envState.token != null) {
TCPUtils.Log(Level.INFO,"Integration server start up - token: " + envState.token);
addTokens(integrationServerPort, envState.token, envState.experimentId, envState.agentCount, envState.reset);
cond.signalAll();
} else {
TCPUtils.Log(Level.WARNING,"No mission token on integration server start up!");
}
} finally {
lock.unlock();
}
}
private void addTokens(int integratedServerPort, String myToken, String experimentId, int agentCount, int reset) {
initTokens.put(myToken, integratedServerPort);
// Place tokens for other agents to find.
for (int i = 1; i < agentCount; i++) {
String tokenForAgent = experimentId + ":" + i + ":" + reset;
initTokens.put(tokenForAgent, integratedServerPort);
}
}
// IWantToQuit implementation.
@Override
public boolean doIWantToQuit(MissionInit missionInit) {
// lock.lock();
try {
return envState.quit;
} finally {
// lock.unlock();
}
}
public Long getSeed(){
return envState.seed;
}
private void setWantToQuit() {
// lock.lock();
try {
envState.quit = true;
} finally {
if(TimeHelper.SyncManager.isSynchronous()){
            // We want to desynchronize everything.
TimeHelper.SyncManager.setSynchronous(false);
}
// lock.unlock();
}
}
@Override
public void prepare(MissionInit missionInit) {
}
@Override
public void cleanup() {
}
@Override
public String getOutcome() {
return "Env quit";
}
}

View File

@@ -0,0 +1,173 @@
import time
import glob
import pathlib
from malmo import MalmoPython, malmoutils
from malmo.launch_minecraft_in_background import launch_minecraft_in_background
class MalmoVideoRecorder:
DEFAULT_RECORDINGS_DIR = './logs/videos'
def __init__(self):
self.agent_host_bot = None
self.agent_host_camera = None
self.client_pool = None
self.is_malmo_initialized = False
def init_malmo(self, recordings_directory=DEFAULT_RECORDINGS_DIR):
if self.is_malmo_initialized:
return
launch_minecraft_in_background(
'/app/MalmoPlatform/Minecraft',
ports=[10000, 10001])
# Set up two agent hosts
self.agent_host_bot = MalmoPython.AgentHost()
self.agent_host_camera = MalmoPython.AgentHost()
        # Create the list of Minecraft clients to attach to. The Minecraft
        # instances must have been launched via init_malmo() before
        # record_malmo_video is called.
self.client_pool = MalmoPython.ClientPool()
self.client_pool.add(MalmoPython.ClientInfo('127.0.0.1', 10000))
self.client_pool.add(MalmoPython.ClientInfo('127.0.0.1', 10001))
# Use bot's agenthost to hold the command-line options
malmoutils.parse_command_line(
self.agent_host_bot,
['--record_video', '--recording_dir', recordings_directory])
self.is_malmo_initialized = True
def _start_mission(self, agent_host, mission, recording_spec, role):
used_attempts = 0
max_attempts = 5
while True:
try:
agent_host.startMission(
mission,
self.client_pool,
recording_spec,
role,
'')
break
except MalmoPython.MissionException as e:
errorCode = e.details.errorCode
if errorCode == (MalmoPython.MissionErrorCode
.MISSION_SERVER_WARMING_UP):
time.sleep(2)
elif errorCode == (MalmoPython.MissionErrorCode
.MISSION_INSUFFICIENT_CLIENTS_AVAILABLE):
print('Not enough Minecraft instances running.')
used_attempts += 1
if used_attempts < max_attempts:
print('Will wait in case they are starting up.')
time.sleep(300)
elif errorCode == (MalmoPython.MissionErrorCode
.MISSION_SERVER_NOT_FOUND):
print('Server not found.')
used_attempts += 1
if used_attempts < max_attempts:
print('Will wait and retry.')
time.sleep(2)
else:
used_attempts = max_attempts
if used_attempts >= max_attempts:
raise e
def _wait_for_start(self, agent_hosts):
start_flags = [False for a in agent_hosts]
start_time = time.time()
time_out = 120
while not all(start_flags) and time.time() - start_time < time_out:
states = [a.peekWorldState() for a in agent_hosts]
start_flags = [w.has_mission_begun for w in states]
errors = [e for w in states for e in w.errors]
if len(errors) > 0:
print("Errors waiting for mission start:")
for e in errors:
print(e.text)
raise Exception("Encountered errors while starting mission.")
if time.time() - start_time >= time_out:
raise Exception("Timed out while waiting for mission to start.")
def _get_xml(self, xml_file, seed):
with open(xml_file, 'r') as mission_file:
return mission_file.read().format(SEED_PLACEHOLDER=seed)
def _is_mission_running(self):
return self.agent_host_bot.peekWorldState().is_mission_running or \
self.agent_host_camera.peekWorldState().is_mission_running
def record_malmo_video(self, instructions, xml_file, seed):
'''
Replays a set of instructions through Malmo using two players. The
first player will navigate the specified mission based on the given
instructions. The second player observes the first player's moves,
which is captured in a video.
'''
if not self.is_malmo_initialized:
raise Exception('Malmo not initialized. Call init_malmo() first.')
# Set up the mission
my_mission = MalmoPython.MissionSpec(
self._get_xml(xml_file, seed),
True)
bot_recording_spec = MalmoPython.MissionRecordSpec()
camera_recording_spec = MalmoPython.MissionRecordSpec()
recordingsDirectory = \
malmoutils.get_recordings_directory(self.agent_host_bot)
if recordingsDirectory:
camera_recording_spec.setDestination(
recordingsDirectory + "//rollout_" + str(seed) + ".tgz")
camera_recording_spec.recordMP4(
MalmoPython.FrameType.VIDEO,
36,
2000000,
False)
# Start the agents
self._start_mission(
self.agent_host_bot,
my_mission,
bot_recording_spec,
0)
self._start_mission(
self.agent_host_camera,
my_mission,
camera_recording_spec,
1)
self._wait_for_start([self.agent_host_camera, self.agent_host_bot])
# Teleport the camera agent to the required position
self.agent_host_camera.sendCommand('tp -29 72 -6.7')
instruction_index = 0
while self._is_mission_running():
command = instructions[instruction_index]
instruction_index += 1
self.agent_host_bot.sendCommand(command)
# Pause for half a second - change this for faster/slower videos
time.sleep(0.5)
if instruction_index == len(instructions):
self.agent_host_bot.sendCommand("jump 1")
time.sleep(2)
self.agent_host_bot.sendCommand("quit")
# Wait a little for Malmo to reset before the
# next mission is started
time.sleep(2)
print("Video recorded.")

View File

@@ -0,0 +1,180 @@
import json
import logging
import gym
import minerl.env.core
import minerl.env.comms
import numpy as np
from ray.rllib.env.atari_wrappers import FrameStack
from minerl.env.malmo import InstanceManager
# Modify the MineRL timeouts to detect common errors
# quicker and speed up recovery
minerl.env.core.SOCKTIME = 60.0
minerl.env.comms.retry_timeout = 1
class EnvWrapper(minerl.env.core.MineRLEnv):
def __init__(self, xml, port):
InstanceManager.configure_malmo_base_port(port)
self.action_to_command_array = [
'move 1',
'camera 0 270',
'camera 0 90']
super().__init__(
xml,
gym.spaces.Box(low=0, high=255, shape=(84, 84, 3), dtype=np.uint8),
gym.spaces.Discrete(3)
)
self.metadata['video.frames_per_second'] = 2
def _setup_spaces(self, observation_space, action_space):
self.observation_space = observation_space
self.action_space = action_space
def _process_action(self, action_in) -> str:
assert self.action_space.contains(action_in)
assert action_in <= len(
self.action_to_command_array) - 1, 'action index out of bounds.'
return self.action_to_command_array[action_in]
def _process_observation(self, pov, info):
        '''
        Overridden to simplify: returns only `pov` rather than the
        obs_dict (observation dictionary) that MineRLEnv returns.
        '''
pov = np.frombuffer(pov, dtype=np.uint8)
if pov is None or len(pov) == 0:
raise Exception('Invalid observation, probably an aborted peek')
else:
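            # Reshape the flat byte buffer into (height, width, depth) and flip
            # it vertically, since frames arrive with rows bottom-up.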
pov = pov.reshape(
(self.height, self.width, self.depth)
)[::-1, :, :]
assert self.observation_space.contains(pov)
self._last_pov = pov
return pov
class TrackingEnv(gym.Wrapper):
def __init__(self, env):
super().__init__(env)
self._actions = [
self._forward,
self._turn_left,
self._turn_right
]
def _reset_state(self):
self._facing = (1, 0)
self._position = (0, 0)
self._visited = {}
self._update_visited()
def _forward(self):
self._position = (
self._position[0] + self._facing[0],
self._position[1] + self._facing[1]
)
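    # Turning rotates the facing vector by 90 degrees in grid coordinates:
    # left maps (x, z) -> (z, -x), right maps (x, z) -> (-z, x).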
def _turn_left(self):
self._facing = (self._facing[1], -self._facing[0])
def _turn_right(self):
self._facing = (-self._facing[1], self._facing[0])
def _encode_state(self):
return self._position
def _update_visited(self):
state = self._encode_state()
value = self._visited.get(state, 0)
self._visited[state] = value + 1
return value
def reset(self):
self._reset_state()
return super().reset()
def step(self, action):
o, r, d, i = super().step(action)
self._actions[action]()
revisit_count = self._update_visited()
if revisit_count == 0:
r += 0.1
return o, r, d, i
class TrajectoryWrapper(gym.Wrapper):
def __init__(self, env):
super().__init__(env)
self._trajectory = []
self._action_to_malmo_command_array = ['move 1', 'turn -1', 'turn 1']
def get_trajectory(self):
return self._trajectory
def _to_malmo_action(self, action_index):
return self._action_to_malmo_command_array[action_index]
def step(self, action):
self._trajectory.append(self._to_malmo_action(action))
o, r, d, i = super().step(action)
return o, r, d, i
class DummyEnv(gym.Env):
def __init__(self):
self.observation_space = gym.spaces.Box(
low=0,
high=255,
shape=(84, 84, 6),
dtype=np.uint8)
self.action_space = gym.spaces.Discrete(3)
# Define a function to create a MineRL environment
def create_env(config):
mission = config["mission"]
port = 1000 * config.worker_index + config.vector_index
print('*********************************************')
print(f'* Worker {config.worker_index} creating from \
mission: {mission}, port {port}')
print('*********************************************')
if config.worker_index == 0:
# The first environment is only used for checking the action
# and observation space. By using a dummy environment, there's
# no need to spin up a Minecraft instance behind it saving some
# CPU resources on the head node.
return DummyEnv()
env = EnvWrapper(mission, port)
env = TrackingEnv(env)
env = FrameStack(env, 2)
return env
def create_env_for_rollout(config):
mission = config['mission']
port = 1000 * config.worker_index + config.vector_index
print('*********************************************')
print(f'* Worker {config.worker_index} creating from \
mission: {mission}, port {port}')
print('*********************************************')
env = EnvWrapper(mission, port)
env = TrackingEnv(env)
env = FrameStack(env, 2)
env = TrajectoryWrapper(env)
return env

View File

@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<Mission xmlns="http://ProjectMalmo.microsoft.com" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<About>
<Summary>$(ENV_NAME)</Summary>
</About>
<ModSettings>
<MsPerTick>50</MsPerTick>
</ModSettings>
<ServerSection>
<ServerInitialConditions>
<Time>
<StartTime>6000</StartTime>
<AllowPassageOfTime>false</AllowPassageOfTime>
</Time>
<Weather>clear</Weather>
<AllowSpawning>false</AllowSpawning>
</ServerInitialConditions>
<ServerHandlers>
<FlatWorldGenerator generatorString="3;7,220*1,5*3,2;3;,biome_1"/>
<DrawingDecorator>
<DrawSphere x="-29" y="70" z="-2" radius="100" type="air"/>
<DrawCuboid x1="-34" y1="70" z1="-7" x2="-24" y2="70" z2="3" type="lava" />
</DrawingDecorator>
<MazeDecorator>
<Seed>random</Seed>
<SizeAndPosition width="5" length="5" height="10" xOrigin="-32" yOrigin="69" zOrigin="-5"/>
<StartBlock type="emerald_block" fixedToEdge="false"/>
<EndBlock type="lapis_block" fixedToEdge="false"/>
<PathBlock type="grass"/>
<FloorBlock type="air"/>
<GapBlock type="lava"/>
<GapProbability>0.6</GapProbability>
<AllowDiagonalMovement>false</AllowDiagonalMovement>
</MazeDecorator>
<ServerQuitFromTimeUp timeLimitMs="300000" description="out_of_time"/>
<ServerQuitWhenAnyAgentFinishes/>
</ServerHandlers>
</ServerSection>
<AgentSection mode="Survival">
<Name>AML_Bot</Name>
<AgentStart>
<Placement x="-28.5" y="71.0" z="-1.5" pitch="70" yaw="0"/>
</AgentStart>
<AgentHandlers>
<VideoProducer want_depth="false">
<Width>84</Width>
<Height>84</Height>
</VideoProducer>
<FileBasedPerformanceProducer/>
<ObservationFromFullInventory flat="false"/>
<ObservationFromFullStats/>
<HumanLevelCommands>
<ModifierList type="deny-list">
<command>moveMouse</command>
<command>inventory</command>
</ModifierList>
</HumanLevelCommands>
<CameraCommands/>
<ObservationFromCompass/>
<DiscreteMovementCommands/>
<RewardForMissionEnd>
<Reward description="out_of_time" reward="-1" />
</RewardForMissionEnd>
<RewardForTouchingBlockType>
<Block reward="-1.0" type="lava" behaviour="onceOnly"/>
<Block reward="1.0" type="lapis_block" behaviour="onceOnly"/>
</RewardForTouchingBlockType>
<RewardForSendingCommand reward="-0.02"/>
<AgentQuitFromTouchingBlockType>
<Block type="lava" />
<Block type="lapis_block" />
</AgentQuitFromTouchingBlockType>
<PauseCommand/>
<AgentQuitFromReachingCommandQuota total="50"/>
</AgentHandlers>
</AgentSection>
</Mission>

View File

@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<Mission xmlns="http://ProjectMalmo.microsoft.com" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<About>
<Summary>$(ENV_NAME)</Summary>
</About>
<ModSettings>
<MsPerTick>50</MsPerTick>
</ModSettings>
<ServerSection>
<ServerInitialConditions>
<Time>
<StartTime>6000</StartTime>
<AllowPassageOfTime>false</AllowPassageOfTime>
</Time>
<Weather>clear</Weather>
<AllowSpawning>false</AllowSpawning>
</ServerInitialConditions>
<ServerHandlers>
<FlatWorldGenerator generatorString="3;7,220*1,5*3,2;3;,biome_1"/>
<DrawingDecorator>
<DrawSphere x="-29" y="70" z="-2" radius="100" type="air"/>
<DrawCuboid x1="-34" y1="70" z1="-7" x2="-24" y2="70" z2="3" type="lava" />
</DrawingDecorator>
<MazeDecorator>
<Seed>{SEED_PLACEHOLDER}</Seed>
<SizeAndPosition width="6" length="6" height="10" xOrigin="-32" yOrigin="69" zOrigin="-5"/>
<StartBlock type="emerald_block" fixedToEdge="false"/>
<EndBlock type="lapis_block" fixedToEdge="false"/>
<PathBlock type="grass"/>
<FloorBlock type="air"/>
<GapBlock type="lava"/>
<GapProbability>0.6</GapProbability>
<AllowDiagonalMovement>false</AllowDiagonalMovement>
</MazeDecorator>
<ServerQuitFromTimeUp timeLimitMs="300000" description="out_of_time"/>
<ServerQuitWhenAnyAgentFinishes/>
</ServerHandlers>
</ServerSection>
<AgentSection mode="Survival">
<Name>AML_Bot</Name>
<AgentStart>
<Placement x="-28.5" y="71.0" z="-1.5" pitch="70" yaw="0"/>
</AgentStart>
<AgentHandlers>
<VideoProducer want_depth="false">
<Width>84</Width>
<Height>84</Height>
</VideoProducer>
<FileBasedPerformanceProducer/>
<ObservationFromFullInventory flat="false"/>
<ObservationFromFullStats/>
<HumanLevelCommands>
<ModifierList type="deny-list">
<command>moveMouse</command>
<command>inventory</command>
</ModifierList>
</HumanLevelCommands>
<CameraCommands/>
<ObservationFromCompass/>
<DiscreteMovementCommands/>
<RewardForMissionEnd>
<Reward description="out_of_time" reward="-1" />
</RewardForMissionEnd>
<RewardForTouchingBlockType>
<Block reward="-1.0" type="lava" behaviour="onceOnly"/>
<Block reward="1.0" type="lapis_block" behaviour="onceOnly"/>
</RewardForTouchingBlockType>
<RewardForSendingCommand reward="-0.02"/>
<AgentQuitFromTouchingBlockType>
<Block type="lava" />
<Block type="lapis_block" />
</AgentQuitFromTouchingBlockType>
<PauseCommand/>
<AgentQuitFromReachingCommandQuota total="50"/>
</AgentHandlers>
</AgentSection>
</Mission>

View File

@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<Mission xmlns="http://ProjectMalmo.microsoft.com" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<About>
<Summary>AML-Video-Gatherer</Summary>
</About>
<ModSettings>
<MsPerTick>50</MsPerTick>
</ModSettings>
<ServerSection>
<ServerInitialConditions>
<Time>
<StartTime>6000</StartTime>
<AllowPassageOfTime>false</AllowPassageOfTime>
</Time>
<Weather>clear</Weather>
<AllowSpawning>false</AllowSpawning>
</ServerInitialConditions>
<ServerHandlers>
<FlatWorldGenerator generatorString="3;7,220*1,5*3,2;3;,biome_1"/>
<MazeDecorator>
<Seed>{SEED_PLACEHOLDER}</Seed>
<SizeAndPosition width="6" length="6" height="10" xOrigin="-32" yOrigin="69" zOrigin="-5"/>
<StartBlock type="emerald_block" fixedToEdge="false"/>
<EndBlock type="lapis_block" fixedToEdge="false"/>
<PathBlock type="grass"/>
<FloorBlock type="air"/>
<GapBlock type="lava"/>
<GapProbability>0.6</GapProbability>
<AllowDiagonalMovement>false</AllowDiagonalMovement>
</MazeDecorator>
<ServerQuitFromTimeUp timeLimitMs="300000" description="out_of_time"/>
<ServerQuitWhenAnyAgentFinishes/>
</ServerHandlers>
</ServerSection>
<AgentSection mode="Survival">
<Name>Agent</Name>
<AgentStart>
<Placement x="-28.5" y="71.0" z="-1.5" yaw="0"/>
</AgentStart>
<AgentHandlers>
<HumanLevelCommands>
<ModifierList type="deny-list">
<command>moveMouse</command>
<command>inventory</command>
</ModifierList>
</HumanLevelCommands>
<DiscreteMovementCommands/>
<MissionQuitCommands/>
<AgentQuitFromReachingCommandQuota total="50"/>
</AgentHandlers>
</AgentSection>
<AgentSection mode="Spectator">
<Name>Camera_Bot</Name>
<AgentStart>
<Placement x="-29" y="72" z="-6.7" pitch="16" yaw="0"/>
</AgentStart>
<AgentHandlers>
<VideoProducer want_depth="false">
<Width>860</Width>
<Height>480</Height>
</VideoProducer>
<AbsoluteMovementCommands/>
</AgentHandlers>
</AgentSection>
</Mission>

View File

@@ -0,0 +1,130 @@
import argparse
import os
import re
from azureml.core import Run
from azureml.core.model import Model
from minecraft_environment import create_env_for_rollout
from malmo_video_recorder import MalmoVideoRecorder
from gym import wrappers
import ray
import ray.tune as tune
from ray.rllib import rollout
from ray.tune.registry import get_trainable_cls
def write_mission_file_for_seed(mission_file, seed):
    mission_file_path = mission_file.replace('v0', seed)
    with open(mission_file, 'r') as base_file:
        content = base_file.read().format(SEED_PLACEHOLDER=seed)
    with open(mission_file_path, 'w') as seeded_file:
        seeded_file.write(content)
    return mission_file_path
def run_rollout(trainable_type, mission_file, seed):
# Writes the mission file for minerl
mission_file_path = write_mission_file_for_seed(mission_file, seed)
# Instantiate the agent. Note: the IMPALA trainer implementation in
# Ray uses an AsyncSamplesOptimizer. Under the hood, this starts a
# LearnerThread which will wait for training samples. This will fail
# after a timeout, but has no influence on the rollout. See
# https://github.com/ray-project/ray/blob/708dff6d8f7dd6f7919e06c1845f1fea0cca5b89/rllib/optimizers/aso_learner.py#L66
config = {
"env_config": {
"mission": mission_file_path,
"is_rollout": True,
"seed": seed
},
"num_workers": 0
}
    cls = get_trainable_cls(trainable_type)
agent = cls(env="Minecraft", config=config)
# The optimizer is not needed during a rollout
agent.optimizer.stop()
# Load state from checkpoint
agent.restore(f'{checkpoint_path}/{checkpoint_file}')
# Get a reference to the environment
env = agent.workers.local_worker().env
# Let the agent choose actions until the game is over
obs = env.reset()
done = False
total_reward = 0
while not done:
action = agent.compute_action(obs)
obs, reward, done, info = env.step(action)
total_reward += reward
print(f'Total reward using seed {seed}: {total_reward}')
# This avoids a sigterm trace in the logs, see minerl.env.malmo.Instance
env.instance.watcher_process.kill()
env.close()
agent.stop()
return env.get_trajectory()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--model_name', required=True)
parser.add_argument('--run', required=False, default="IMPALA")
args = parser.parse_args()
# Register custom Minecraft environment
tune.register_env("Minecraft", create_env_for_rollout)
ray.init(address='auto')
# Download the model files (contains a checkpoint)
ws = Run.get_context().experiment.workspace
model = Model(ws, args.model_name)
checkpoint_path = model.download(exist_ok=True)
files_ = os.listdir(checkpoint_path)
cp_pattern = re.compile('^checkpoint-\\d+$')
checkpoint_file = None
for f_ in files_:
if cp_pattern.match(f_):
checkpoint_file = f_
if checkpoint_file is None:
raise Exception("No checkpoint file found.")
# These are the Minecraft mission seeds for the rollouts
rollout_seeds = ['1234', '43289', '65224', '983341']
# Initialize the Malmo video recorder
video_recorder = MalmoVideoRecorder()
video_recorder.init_malmo()
# Path references to the mission files
base_training_mission_file = \
'minecraft_missions/lava_maze_rollout-v0.xml'
base_video_recording_mission_file = \
'minecraft_missions/lava_maze_rollout_video.xml'
for seed in rollout_seeds:
trajectory = run_rollout(
args.run,
base_training_mission_file,
seed)
video_recorder.record_malmo_video(
trajectory,
base_video_recording_mission_file,
seed)

View File

@@ -0,0 +1,49 @@
import os
import ray
import ray.tune as tune
from utils import callbacks
from minecraft_environment import create_env
def stop(trial_id, result):
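    # Stop once the agent reliably solves the maze (mean episode reward >= 1)
    # or once the training time budget is exhausted.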
max_train_time = int(os.environ.get("AML_MAX_TRAIN_TIME_SECONDS", 5 * 60 * 60))
return result["episode_reward_mean"] >= 1 \
or result["time_total_s"] >= max_train_time
if __name__ == '__main__':
tune.register_env("Minecraft", create_env)
ray.init(address='auto')
tune.run(
run_or_experiment="IMPALA",
config={
"env": "Minecraft",
"env_config": {
"mission": "minecraft_missions/lava_maze-v0.xml"
},
"num_workers": 10,
"num_cpus_per_worker": 2,
"rollout_fragment_length": 50,
"train_batch_size": 1024,
"replay_buffer_num_slots": 4000,
"replay_proportion": 10,
"learner_queue_timeout": 900,
"num_sgd_iter": 2,
"num_data_loader_buffers": 2,
"exploration_config": {
"type": "EpsilonGreedy",
"initial_epsilon": 1.0,
"final_epsilon": 0.02,
"epsilon_timesteps": 500000
},
"callbacks": {"on_train_result": callbacks.on_train_result},
},
stop=stop,
checkpoint_at_end=True,
local_dir='./logs'
)

View File

@@ -0,0 +1,18 @@
'''RLlib callbacks module:
Common callback methods to be passed to RLlib trainer.
'''
from azureml.core import Run
def on_train_result(info):
'''Callback on train result to record metrics returned by trainer.
'''
run = Run.get_context()
run.log(
name='episode_reward_mean',
value=info["result"]["episode_reward_mean"])
run.log(
name='episodes_total',
value=info["result"]["episodes_total"])

Binary file not shown.


View File

@@ -0,0 +1,928 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/reinforcement-learning/minecraft-on-distributed-compute/minecraft.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Reinforcement Learning in Azure Machine Learning - Training a Minecraft agent using custom environments\n",
"\n",
"This tutorial will show how to set up a more complex reinforcement\n",
"learning (RL) training scenario. It demonstrates how to train an agent to\n",
"navigate through a lava maze in the Minecraft game using Azure Machine\n",
"Learning.\n",
"\n",
"**Please note:** This notebook trains an agent on a randomly generated\n",
"Minecraft level. As a result, on rare occasions, a training run may fail\n",
"to produce a model that can solve the maze. If this happens, you can\n",
"re-run the training step as indicated below.\n",
"\n",
"**Please note:** This notebook uses 1 NC6 type node and 8 D2 type nodes\n",
"for up to 5 hours of training, which corresponds to approximately $9.06 (USD)\n",
"as of May 2020.\n",
"\n",
"Minecraft is currently one of the most popular video\n",
"games and as such has been a study object for RL. [Project \n",
"Malmo](https://www.microsoft.com/en-us/research/project/project-malmo/) is\n",
"a platform for artificial intelligence experimentation and research built on\n",
"top of Minecraft. We will use Minecraft [gym](https://gym.openai.com) environments from Project\n",
"Malmo's 2019 MineRL competition, which are part of the \n",
"[MineRL](http://minerl.io/docs/index.html) Python package.\n",
"\n",
"Minecraft environments require a display to run, so we will demonstrate\n",
"how to set up a virtual display within the docker container used for training.\n",
"Learning will be based on the agent's visual observations. To\n",
"generate the necessary amount of sample data, we will run several\n",
"instances of the Minecraft game in parallel. Below, you can see a video of\n",
"a trained agent navigating a lava maze. Starting from the green position,\n",
"it moves to the blue position by moving forward, turning left or turning right:\n",
"\n",
"<table style=\"width:50%\">\n",
" <tr>\n",
" <th style=\"text-align: center;\">\n",
" <img src=\"./images/lava_maze_minecraft.gif\" alt=\"Minecraft lava maze\" align=\"middle\" margin-left=\"auto\" margin-right=\"auto\"/>\n",
" </th>\n",
" </tr>\n",
" <tr style=\"text-align: center;\">\n",
" <th>Fig 1. Video of a trained Minecraft agent navigating a lava maze.</th>\n",
" </tr>\n",
"</table>\n",
"\n",
"The tutorial will cover the following steps:\n",
"- Initializing Azure Machine Learning resources for training\n",
"- Training the RL agent with Azure Machine Learning service\n",
"- Monitoring training progress\n",
"- Reviewing training results\n",
"\n",
"\n",
"## Prerequisites\n",
"\n",
"The user should have completed the Azure Machine Learning introductory tutorial.\n",
"You will need to make sure that you have a valid subscription id, a resource group and a\n",
"workspace. For detailed instructions see [Tutorial: Get started creating\n",
"your first ML experiment.](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup)\n",
"\n",
"In addition, please follow the instructions in the [Reinforcement Learning in\n",
"Azure Machine Learning - Setting Up Development Environment](../setup/devenv_setup.ipynb)\n",
"notebook to correctly set up a Virtual Network which is required for completing \n",
"this tutorial.\n",
"\n",
"While this is a standalone notebook, we highly recommend going over the\n",
"introductory notebooks for RL first.\n",
"- Getting started:\n",
" - [RL using a compute instance with Azure Machine Learning service](../cartpole-on-compute-instance/cartpole_ci.ipynb)\n",
" - [Using Azure Machine Learning compute](../cartpole-on-single-compute/cartpole_sc.ipynb)\n",
"- [Scaling RL training runs with Azure Machine Learning service](../atari-on-distributed-compute/pong_rllib.ipynb)\n",
"\n",
"\n",
"## Initialize resources\n",
"\n",
"All required Azure Machine Learning service resources for this tutorial can be set up from Jupyter.\n",
"This includes:\n",
"- Connecting to your existing Azure Machine Learning workspace.\n",
"- Creating an experiment to track runs.\n",
"- Creating remote compute targets for [Ray](https://docs.ray.io/en/latest/index.html).\n",
"\n",
"### Azure Machine Learning SDK\n",
"\n",
"Display the Azure Machine Learning SDK version."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"print(\"Azure Machine Learning SDK Version: \", azureml.core.VERSION) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to workspace\n",
"\n",
"Get a reference to an existing Azure Machine Learning workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Workspace\n",
"\n",
"ws = Workspace.from_config()\n",
"print(ws.name, ws.location, ws.resource_group, sep=' | ')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create an experiment\n",
"\n",
"Create an experiment to track the runs in your workspace. A\n",
"workspace can have multiple experiments and each experiment\n",
"can be used to track multiple runs (see [documentation](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment.experiment?view=azure-ml-py)\n",
"for details)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"nbpresent": {
"id": "bc70f780-c240-4779-96f3-bc5ef9a37d59"
}
},
"outputs": [],
"source": [
"from azureml.core import Experiment\n",
"\n",
"exp = Experiment(workspace=ws, name='minecraft-maze')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create or attach an existing compute resource\n",
"\n",
"A compute target is a designated compute resource where you\n",
"run your training script. For more information, see [What\n",
"are compute targets in Azure Machine Learning service?](https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target).\n",
"\n",
"#### GPU target for Ray head\n",
"\n",
"In the experiment setup for this tutorial, the Ray head node\n",
"will run on a GPU-enabled node. A maximum cluster size\n",
"of 1 node is therefore sufficient. If you wish to run\n",
"multiple experiments in parallel using the same GPU\n",
"cluster, you may elect to increase this number. The cluster\n",
"will automatically scale down to 0 nodes when no training jobs\n",
"are scheduled (see `min_nodes`).\n",
"\n",
"The code below creates a compute cluster of GPU-enabled NC6\n",
"nodes. If the cluster with the specified name is already in\n",
"your workspace the code will skip the creation process.\n",
"\n",
"Note that we must specify a Virtual Network during compute\n",
"creation to allow communication between the cluster running\n",
"the Ray head node and the additional Ray compute nodes. For\n",
"details on how to setup the Virtual Network, please follow the\n",
"instructions in the \"Prerequisites\" section above.\n",
"\n",
"**Note: Creation of a compute resource can take several minutes**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.compute import ComputeTarget, AmlCompute\n",
"from azureml.core.compute_target import ComputeTargetException\n",
"\n",
"# please enter the name of your Virtual Network (see Prerequisites -> Workspace setup)\n",
"vnet_name = 'your_vnet'\n",
"\n",
"# name of the Virtual Network subnet ('default' the default name)\n",
"subnet_name = 'default'\n",
"\n",
"gpu_cluster_name = 'gpu-cluster-nc6'\n",
"\n",
"try:\n",
" gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)\n",
" print('Found existing compute target')\n",
"except ComputeTargetException:\n",
" print('Creating a new compute target...')\n",
" compute_config = AmlCompute.provisioning_configuration(\n",
" vm_size='Standard_NC6',\n",
" min_nodes=0,\n",
" max_nodes=1,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name=subnet_name)\n",
"\n",
" gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, compute_config)\n",
" gpu_cluster.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" print('Cluster created.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### CPU target for additional Ray nodes\n",
"\n",
"The code below creates a compute cluster of D2 nodes. If the cluster with the specified name is already in your workspace the code will skip the creation process.\n",
"\n",
"This cluster will be used to start additional Ray nodes\n",
"increasing the clusters CPU resources.\n",
"\n",
"**Note: Creation of a compute resource can take several minutes**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cpu_cluster_name = 'cpu-cluster-d2'\n",
"\n",
"try:\n",
" cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)\n",
" print('Found existing compute target')\n",
"except ComputeTargetException:\n",
" print('Creating a new compute target...')\n",
" compute_config = AmlCompute.provisioning_configuration(\n",
" vm_size='STANDARD_D2',\n",
" min_nodes=0,\n",
" max_nodes=10,\n",
" vnet_resourcegroup_name=ws.resource_group,\n",
" vnet_name=vnet_name,\n",
" subnet_name=subnet_name)\n",
"\n",
" cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)\n",
" cpu_cluster.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
"\n",
" print('Cluster created.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training the agent\n",
"\n",
"### Training environments\n",
"\n",
"This tutorial uses custom docker images (CPU and GPU respectively)\n",
"with the necessary software installed. The\n",
"[Environment](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-environments)\n",
"class stores the configuration for the training environment. The docker\n",
"image is set via `env.docker.base_image` which can point to any\n",
"publicly available docker image. `user_managed_dependencies`\n",
"is set so that the preinstalled Python packages in the image are preserved.\n",
"\n",
"Note that since Minecraft requires a display to start, we set the `interpreter_path`\n",
"such that the Python process is started via **xvfb-run**."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from azureml.core import Environment\n",
"\n",
"max_train_time = os.environ.get(\"AML_MAX_TRAIN_TIME_SECONDS\", 5 * 60 * 60)\n",
"\n",
"def create_env(env_type):\n",
" env = Environment(name='minecraft-{env_type}'.format(env_type=env_type))\n",
"\n",
" env.docker.enabled = True\n",
" env.docker.base_image = 'akdmsft/minecraft-{env_type}'.format(env_type=env_type)\n",
"\n",
" env.python.interpreter_path = \"xvfb-run -s '-screen 0 640x480x16 -ac +extension GLX +render' python\"\n",
" env.environment_variables[\"AML_MAX_TRAIN_TIME_SECONDS\"] = str(max_train_time)\n",
" env.python.user_managed_dependencies = True\n",
" \n",
" return env\n",
" \n",
"cpu_minecraft_env = create_env('cpu')\n",
"gpu_minecraft_env = create_env('gpu')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Training script\n",
"\n",
"As described above, we use the MineRL Python package to launch\n",
"Minecraft game instances. MineRL provides several OpenAI gym\n",
"environments for different scenarios, such as chopping wood.\n",
"Besides predefined environments, MineRL lets its users create\n",
"custom Minecraft environments through\n",
"[minerl.env](http://minerl.io/docs/api/env.html). In the helper\n",
"file **minecraft_environment.py** provided with this tutorial, we use the\n",
"latter option to customize a Minecraft level with a lava maze\n",
"that the agent has to navigate. The agent receives a negative\n",
"reward of -1 for falling into the lava, a negative reward of\n",
"-0.02 for sending a command (i.e. navigating through the maze\n",
"with fewer actions yields a higher total reward) and a positive reward\n",
"of 1 for reaching the goal. To encourage the agent to explore\n",
"the maze, it also receives a positive reward of 0.1 for visiting\n",
"a tile for the first time.\n",
"\n",
"The agent learns purely from visual observations and the image\n",
"is scaled to an 84x84 format, stacking four frames. For the\n",
"purposes of this example, we use a small action space of size\n",
"three: move forward, turn 90 degrees to the left, and turn 90\n",
"degrees to the right.\n",
"\n",
"The training script itself registers the function to create training\n",
"environments with the `tune.register_env` function and connects to\n",
"the Ray cluster Azure Machine Learning service started on the GPU \n",
"and CPU nodes. Lastly, it starts a RL training run with `tune.run()`.\n",
"\n",
"We recommend setting the `local_dir` parameter to `./logs` as this\n",
"directory will automatically become available as part of the training\n",
"run's files in the Azure Portal. The Tensorboard integration\n",
"(see \"View the Tensorboard\" section below) also depends on the files'\n",
"availability. For a list of common parameter options, please refer\n",
"to the [Ray documentation](https://docs.ray.io/en/latest/rllib-training.html#common-parameters).\n",
"\n",
"\n",
"```python\n",
"# Taken from minecraft_environment.py and minecraft_train.py\n",
"\n",
"# Define a function to create a MineRL environment\n",
"def create_env(config):\n",
" mission = config['mission']\n",
" port = 1000 * config.worker_index + config.vector_index\n",
" print('*********************************************')\n",
" print(f'* Worker {config.worker_index} creating from mission: {mission}, port {port}')\n",
" print('*********************************************')\n",
"\n",
" if config.worker_index == 0:\n",
" # The first environment is only used for checking the action and observation space.\n",
" # By using a dummy environment, there's no need to spin up a Minecraft instance behind it\n",
" # saving some CPU resources on the head node.\n",
" return DummyEnv()\n",
"\n",
" env = EnvWrapper(mission, port)\n",
" env = TrackingEnv(env)\n",
" env = FrameStack(env, 2)\n",
" \n",
" return env\n",
"\n",
"\n",
"def stop(trial_id, result):\n",
" return result[\"episode_reward_mean\"] >= 1 \\\n",
" or result[\"time_total_s\"] > 5 * 60 * 60\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
" tune.register_env(\"Minecraft\", create_env)\n",
"\n",
" ray.init(address='auto')\n",
"\n",
" tune.run(\n",
" run_or_experiment=\"IMPALA\",\n",
" config={\n",
" \"env\": \"Minecraft\",\n",
" \"env_config\": {\n",
" \"mission\": \"minecraft_missions/lava_maze-v0.xml\"\n",
" },\n",
" \"num_workers\": 10,\n",
" \"num_cpus_per_worker\": 2,\n",
" \"rollout_fragment_length\": 50,\n",
" \"train_batch_size\": 1024,\n",
" \"replay_buffer_num_slots\": 4000,\n",
" \"replay_proportion\": 10,\n",
" \"learner_queue_timeout\": 900,\n",
" \"num_sgd_iter\": 2,\n",
" \"num_data_loader_buffers\": 2,\n",
" \"exploration_config\": {\n",
" \"type\": \"EpsilonGreedy\",\n",
" \"initial_epsilon\": 1.0,\n",
" \"final_epsilon\": 0.02,\n",
" \"epsilon_timesteps\": 500000\n",
" },\n",
" \"callbacks\": {\"on_train_result\": callbacks.on_train_result},\n",
" },\n",
" stop=stop,\n",
" checkpoint_at_end=True,\n",
" local_dir='./logs'\n",
" )\n",
"```\n",
"\n",
"### Submitting a training run\n",
"\n",
"Below, you create the training run using a `ReinforcementLearningEstimator`\n",
"object, which contains all the configuration parameters for this experiment:\n",
"- `source_directory`: Contains the training script and helper files to be\n",
"copied onto the node running the Ray head.\n",
"- `entry_script`: The training script, described in more detail above..\n",
"- `compute_target`: The compute target for the Ray head and training\n",
"script execution.\n",
"- `environment`: The Azure machine learning environment definition for\n",
"the node running the Ray head.\n",
"- `worker_configuration`: The configuration object for the additional\n",
"Ray nodes to be attached to the Ray cluster:\n",
" - `compute_target`: The compute target for the additional Ray nodes.\n",
" - `node_count`: The number of nodes to attach to the Ray cluster.\n",
" - `environment`: The environment definition for the additional Ray nodes.\n",
"- `max_run_duration_seconds`: The time after which to abort the run if it\n",
"is still running.\n",
"- `shm_size`: The size of docker container's shared memory block. \n",
"\n",
"For more details, please take a look at the [online documentation](https://docs.microsoft.com/en-us/python/api/azureml-contrib-reinforcementlearning/?view=azure-ml-py)\n",
"for Azure Machine Learning service's reinforcement learning offering.\n",
"\n",
"We configure 8 extra D2 (worker) nodes for the Ray cluster, giving us a total of\n",
"22 CPUs and 1 GPU. The GPU and one CPU are used by the IMPALA learner,\n",
"and each MineRL environment receives 2 CPUs allowing us to spawn a total\n",
"of 10 rollout workers (see `num_workers` parameter in the training script).\n",
"\n",
"\n",
"Lastly, the `RunDetails` widget displays information about the submitted\n",
"RL experiment, including a link to the Azure portal with more details."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.train.rl import ReinforcementLearningEstimator, WorkerConfiguration\n",
"from azureml.widgets import RunDetails\n",
"\n",
"worker_config = WorkerConfiguration(\n",
" compute_target=cpu_cluster, \n",
" node_count=8,\n",
" environment=cpu_minecraft_env)\n",
"\n",
"rl_est = ReinforcementLearningEstimator(\n",
" source_directory='files',\n",
" entry_script='minecraft_train.py',\n",
" compute_target=gpu_cluster,\n",
" environment=gpu_minecraft_env,\n",
" worker_configuration=worker_config,\n",
" max_run_duration_seconds=6 * 60 * 60,\n",
" shm_size=1024 * 1024 * 1024 * 30)\n",
"\n",
"train_run = exp.submit(rl_est)\n",
"\n",
"RunDetails(train_run).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# If you wish to cancel the run before it completes, uncomment and execute:\n",
"#train_run.cancel()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Monitoring training progress\n",
"\n",
"### View the Tensorboard\n",
"\n",
"The Tensorboard can be displayed via the Azure Machine Learning service's\n",
"[Tensorboard API](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-monitor-tensorboard).\n",
"When running locally, please make sure to follow the instructions in the\n",
"link and install required packages. Running this cell will output a URL\n",
"for the Tensorboard.\n",
"\n",
"Note that the training script sets the log directory when starting RLlib\n",
"via the `local_dir` parameter. `./logs` will automatically appear in\n",
"the downloadable files for a run. Since this script is executed on the\n",
"Ray head node run, we need to get a reference to it as shown below.\n",
"\n",
"The Tensorboard API will continuously stream logs from the run.\n",
"\n",
"**Note: It may take a couple of minutes after the run is in \"Running\" state\n",
"before Tensorboard files are available and the board will refresh automatically**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"from azureml.tensorboard import Tensorboard\n",
"\n",
"head_run = None\n",
"\n",
"timeout = 60\n",
"while timeout > 0 and head_run is None:\n",
" timeout -= 1\n",
" \n",
" try:\n",
" head_run = next(r for r in train_run.get_children() if r.id.endswith('head'))\n",
" except StopIteration:\n",
" time.sleep(1)\n",
"\n",
"tb = Tensorboard([head_run])\n",
"tb.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Review results\n",
"\n",
"Please ensure that the training run has completed before continuing with this section."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_run.wait_for_completion()\n",
"\n",
"print('Training run completed.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Please note:** If the final \"episode_reward_mean\" metric from the training run is negative,\n",
"the produced model does not solve the problem of navigating the maze well. You can view\n",
"the metric on the Tensorboard or in \"Metrics\" section of the head run in the Azure Machine Learning\n",
"portal. We recommend training a new model by rerunning the notebook starting from \"Submitting a training run\".\n",
"\n",
"\n",
"### Export final model\n",
"\n",
"The key result from the training run is the final checkpoint\n",
"containing the state of the IMPALA trainer (model) upon meeting the\n",
"stopping criteria specified in `minecraft_train.py`.\n",
"\n",
"Azure Machine Learning service offers the [Model.register()](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.model.model?view=azure-ml-py)\n",
"API which allows you to persist the model files from the\n",
"training run. We identify the directory containing the\n",
"final model written during the training run and register\n",
"it with Azure Machine Learning service. We use a Dataset\n",
"object to filter out the correct files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"import tempfile\n",
"\n",
"from azureml.core import Dataset\n",
"\n",
"path_prefix = os.path.join(tempfile.gettempdir(), 'tmp_training_artifacts')\n",
"\n",
"run_artifacts_path = os.path.join('azureml', head_run.id)\n",
"datastore = ws.get_default_datastore()\n",
"\n",
"run_artifacts_ds = Dataset.File.from_files(datastore.path(os.path.join(run_artifacts_path, '**')))\n",
"\n",
"cp_pattern = re.compile('.*checkpoint-\\\\d+$')\n",
"\n",
"checkpoint_files = [file for file in run_artifacts_ds.to_path() if cp_pattern.match(file)]\n",
"\n",
"# There should only be one checkpoint with our training settings...\n",
"final_checkpoint = os.path.dirname(os.path.join(run_artifacts_path, os.path.normpath(checkpoint_files[-1][1:])))\n",
"datastore.download(target_path=path_prefix, prefix=final_checkpoint.replace('\\\\', '/'), show_progress=True)\n",
"\n",
"print('Download complete.')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.model import Model\n",
"\n",
"model_name = 'final_model_minecraft_maze'\n",
"\n",
"model = Model.register(\n",
" workspace=ws,\n",
" model_path=os.path.join(path_prefix, final_checkpoint),\n",
" model_name=model_name,\n",
" description='Model of an agent trained to navigate a lava maze in Minecraft.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Models can be used through a varity of APIs. Please see the\n",
"[documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-deploy-and-where)\n",
"for more details.\n",
"\n",
"### Test agent performance in a rollout\n",
"\n",
"To observe the trained agent's behavior, it is a common practice to\n",
"view its behavior in a rollout. The previous reinforcement learning\n",
"tutorials explain rollouts in more detail.\n",
"\n",
"The provided `minecraft_rollout.py` script loads the final checkpoint\n",
"of the trained agent from the model registered with Azure Machine Learning\n",
"service. It then starts a rollout on 4 different lava maze layouts, that\n",
"are all larger and thus more difficult than the maze the agent was trained\n",
"on. The script further records videos by replaying the agent's decisions\n",
"in [Malmo](https://github.com/microsoft/malmo). Malmo supports multiple\n",
"agents in the same environment, thus allowing us to capture videos that\n",
"depict the agent from another agent's perspective. The provided\n",
"`malmo_video_recorder.py` file and the Malmo Github repository have more\n",
"details on the video recording setup.\n",
"\n",
"You can view the rewards for each rollout episode in the logs for the 'head'\n",
"run submitted below. In some episodes, the agent may fail to reach the goal\n",
"due to the higher level of difficulty - in practice, we could continue\n",
"training the agent on harder tasks starting with the final checkpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"script_params = {\n",
" '--model_name': model_name\n",
"}\n",
"\n",
"rollout_est = ReinforcementLearningEstimator(\n",
" source_directory='files',\n",
" entry_script='minecraft_rollout.py',\n",
" script_params=script_params,\n",
" compute_target=gpu_cluster,\n",
" environment=gpu_minecraft_env,\n",
" shm_size=1024 * 1024 * 1024 * 30)\n",
"\n",
"rollout_run = exp.submit(rollout_est)\n",
"\n",
"RunDetails(rollout_run).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### View videos captured during rollout\n",
"\n",
"To inspect the agent's training progress you can view the videos captured\n",
"during the rollout episodes. First, ensure that the training run has\n",
"completed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rollout_run.wait_for_completion()\n",
"\n",
"head_run_rollout = next(r for r in rollout_run.get_children() if r.id.endswith('head'))\n",
"\n",
"print('Rollout completed.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, you need to download the video files from the training run. We use a\n",
"Dataset to filter out the video files which are in tgz archives."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rollout_run_artifacts_path = os.path.join('azureml', head_run_rollout.id)\n",
"datastore = ws.get_default_datastore()\n",
"\n",
"rollout_run_artifacts_ds = Dataset.File.from_files(datastore.path(os.path.join(rollout_run_artifacts_path, '**')))\n",
"\n",
"video_archives = [file for file in rollout_run_artifacts_ds.to_path() if file.endswith('.tgz')]\n",
"video_archives = [os.path.join(rollout_run_artifacts_path, os.path.normpath(file[1:])) for file in video_archives]\n",
"\n",
"datastore.download(\n",
" target_path=path_prefix,\n",
" prefix=os.path.dirname(video_archives[0]).replace('\\\\', '/'),\n",
" show_progress=True)\n",
"\n",
"print('Download complete.')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, unzip the video files and rename them by the Minecraft mission seed used\n",
"(see `minecraft_rollout.py` for more details on how the seed is used)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tarfile\n",
"import shutil\n",
"\n",
"training_artifacts_dir = './training_artifacts'\n",
"video_dir = os.path.join(training_artifacts_dir, 'videos')\n",
"video_files = []\n",
"\n",
"for tar_file_path in video_archives:\n",
" seed = tar_file_path[tar_file_path.index('rollout_') + len('rollout_'): tar_file_path.index('.tgz')]\n",
" \n",
" tar = tarfile.open(os.path.join(path_prefix, tar_file_path).replace('\\\\', '/'), 'r')\n",
" tar_info = next(t_info for t_info in tar.getmembers() if t_info.name.endswith('mp4'))\n",
" tar.extract(tar_info, video_dir)\n",
" tar.close()\n",
" \n",
" unzipped_folder = os.path.join(video_dir, next(f_ for f_ in os.listdir(video_dir) if not f_.endswith('mp4'))) \n",
" video_file = os.path.join(unzipped_folder,'video.mp4')\n",
" final_video_path = os.path.join(video_dir, '{seed}.mp4'.format(seed=seed))\n",
" \n",
" shutil.move(video_file, final_video_path) \n",
" video_files.append(final_video_path)\n",
" \n",
" shutil.rmtree(unzipped_folder)\n",
"\n",
"# Clean up any downloaded 'tmp' files\n",
"shutil.rmtree(path_prefix)\n",
"\n",
"print('Local video files:\\n', video_files)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, run the cell below to display the videos in-line. In some cases,\n",
"the agent may struggle to find the goal since the maze size was increased\n",
"compared to training."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.core.display import display, HTML\n",
"\n",
"index = 0\n",
"while index < len(video_files) - 1:\n",
" display(\n",
" HTML('\\\n",
" <video controls alt=\"cannot display video\" autoplay loop width=49%> \\\n",
" <source src=\"{f1}\" type=\"video/mp4\"> \\\n",
" </video> \\\n",
" <video controls alt=\"cannot display video\" autoplay loop width=49%> \\\n",
" <source src=\"{f2}\" type=\"video/mp4\"> \\\n",
" </video>'.format(f1=video_files[index], f2=video_files[index + 1]))\n",
" )\n",
" \n",
" index += 2\n",
"\n",
"if index < len(video_files):\n",
" display(\n",
" HTML('\\\n",
" <video controls alt=\"cannot display video\" autoplay loop width=49%> \\\n",
" <source src=\"{f1}\" type=\"video/mp4\"> \\\n",
" </video>'.format(f1=video_files[index]))\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleaning up\n",
"\n",
"Below, you can find code snippets for your convenience to clean up any resources created as part of this tutorial you don't wish to retain."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# to stop the Tensorboard, uncomment and run\n",
"#tb.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# to delete the gpu compute target, uncomment and run\n",
"#gpu_cluster.delete()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# to delete the cpu compute target, uncomment and run\n",
"#cpu_cluster.delete()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# to delete the registered model, uncomment and run\n",
"#model.delete()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# to delete the local video files, uncomment and run\n",
"#shutil.rmtree(training_artifacts_dir)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"This is currently the last introductory tutorial for Azure Machine Learning\n",
"service's Reinforcement\n",
"Learning offering. We would love to hear your feedback to build the features\n",
"you need!\n",
"\n"
]
}
],
"metadata": {
"authors": [
{
"name": "andress"
}
],
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"notice": "Copyright (c) Microsoft Corporation. All rights reserved.\u00e2\u20ac\u00afLicensed under the MIT License.\u00e2\u20ac\u00af "
},
"nbformat": 4,
"nbformat_minor": 4
}

View File

@@ -0,0 +1,8 @@
name: minecraft
dependencies:
- pip:
- azureml-sdk
- azureml-contrib-reinforcementlearning
- azureml-widgets
- tensorboard
- azureml-tensorboard

View File

@@ -20,7 +20,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Azure ML Reinforcement Learning Sample - Setting Up Development Environment\n",
"# Reinforcement Learning in Azure Machine Learning - Setting Up Development Environment\n",
"\n",
"Ray multi-node cluster setup requires all worker nodes to be able to communicate with the head node. This notebook explains you how to setup a virtual network, to be used by the Ray head and worker compute targets, created and used in other notebook examples."
]
@@ -31,7 +31,7 @@
"source": [
"### Prerequisite\n",
"\n",
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription id, a resource group and a workspace."
"The user should have completed the Azure Machine Learning Tutorial: [Get started creating your first ML experiment with the Python SDK](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-1st-experiment-sdk-setup). You will need to make sure that you have a valid subscription ID, a resource group, and an Azure Machine Learning workspace."
]
},
{
@@ -48,19 +48,17 @@
"metadata": {},
"outputs": [],
"source": [
"# Azure ML Core imports\n",
"import azureml.core\n",
"\n",
"# Check core SDK version number\n",
"print(\"Azure ML SDK Version: \", azureml.core.VERSION)"
"print(\"Azure Machine Learning SDK Version: \", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get Azure ML workspace\n",
"Get a reference to an existing Azure ML workspace. Please make sure that the VM sizes `STANDARD_NC6` and `STANDARD_D2_V2` are supported in the workspace's region.\n"
"### Get Azure Machine Learning workspace\n",
"Get a reference to an existing Azure Machine Learning workspace. Please make sure to change `STANDARD_NC6` and `STANDARD_D2_V2` to [the ones available in your region](https://azure.microsoft.com/en-us/global-infrastructure/services/?products=virtual-machines).\n"
]
},
{
@@ -115,7 +113,7 @@
"# The Azure subscription you are using\n",
"subscription_id=ws.subscription_id\n",
"\n",
"# The resource group for the RL cluster\n",
"# The resource group for the reinforcement learning cluster\n",
"resource_group=ws.resource_group\n",
"\n",
"# Azure region of the resource group\n",
@@ -135,7 +133,7 @@
")\n",
"\n",
"async_vnet_creation.wait()\n",
"print(\"VNet created successfully: \", async_vnet_creation.result())"
"print(\"Virtual network created successfully: \", async_vnet_creation.result())"
]
},
{
@@ -169,7 +167,7 @@
" azure.mgmt.network.models.SecurityRule(\n",
" name=security_rule_name,\n",
" access=azure.mgmt.network.models.SecurityRuleAccess.allow,\n",
" description='Azure ML RL rule',\n",
" description='Reinforcement Learning in Azure Machine Learning rule',\n",
" destination_address_prefix='*',\n",
" destination_port_range='29876-29877',\n",
" direction=azure.mgmt.network.models.SecurityRuleDirection.inbound,\n",
@@ -202,7 +200,7 @@
" network_security_group=network_security_group\n",
" )\n",
" \n",
"# Create subnet on vnet\n",
"# Create subnet on virtual network\n",
"async_subnet_creation = network_client.subnets.create_or_update(\n",
" resource_group_name=resource_group,\n",
" virtual_network_name=vnet_name,\n",

View File

@@ -100,7 +100,7 @@
"\n",
"# Check core SDK version number\n",
"\n",
"print(\"This notebook was created using SDK version 1.5.0, you are currently running version\", azureml.core.VERSION)"
"print(\"This notebook was created using SDK version 1.6.0, you are currently running version\", azureml.core.VERSION)"
]
},
{

View File

@@ -439,6 +439,8 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.train.dnn import TensorFlow\n",
"\n",
"script_params = {\"--log_dir\": \"./logs\"}\n",
"\n",
"# If you want the run to go longer, set --max-steps to a higher number.\n",

View File

@@ -144,25 +144,18 @@
"import os\n",
"\n",
"try:\n",
" # if you want to connect using SSH key instead of username/password you can provide parameters private_key_file and private_key_passphrase\n",
" attach_config = HDInsightCompute.attach_configuration(address=os.environ.get('hdiservername', '<my_hdi_cluster_name>-ssh.azurehdinsight.net'), \n",
" ssh_port=22, \n",
" username=os.environ.get('hdiusername', '<ssh_username>'), \n",
"# If you want to connect using SSH key instead of username/password you can provide parameters private_key_file and private_key_passphrase\n",
"\n",
"# Attaching a HDInsight cluster using the public address of the HDInsight cluster is no longer supported.\n",
"# Instead, use resourceId of the HDInsight cluster.\n",
"# The resourceId of the HDInsight Cluster can be constructed using the following string format:\n",
"# /subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.HDInsight/clusters/<cluster_name>.\n",
"# You can also use subscription_id, resource_group and cluster_name without constructing resourceId.\n",
" attach_config = HDInsightCompute.attach_configuration(resource_id='<resource_id>',\n",
" ssh_port=22,\n",
" username=os.environ.get('hdiusername', '<ssh_username>'),\n",
" password=os.environ.get('hdipassword', '<my_password>'))\n",
"\n",
"# The following Azure regions do not support attaching a HDI Cluster using the public IP address of the HDI Cluster.\n",
"# Instead, use the Azure Resource Manager ID of the HDI Cluster with the resource_id parameter:\n",
"# US East\n",
"# US West 2\n",
"# US South Central\n",
"# The resource ID of the HDI Cluster can be constructed using the\n",
"# subscription ID, resource group name, and cluster name using the following string format:\n",
"# /subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.HDInsight/clusters/<cluster_name>. \n",
"# If in US East, US West 2, or US South Central, use the following instead:\n",
"# attach_config = HDInsightCompute.attach_configuration(resource_id='<resource_id>',\n",
"# ssh_port=22,\n",
"# username=os.environ.get('hdiusername', '<ssh_username>'),\n",
"# password=os.environ.get('hdipassword', '<my_password>'))\n",
" hdi_compute = ComputeTarget.attach(workspace=ws, \n",
" name='myhdi', \n",
" attach_configuration=attach_config)\n",

View File

@@ -268,23 +268,15 @@
" private_key_file='./.ssh/id_rsa')\n",
"\n",
"\n",
"# The following Azure regions do not support attaching a virtual machine using the public IP address of the VM.\n",
"# Instead, use the Azure Resource Manager ID of the VM with the resource_id parameter:\n",
"# US East\n",
"# US West 2\n",
"# US South Central\n",
"# The resource ID of the VM can be constructed using the\n",
"# subscription ID, resource group name, and VM name using the following string format:\n",
"# /subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Compute/virtualMachines/<vm_name>. \n",
"# If in US East, US West 2, or US South Central, use the following instead:\n",
"# attach_config = RemoteCompute.attach_configuration(resource_id='<resource_id>',\n",
"# ssh_port=22,\n",
"# username='username',\n",
"# private_key_file='./.ssh/id_rsa')\n",
"\n",
" attached_dsvm_compute = ComputeTarget.attach(workspace=ws,\n",
" name=compute_target_name,\n",
" attach_configuration=attach_config)\n",
"# Attaching a virtual machine using the public IP address of the VM is no longer supported.\n",
"# Instead, use resourceId of the VM.\n",
"# The resourceId of the VM can be constructed using the following string format:\n",
"# /subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Compute/virtualMachines/<vm_name>.\n",
"# You can also use subscription_id, resource_group and vm_name without constructing resourceId.\n",
" attach_config = RemoteCompute.attach_configuration(resource_id='<resource_id>',\n",
" ssh_port=22,\n",
" username='username',\n",
" private_key_file='./.ssh/id_rsa')\n",
" attached_dsvm_compute.wait_for_completion(show_output=True)"
]
},

View File

@@ -279,7 +279,8 @@
" outputs=[prepared_fashion_ds],\n",
" source_directory=script_folder,\n",
" compute_target=compute_target,\n",
" runconfig=run_config)"
" runconfig=run_config,\n",
" allow_reuse=False)"
]
},
{

View File

@@ -2,6 +2,5 @@ name: pipeline-for-image-classification
dependencies:
- pip:
- azureml-sdk
- azureml-dataprep
- pandas<=0.23.4
- fuse

View File

@@ -2,5 +2,4 @@ name: tabular-timeseries-dataset-filtering
dependencies:
- pip:
- azureml-sdk
- azureml-dataprep
- pandas<=0.23.4

View File

@@ -3,7 +3,6 @@ dependencies:
- pip:
- azureml-sdk
- azureml-widgets
- azureml-dataprep
- pandas<=0.23.4
- fuse
- scikit-learn

View File

@@ -26,7 +26,7 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an
| :star:[Datasets with ML Pipeline](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/work-with-data/datasets-tutorial/pipeline-with-datasets/pipeline-for-image-classification.ipynb) | Train | Fashion MNIST | Remote | None | Azure ML | Dataset, Pipeline, Estimator, ScriptRun |
| :star:[Filtering data using Tabular Timeseries Dataset related API](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/work-with-data/datasets-tutorial/timeseries-datasets/tabular-timeseries-dataset-filtering.ipynb) | Filtering | NOAA | Local | None | Azure ML | Dataset, Tabular Timeseries |
| :star:[Train with Datasets (Tabular and File)](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/work-with-data/datasets-tutorial/train-with-datasets/train-with-datasets.ipynb) | Train | Iris, Diabetes | Remote | None | Azure ML | Dataset, Estimator, ScriptRun |
| [Forecasting away from training data](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/automated-machine-learning/forecasting-high-frequency/auto-ml-forecasting-function.ipynb) | Forecasting | None | Remote | None | Azure ML AutoML | Forecasting, Confidence Intervals |
| [Forecasting away from training data](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/automated-machine-learning/forecasting-forecast-function/auto-ml-forecasting-function.ipynb) | Forecasting | None | Remote | None | Azure ML AutoML | Forecasting, Confidence Intervals |
| [Automated ML run with basic edition features.](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/automated-machine-learning/classification-bank-marketing-all-features/auto-ml-classification-bank-marketing-all-features.ipynb) | Classification | Bankmarketing | AML | ACI | None | featurization, explainability, remote_run, AutomatedML |
| [Classification of credit card fraudulent transactions using Automated ML](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/automated-machine-learning/classification-credit-card-fraud/auto-ml-classification-credit-card-fraud.ipynb) | Classification | Creditcard | AML Compute | None | None | remote_run, AutomatedML |
| [Automated ML run with featurization and model explainability.](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/automated-machine-learning/regression-explanation-featurization/auto-ml-regression-explanation-featurization.ipynb) | Regression | MachineData | AML | ACI | None | featurization, explainability, remote_run, AutomatedML |
@@ -41,6 +41,7 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an
| :star:[How to Setup a Schedule for a Published Pipeline](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-schedule-for-a-published-pipeline.ipynb) | Demonstrates the use of Schedules for Published Pipelines | Custom | AML Compute | None | Azure ML | None |
| [How to setup a versioned Pipeline Endpoint](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-versioned-pipeline-endpoints.ipynb) | Demonstrates the use of PipelineEndpoint to run a specific version of the Published Pipeline | Custom | AML Compute | None | Azure ML | None |
| :star:[How to use DataPath as a PipelineParameter](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-showcasing-datapath-and-pipelineparameter.ipynb) | Demonstrates the use of DataPath as a PipelineParameter | Custom | AML Compute | None | Azure ML | None |
| :star:[How to use Dataset as a PipelineParameter](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-showcasing-dataset-and-pipelineparameter.ipynb) | Demonstrates the use of Dataset as a PipelineParameter | Custom | AML Compute | None | Azure ML | None |
| [How to use AdlaStep with AML Pipelines](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-adla-as-compute-target.ipynb) | Demonstrates the use of AdlaStep | Custom | Azure Data Lake Analytics | None | Azure ML | None |
| :star:[How to use DatabricksStep with AML Pipelines](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-use-databricks-as-compute-target.ipynb) | Demonstrates the use of DatabricksStep | Custom | Azure Databricks | None | Azure ML, Azure Databricks | None |
| :star:[How to use AutoMLStep with AML Pipelines](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-automated-machine-learning-step.ipynb) | Demonstrates the use of AutoMLStep | Custom | AML Compute | None | Automated Machine Learning | None |
@@ -113,7 +114,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an
| [onnx-model-register-and-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/deployment/onnx/onnx-model-register-and-deploy.ipynb) | | | | | | |
| [production-deploy-to-aks](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/deployment/production-deploy-to-aks/production-deploy-to-aks.ipynb) | | | | | | |
| [production-deploy-to-aks-gpu](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/deployment/production-deploy-to-aks-gpu/production-deploy-to-aks-gpu.ipynb) | | | | | | |
| [tensorflow-model-register-and-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/deployment/tensorflow/tensorflow-model-register-and-deploy.ipynb) | | | | | | |
| [explain-model-on-amlcompute](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/explain-model/azure-integration/remote-explanation/explain-model-on-amlcompute.ipynb) | | | | | | |
| [save-retrieve-explanations-run-history](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/explain-model/azure-integration/run-history/save-retrieve-explanations-run-history.ipynb) | | | | | | |
| [train-explain-model-locally-and-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/explain-model/azure-integration/scoring-time/train-explain-model-locally-and-deploy.ipynb) | | | | | | |
@@ -123,7 +123,8 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an
| [authentication-in-azureml](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/manage-azureml-service/authentication-in-azureml/authentication-in-azureml.ipynb) | | | | | | |
| [pong_rllib](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/atari-on-distributed-compute/pong_rllib.ipynb) | | | | | | |
| [cartpole_ci](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/cartpole-on-compute-instance/cartpole_ci.ipynb) | | | | | | |
| [cartpole_cc](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/cartpole-on-single-compute/cartpole_cc.ipynb) | | | | | | |
| [cartpole_sc](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/cartpole-on-single-compute/cartpole_sc.ipynb) | | | | | | |
| [minecraft](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/minecraft-on-distributed-compute/minecraft.ipynb) | | | | | | |
| [devenv_setup](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/reinforcement-learning/setup/devenv_setup.ipynb) | | | | | | |
| [Logging APIs](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/track-and-monitor-experiments/logging-api/logging-api.ipynb) | Logging APIs and analyzing results | None | None | None | None | None |
| [distributed-cntk-with-custom-docker](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/training-with-deep-learning/distributed-cntk-with-custom-docker/distributed-cntk-with-custom-docker.ipynb) | | | | | | |
@@ -132,5 +133,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an
| [tutorial-1st-experiment-sdk-train](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/create-first-ml-experiment/tutorial-1st-experiment-sdk-train.ipynb) | | | | | | |
| [img-classification-part1-training](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/image-classification-mnist-data/img-classification-part1-training.ipynb) | | | | | | |
| [img-classification-part2-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/image-classification-mnist-data/img-classification-part2-deploy.ipynb) | | | | | | |
| [img-classification-part3-deploy-encrypted](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/image-classification-mnist-data/img-classification-part3-deploy-encrypted.ipynb) | | | | | | |
| [tutorial-pipeline-batch-scoring-classification](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb) | | | | | | |
| [regression-automated-ml](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/regression-automl-nyc-taxi-data/regression-automated-ml.ipynb) | | | | | | |

View File

@@ -102,7 +102,7 @@
"source": [
"import azureml.core\n",
"\n",
"print(\"This notebook was created using version 1.5.0 of the Azure ML SDK\")\n",
"print(\"This notebook was created using version 1.6.0 of the Azure ML SDK\")\n",
"print(\"You are currently using version\", azureml.core.VERSION, \"of the Azure ML SDK\")"
]
},

View File

@@ -19,6 +19,7 @@ The following tutorials are intended to provide an introductory overview of Azur
| [Train your first ML Model](https://docs.microsoft.com/azure/machine-learning/tutorial-1st-experiment-sdk-train) | Learn the foundational design patterns in Azure Machine Learning and train a scikit-learn model based on a diabetes data set. | [tutorial-1st-experiment-sdk-train.ipynb](create-first-ml-experiment/tutorial-1st-experiment-sdk-train.ipynb) | Regression | Scikit-Learn
| [Train an image classification model](https://docs.microsoft.com/azure/machine-learning/tutorial-train-models-with-aml) | Train a scikit-learn image classification model. | [img-classification-part1-training.ipynb](image-classification-mnist-data/img-classification-part1-training.ipynb) | Image Classification | Scikit-Learn
| [Deploy an image classification model](https://docs.microsoft.com/azure/machine-learning/tutorial-deploy-models-with-aml) | Deploy a scikit-learn image classification model to Azure Container Instances. | [img-classification-part2-deploy.ipynb](image-classification-mnist-data/img-classification-part2-deploy.ipynb) | Image Classification | Scikit-Learn
| [Deploy an encrypted inferencing service](https://docs.microsoft.com/azure/machine-learning/tutorial-deploy-models-with-aml) | Deploy an image classification model for encrypted inferencing in Azure Container Instances. | [img-classification-part3-deploy-encrypted.ipynb](image-classification-mnist-data/img-classification-part3-deploy-encrypted.ipynb) | Image Classification | Scikit-Learn
| [Use automated machine learning to predict taxi fares](https://docs.microsoft.com/azure/machine-learning/tutorial-auto-train-models) | Train a regression model to predict taxi fares using Automated Machine Learning. | [regression-automated-ml.ipynb](regression-automl-nyc-taxi-data/regression-automated-ml.ipynb) | Regression | Automated ML
## Advanced Samples

View File

@@ -212,7 +212,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"![Main Experiment page in the studio](../imgs/experiment_main.png)"
"![Main Experiment page in the studio](./imgs/experiment_main.png)"
]
},
{
@@ -226,7 +226,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"![Run details page in the studio](../imgs/model_download.png)"
"![Run details page in the studio](./imgs/model_download.png)"
]
},
{

View File

@@ -0,0 +1,615 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tutorial #3: Deploy an image classification model for encrypted inferencing in Azure Container Instance (ACI)\n",
"\n",
"This tutorial is **a new addition to the two-part series**. In the [previous tutorial](img-classification-part1-training.ipynb), you trained machine learning models and then registered a model in your workspace on the cloud. \n",
"\n",
"Now, you're ready to deploy the model as a encrypted inferencing web service in [Azure Container Instances](https://docs.microsoft.com/azure/container-instances/) (ACI). A web service is an image, in this case a Docker image, that encapsulates the scoring logic and the model itself. \n",
"\n",
"In this part of the tutorial, you use Azure Machine Learning service (Preview) to:\n",
"\n",
"> * Set up your testing environment\n",
"> * Retrieve the model from your workspace\n",
"> * Test the model locally\n",
"> * Deploy the model to ACI\n",
"> * Test the deployed model\n",
"\n",
"ACI is a great solution for testing and understanding the workflow. For scalable production deployments, consider using Azure Kubernetes Service. For more information, see [how to deploy and where](https://docs.microsoft.com/azure/machine-learning/service/how-to-deploy-and-where).\n",
"\n",
"\n",
"## Prerequisites\n",
"\n",
"Complete the model training in the [Tutorial #1: Train an image classification model with Azure Machine Learning](train-models.ipynb) notebook. \n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# If you did NOT complete the tutorial, you can instead run this cell \n",
"# This will register a model and download the data needed for this tutorial\n",
"# These prerequisites are created in the training tutorial\n",
"# Feel free to skip this cell if you completed the training tutorial \n",
"\n",
"# register a model\n",
"from azureml.core import Workspace\n",
"ws = Workspace.from_config()\n",
"\n",
"from azureml.core.model import Model\n",
"\n",
"model_name = \"sklearn_mnist\"\n",
"model = Model.register(model_path=\"sklearn_mnist_model.pkl\",\n",
" model_name=model_name,\n",
" tags={\"data\": \"mnist\", \"model\": \"classification\"},\n",
" description=\"Mnist handwriting recognition\",\n",
" workspace=ws)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Setup the Environment \n",
"\n",
"Add `encrypted-inference` package as a conda dependency "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core.environment import Environment\n",
"from azureml.core.conda_dependencies import CondaDependencies\n",
"\n",
"# to install required packages\n",
"env = Environment('tutorial-env')\n",
"cd = CondaDependencies.create(pip_packages=['azureml-dataprep[pandas,fuse]>=1.1.14', 'azureml-defaults', 'azure-storage-blob', 'encrypted-inference==0.9'], conda_packages = ['scikit-learn==0.22.1'])\n",
"\n",
"env.python.conda_dependencies = cd\n",
"\n",
"# Register environment to re-use later\n",
"env.register(workspace = ws)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up the environment\n",
"\n",
"Start by setting up a testing environment.\n",
"\n",
"### Import packages\n",
"\n",
"Import the Python packages needed for this tutorial."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"check version"
]
},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
" \n",
"import azureml.core\n",
"\n",
"# display the core SDK version number\n",
"print(\"Azure ML SDK Version: \", azureml.core.VERSION)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Install Homomorphic Encryption based library for Secure Inferencing\n",
"\n",
"Our library is based on [Microsoft SEAL](https://github.com/Microsoft/SEAL) and pubished to [PyPi.org](https://pypi.org/project/encrypted-inference) as an easy to use package "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install encrypted-inference==0.9"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deploy as web service\n",
"\n",
"Deploy the model as a web service hosted in ACI. \n",
"\n",
"To build the correct environment for ACI, provide the following:\n",
"* A scoring script to show how to use the model\n",
"* A configuration file to build the ACI\n",
"* The model you trained before\n",
"\n",
"### Create scoring script\n",
"\n",
"Create the scoring script, called score.py, used by the web service call to show how to use the model.\n",
"\n",
"You must include two required functions into the scoring script:\n",
"* The `init()` function, which typically loads the model into a global object. This function is run only once when the Docker container is started. \n",
"\n",
"* The `run(input_data)` function uses the model to predict a value based on the input data. Inputs and outputs to the run typically use JSON for serialization and de-serialization, but other formats are supported. The function fetches homomorphic encryption based public keys that are uploaded by the service caller. \n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile score.py\n",
"import json\n",
"import os\n",
"import pickle\n",
"import joblib\n",
"from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, PublicAccess\n",
"from encrypted.inference.eiserver import EIServer\n",
"\n",
"def init():\n",
" global model\n",
" # AZUREML_MODEL_DIR is an environment variable created during deployment.\n",
" # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)\n",
" # For multiple models, it points to the folder containing all deployed models (./azureml-models)\n",
" model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'sklearn_mnist_model.pkl')\n",
" model = joblib.load(model_path)\n",
"\n",
" global server\n",
" server = EIServer(model.coef_, model.intercept_, verbose=True)\n",
"\n",
"def run(raw_data):\n",
"\n",
" json_properties = json.loads(raw_data)\n",
"\n",
" key_id = json_properties['key_id']\n",
" conn_str = json_properties['conn_str']\n",
" container = json_properties['container']\n",
" data = json_properties['data']\n",
"\n",
" # download the Galois keys from blob storage\n",
" #TODO optimize by caching the keys locally \n",
" blob_service_client = BlobServiceClient.from_connection_string(conn_str=conn_str)\n",
" blob_client = blob_service_client.get_blob_client(container=container, blob=key_id)\n",
" public_keys = blob_client.download_blob().readall()\n",
" \n",
" result = {}\n",
" # make prediction\n",
" result = server.predict(data, public_keys)\n",
"\n",
" # you can return any data type as long as it is JSON-serializable\n",
" return result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create configuration file\n",
"\n",
"Create a deployment configuration file and specify the number of CPUs and gigabyte of RAM needed for your ACI container. While it depends on your model, the default of 1 core and 1 gigabyte of RAM is usually sufficient for many models. If you feel you need more later, you would have to recreate the image and redeploy the service."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"configure web service",
"aci"
]
},
"outputs": [],
"source": [
"from azureml.core.webservice import AciWebservice\n",
"\n",
"aciconfig = AciWebservice.deploy_configuration(cpu_cores=1, \n",
" memory_gb=1, \n",
" tags={\"data\": \"MNIST\", \"method\" : \"sklearn\"}, \n",
" description='Encrypted Predict MNIST with sklearn + SEAL')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy in ACI\n",
"Estimated time to complete: **about 2-5 minutes**\n",
"\n",
"Configure the image and deploy. The following code goes through these steps:\n",
"\n",
"1. Create environment object containing dependencies needed by the model using the environment file (`myenv.yml`)\n",
"1. Create inference configuration necessary to deploy the model as a web service using:\n",
" * The scoring file (`score.py`)\n",
" * envrionment object created in previous step\n",
"1. Deploy the model to the ACI container.\n",
"1. Get the web service HTTP endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"configure image",
"create image",
"deploy web service",
"aci"
]
},
"outputs": [],
"source": [
"%%time\n",
"from azureml.core.webservice import Webservice\n",
"from azureml.core.model import InferenceConfig\n",
"from azureml.core.environment import Environment\n",
"from azureml.core import Workspace\n",
"from azureml.core.model import Model\n",
"\n",
"ws = Workspace.from_config()\n",
"model = Model(ws, 'sklearn_mnist')\n",
"\n",
"myenv = Environment.get(workspace=ws, name=\"tutorial-env\")\n",
"inference_config = InferenceConfig(entry_script=\"score.py\", environment=myenv)\n",
"\n",
"service = Model.deploy(workspace=ws, \n",
" name='sklearn-mnist-svc', \n",
" models=[model], \n",
" inference_config=inference_config, \n",
" deployment_config=aciconfig)\n",
"\n",
"service.wait_for_deployment(show_output=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get the scoring web service's HTTP endpoint, which accepts REST client calls. This endpoint can be shared with anyone who wants to test the web service or integrate it into an application."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"get scoring uri"
]
},
"outputs": [],
"source": [
"print(service.scoring_uri)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test the model\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download test data\n",
"Download the test data to the **./data/** directory"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from azureml.core import Dataset\n",
"from azureml.opendatasets import MNIST\n",
"\n",
"data_folder = os.path.join(os.getcwd(), 'data')\n",
"os.makedirs(data_folder, exist_ok=True)\n",
"\n",
"mnist_file_dataset = MNIST.get_file_dataset()\n",
"mnist_file_dataset.download(data_folder, overwrite=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load test data\n",
"\n",
"Load the test data from the **./data/** directory created during the training tutorial."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from utils import load_data\n",
"import os\n",
"import glob\n",
"\n",
"data_folder = os.path.join(os.getcwd(), 'data')\n",
"# note we also shrink the intensity values (X) from 0-255 to 0-1. This helps the neural network converge faster\n",
"X_test = load_data(glob.glob(os.path.join(data_folder,\"**/t10k-images-idx3-ubyte.gz\"), recursive=True)[0], False) / 255.0\n",
"y_test = load_data(glob.glob(os.path.join(data_folder,\"**/t10k-labels-idx1-ubyte.gz\"), recursive=True)[0], True).reshape(-1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Predict test data\n",
"\n",
"Feed the test dataset to the model to get predictions.\n",
"\n",
"\n",
"The following code goes through these steps:\n",
"\n",
"1. Create our Homomorphic Encryption based client \n",
"\n",
"1. Upload HE generated public keys \n",
"\n",
"1. Encrypt the data\n",
"\n",
"1. Send the data as JSON to the web service hosted in ACI. \n",
"\n",
"1. Use the SDK's `run` API to invoke the service. You can also make raw calls using any HTTP tool such as curl."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create our Homomorphic Encryption based client \n",
"\n",
"Create a new EILinearRegressionClient and setup the public keys "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from encrypted.inference.eiclient import EILinearRegressionClient\n",
"\n",
"# Create a new Encrypted inference client and a new secret key.\n",
"edp = EILinearRegressionClient(verbose=True)\n",
"\n",
"public_keys_blob, public_keys_data = edp.get_public_keys()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Upload HE generated public keys\n",
"\n",
"Upload the public keys to the workspace default blob store. This will allow us to share the keys with the inference server"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import azureml.core\n",
"from azureml.core import Workspace, Datastore\n",
"import os\n",
"\n",
"ws = Workspace.from_config()\n",
"\n",
"datastore = ws.get_default_datastore()\n",
"container_name=datastore.container_name\n",
"\n",
"# Create a local file and write the keys to it\n",
"public_keys = open(public_keys_blob, \"wb\")\n",
"public_keys.write(public_keys_data)\n",
"public_keys.close()\n",
"\n",
"# Upload the file to blob store\n",
"datastore.upload_files([public_keys_blob])\n",
"\n",
"# Delete the local file\n",
"os.remove(public_keys_blob)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Encrypt the data "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#choose any one sample from the test data \n",
"sample_index = 1\n",
"\n",
"#encrypt the data\n",
"raw_data = edp.encrypt(X_test[sample_index])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Send the test data to the webservice hosted in ACI\n",
"\n",
"Feed the test dataset to the model to get predictions. We will need to send the connection string to the blob storage where the public keys were uploaded \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from azureml.core import Webservice\n",
"\n",
"service = Webservice(ws, 'sklearn-mnist-svc')\n",
"\n",
"#pass the connection string for blob storage to give the server access to the uploaded public keys \n",
"conn_str_template = 'DefaultEndpointsProtocol={};AccountName={};AccountKey={};EndpointSuffix=core.windows.net'\n",
"conn_str = conn_str_template.format(datastore.protocol, datastore.account_name, datastore.account_key)\n",
"\n",
"#build the json \n",
"data = json.dumps({\"data\": raw_data, \"key_id\" : public_keys_blob, \"conn_str\" : conn_str, \"container\" : container_name })\n",
"data = bytes(data, encoding='ASCII')\n",
"\n",
"print ('Making an encrypted inference web service call ')\n",
"eresult = service.run(input_data=data)\n",
"\n",
"print ('Received encrypted inference results')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Decrypt the data\n",
"\n",
"Use the client to decrypt the results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np \n",
"\n",
"results = edp.decrypt(eresult)\n",
"\n",
"print ('Decrypted the results ', results)\n",
"\n",
"#Apply argmax to identify the prediction result\n",
"prediction = np.argmax(results)\n",
"\n",
"print ( ' Prediction : ', prediction)\n",
"print ( ' Actual Label : ', y_test[sample_index])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up resources\n",
"\n",
"To keep the resource group and workspace for other tutorials and exploration, you can delete only the ACI deployment using this API call:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"delete web service"
]
},
"outputs": [],
"source": [
"service.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"If you're not going to use what you've created here, delete the resources you just created with this quickstart so you don't incur any charges. In the Azure portal, select and delete your resource group. You can also keep the resource group, but delete a single workspace by displaying the workspace properties and selecting the Delete button.\n",
"\n",
"\n",
"## Next steps\n",
"\n",
"In this Azure Machine Learning tutorial, you used Python to:\n",
"\n",
"> * Set up your testing environment\n",
"> * Retrieve the model from your workspace\n",
"> * Test the model locally\n",
"> * Deploy the model to ACI\n",
"> * Test the deployed model\n",
" \n",
"You can also try out the [regression tutorial](regression-part1-data-prep.ipynb)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/tutorials/img-classification-part2-deploy.png)"
]
}
],
"metadata": {
"authors": [
{
"name": "vkanne"
}
],
"celltoolbar": "Edit Metadata",
"kernelspec": {
"display_name": "Python 3.6",
"language": "python",
"name": "python36"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6"
},
"msauthor": "vkanne"
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,10 @@
name: img-classification-part3-deploy-encrypted
dependencies:
- pip:
- azureml-sdk
- matplotlib
- sklearn
- pandas
- azureml-opendatasets
- encrypted-inference==0.9
- azure-storage-blob

View File

@@ -21,9 +21,10 @@ image_size = 299
num_channel = 3
def get_class_label_dict():
def get_class_label_dict(labels_dir):
label = []
proto_as_ascii_lines = tf.gfile.GFile("labels.txt").readlines()
labels_path = os.path.join(labels_dir, 'labels.txt')
proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
for l in proto_as_ascii_lines:
label.append(l.rstrip())
return label
@@ -34,14 +35,10 @@ def init():
parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
parser.add_argument('--model_name', dest="model_name", required=True)
parser.add_argument('--labels_name', dest="labels_name", required=True)
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()
workspace = Run.get_context(allow_offline=False).experiment.workspace
label_ds = Dataset.get_by_name(workspace=workspace, name=args.labels_name)
label_ds.download(target_path='.', overwrite=True)
label_dict = get_class_label_dict()
label_dict = get_class_label_dict(args.labels_dir)
classes_num = len(label_dict)
with slim.arg_scope(inception_v3.inception_v3_arg_scope()):

View File

@@ -20,14 +20,8 @@
"metadata": {},
"source": [
"# Use Azure Machine Learning Pipelines for batch prediction\n",
"\n",
"## Note\n",
"This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook.\n",
"\n",
"\n",
"In this tutorial, you use Azure Machine Learning service pipelines to run a batch scoring image classification job. The example job uses the pre-trained [Inception-V3](https://arxiv.org/abs/1512.00567) CNN (convolutional neural network) Tensorflow model to classify unlabeled images. Machine learning pipelines optimize your workflow with speed, portability, and reuse so you can focus on your expertise, machine learning, rather than on infrastructure and automation. After building and publishing a pipeline, you can configure a REST endpoint to enable triggering the pipeline from any HTTP library on any platform.\n",
"\n",
"\n",
"In this tutorial, you learn the following tasks:\n",
"\n",
"> * Configure workspace and download sample data\n",
@@ -38,7 +32,7 @@
"> * Build, run, and publish a pipeline\n",
"> * Enable a REST endpoint for the pipeline\n",
"\n",
"If you don\u00e2\u20ac\u2122t have an Azure subscription, create a free account before you begin. Try the [free or paid version of Azure Machine Learning service](https://aka.ms/AMLFree) today."
"If you don't have an Azure subscription, create a free account before you begin. Try the [free or paid version of Azure Machine Learning service](https://aka.ms/AMLFree) today."
]
},
{
@@ -129,7 +123,7 @@
"from azureml.pipeline.core import PipelineData\n",
"\n",
"input_images = Dataset.File.from_files((batchscore_blob, \"batchscoring/images/\"))\n",
"label_ds = Dataset.File.from_files((batchscore_blob, \"batchscoring/labels/*.txt\"))\n",
"label_ds = Dataset.File.from_files((batchscore_blob, \"batchscoring/labels/\"))\n",
"output_dir = PipelineData(name=\"scores\", \n",
" datastore=def_data_store, \n",
" output_path_on_compute=\"batchscoring/results\")"
@@ -149,7 +143,7 @@
"outputs": [],
"source": [
"input_images = input_images.register(workspace = ws, name = \"input_images\")\n",
"label_ds = label_ds.register(workspace = ws, name = \"label_ds\")"
"label_ds = label_ds.register(workspace = ws, name = \"label_ds\", create_new_version=True)"
]
},
{
@@ -260,7 +254,7 @@
"The script `batch_scoring.py` takes the following parameters, which get passed from the `ParallelRunStep` that you create later:\n",
"\n",
"- `--model_name`: the name of the model being used\n",
"- `--labels_name` : the name of the `Dataset` holding the `labels.txt` file \n",
"- `--labels_dir` : the directory path having the `labels.txt` file \n",
"\n",
"The pipelines infrastructure uses the `ArgumentParser` class to pass parameters into pipeline steps. For example, in the code below the first argument `--model_name` is given the property identifier `model_name`. In the `main()` function, this property is accessed using `Model.get_model_path(args.model_name)`."
]
@@ -296,7 +290,8 @@
"from azureml.core.conda_dependencies import CondaDependencies\n",
"from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n",
"\n",
"cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.15.2\", \"azureml-defaults\"])\n",
"cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.15.2\",\n",
" \"azureml-core\", \"azureml-dataprep[fuse]\"])\n",
"\n",
"env = Environment(name=\"parallelenv\")\n",
"env.python.conda_dependencies=cd\n",
@@ -317,7 +312,7 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunConfig\n",
"from azureml.pipeline.steps import ParallelRunConfig\n",
"\n",
"parallel_run_config = ParallelRunConfig(\n",
" environment=env,\n",
@@ -356,18 +351,20 @@
"metadata": {},
"outputs": [],
"source": [
"from azureml.contrib.pipeline.steps import ParallelRunStep\n",
"from azureml.pipeline.steps import ParallelRunStep\n",
"from datetime import datetime\n",
"\n",
"parallel_step_name = \"batchscoring-\" + datetime.now().strftime(\"%Y%m%d%H%M\")\n",
"\n",
"label_config = label_ds.as_named_input(\"labels_input\")\n",
"\n",
"batch_score_step = ParallelRunStep(\n",
" name=parallel_step_name,\n",
" inputs=[input_images.as_named_input(\"input_images\")],\n",
" output=output_dir,\n",
" models=[model],\n",
" arguments=[\"--model_name\", \"inception\",\n",
" \"--labels_name\", \"label_ds\"],\n",
" \"--labels_dir\", label_config],\n",
" side_inputs=[label_config],\n",
" parallel_run_config=parallel_run_config,\n",
" allow_reuse=False\n",
")"

View File

@@ -3,7 +3,7 @@ dependencies:
- pip:
- azureml-sdk
- azureml-pipeline-core
- azureml-contrib-pipeline-steps
- azureml-pipeline-steps
- pandas
- requests
- azureml-widgets