mirror of
https://github.com/Azure/MachineLearningNotebooks.git
synced 2025-12-20 17:45:10 -05:00
2 lines
8.9 KiB
Plaintext
2 lines
8.9 KiB
Plaintext
{"cells":[{"cell_type":"markdown","source":["Azure ML & Azure Databricks notebooks by Parashar Shah.\n\nCopyright (c) Microsoft Corporation. All rights reserved.\n\nLicensed under the MIT License."],"metadata":{}},{"cell_type":"markdown","source":["Please ensure you have run all previous notebooks in sequence before running this."],"metadata":{}},{"cell_type":"markdown","source":["#Model Building"],"metadata":{}},{"cell_type":"code","source":["import os\nimport pprint\nimport numpy as np\n\nfrom pyspark.ml import Pipeline, PipelineModel\nfrom pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.evaluation import BinaryClassificationEvaluator\nfrom pyspark.ml.tuning import CrossValidator, ParamGridBuilder"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["import azureml.core\n\n# Check core SDK version number\nprint(\"SDK version:\", azureml.core.VERSION)"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":["# import the Workspace class and check the azureml SDK version\nfrom azureml.core import Workspace\n\nws = Workspace.from_config()\nprint('Workspace name: ' + ws.name, \n 'Azure region: ' + ws.location, \n 'Subscription id: ' + ws.subscription_id, \n 'Resource group: ' + ws.resource_group, sep = '\\n')"],"metadata":{},"outputs":[],"execution_count":6},{"cell_type":"code","source":["#get the train and test datasets\ntrain_data_path = \"AdultCensusIncomeTrain\"\ntest_data_path = \"AdultCensusIncomeTest\"\n\ntrain = spark.read.parquet(train_data_path)\ntest = spark.read.parquet(test_data_path)\n\nprint(\"train: ({}, {})\".format(train.count(), len(train.columns)))\nprint(\"test: ({}, {})\".format(test.count(), len(test.columns)))\n\ntrain.printSchema()"],"metadata":{},"outputs":[],"execution_count":7},{"cell_type":"markdown","source":["#Define ML Pipeline"],"metadata":{}},{"cell_type":"code","source":["label = \"income\"\ndtypes = dict(train.dtypes)\ndtypes.pop(label)\n\nsi_xvars = []\nohe_xvars = []\nfeatureCols = []\nfor idx,key in enumerate(dtypes):\n if dtypes[key] == \"string\":\n featureCol = \"-\".join([key, \"encoded\"])\n featureCols.append(featureCol)\n \n tmpCol = \"-\".join([key, \"tmp\"])\n # string-index and one-hot encode the string column\n #https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html\n #handleInvalid: Param for how to handle invalid data (unseen labels or NULL values). \n #Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), \n #or 'keep' (put invalid data in a special additional bucket, at index numLabels). Default: \"error\"\n si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid=\"skip\"))\n ohe_xvars.append(OneHotEncoder(inputCol=tmpCol, outputCol=featureCol))\n else:\n featureCols.append(key)\n\n# string-index the label column into a column named \"label\"\nsi_label = StringIndexer(inputCol=label, outputCol='label')\n\n# assemble the encoded feature columns in to a column named \"features\"\nassembler = VectorAssembler(inputCols=featureCols, outputCol=\"features\")"],"metadata":{},"outputs":[],"execution_count":9},{"cell_type":"code","source":["from azureml.core.run import Run\nfrom azureml.core.experiment import Experiment\nimport numpy as np\nimport os\nimport shutil\n\nmodel_name = \"AdultCensus_runHistory.mml\"\nmodel_dbfs = os.path.join(\"/dbfs\", model_name)\nrun_history_name = 'spark-ml-notebook'\n\n# start a training run by defining an experiment\nmyexperiment = Experiment(ws, \"Azure_Databricks_Experiment\")\nroot_run = myexperiment.start_logging()\n\n# Regularization Rates\nregs = np.arange(0.0, 1.0, 0.2)\n\n# try a bunch of alpha values in a Linear Regression (Ridge) model\nfor reg in regs:\n print(\"Regularization rate: {}\".format(reg))\n # create a bunch of child runs\n with root_run.child_run(\"reg-\" + str(reg)) as run:\n # create a new Logistic Regression model.\n lr = LogisticRegression(regParam=reg)\n \n # put together the pipeline\n pipe = Pipeline(stages=[*si_xvars, *ohe_xvars, si_label, assembler, lr])\n\n # train the model\n model_pipeline = pipe.fit(train)\n \n # make prediction\n pred = model_pipeline.transform(test)\n \n # evaluate. note only 2 metrics are supported out of the box by Spark ML.\n bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')\n au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)\n au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)\n\n print(\"Area under ROC: {}\".format(au_roc))\n print(\"Area Under PR: {}\".format(au_prc))\n \n # log reg, au_roc, au_prc and feature names in run history\n run.log(\"reg\", reg)\n run.log(\"au_roc\", au_roc)\n run.log(\"au_prc\", au_prc)\n run.log_list(\"columns\", train.columns)\n\n # save model\n model_pipeline.write().overwrite().save(model_name)\n \n # upload the serialized model into run history record\n mdl, ext = model_name.split(\".\")\n model_zip = mdl + \".zip\"\n shutil.make_archive(mdl, 'zip', model_dbfs)\n run.upload_file(\"outputs/\" + model_name, model_zip) \n #run.upload_file(\"outputs/\" + model_name, path_or_stream = model_dbfs) #cannot deal with folders\n\n # now delete the serialized model from local folder since it is already uploaded to run history \n shutil.rmtree(model_dbfs)\n os.remove(model_zip)\n \n# Declare run completed\nroot_run.complete()\nroot_run_id = root_run.id\nprint (\"run id:\", root_run.id)"],"metadata":{},"outputs":[],"execution_count":10},{"cell_type":"code","source":["#Load all run metrics from run history into a dictionary object.\nchild_runs = {}\nchild_run_metrics = {}\n\nfor r in root_run.get_children():\n child_runs[r.id] = r\n child_run_metrics[r.id] = r.get_metrics()"],"metadata":{},"outputs":[],"execution_count":11},{"cell_type":"code","source":["best_run_id = max(child_run_metrics, key = lambda k: child_run_metrics[k]['au_roc'])\nbest_run = child_runs[best_run_id]\nprint('Best run is:', best_run_id)\nprint('Metrics:', child_run_metrics[best_run_id])"],"metadata":{},"outputs":[],"execution_count":12},{"cell_type":"code","source":["best_reg = child_run_metrics[best_run_id]['reg']\nmax_auc = child_run_metrics[best_run_id]['au_roc']\n\nreg_auc = np.array([(child_run_metrics[k]['reg'], child_run_metrics[k]['au_roc']) for k in child_run_metrics.keys()])\nreg_auc_sorted = reg_auc[reg_auc[:,0].argsort()]\n\nimport pandas as pd\ndf = pd.DataFrame(reg_auc_sorted)\nspdf = spark.createDataFrame(df)\ndisplay(spdf)"],"metadata":{},"outputs":[],"execution_count":13},{"cell_type":"code","source":["#Download the model from the best run to a local folder\nbest_model_file_name = \"best_model.zip\"\nbest_run.download_file(name = 'outputs/' + model_name, output_file_path = best_model_file_name)"],"metadata":{},"outputs":[],"execution_count":14},{"cell_type":"markdown","source":["#Model Evaluation"],"metadata":{}},{"cell_type":"code","source":["##unzip the model to dbfs (as load() seems to require that) and load it.\nif os.path.isfile(model_dbfs) or os.path.isdir(model_dbfs):\n shutil.rmtree(model_dbfs)\nshutil.unpack_archive(best_model_file_name, model_dbfs)\n\nmodel_pipeline_best = PipelineModel.load(model_name)"],"metadata":{},"outputs":[],"execution_count":16},{"cell_type":"code","source":["# make prediction\npred = model_pipeline_best.transform(test)\noutput = pred[['hours_per_week','age','workclass','marital_status','income','prediction']]\ndisplay(output.limit(5))"],"metadata":{},"outputs":[],"execution_count":17},{"cell_type":"code","source":["# evaluate. note only 2 metrics are supported out of the box by Spark ML.\nbce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')\nau_roc = bce.setMetricName('areaUnderROC').evaluate(pred)\nau_prc = bce.setMetricName('areaUnderPR').evaluate(pred)\n\nprint(\"Area under ROC: {}\".format(au_roc))\nprint(\"Area Under PR: {}\".format(au_prc))"],"metadata":{},"outputs":[],"execution_count":18},{"cell_type":"markdown","source":["#Model Persistence"],"metadata":{}},{"cell_type":"code","source":["##NOTE: by default the model is saved to and loaded from /dbfs/ instead of cwd!\nmodel_pipeline_best.write().overwrite().save(model_name)\nprint(\"saved model to {}\".format(model_dbfs))"],"metadata":{},"outputs":[],"execution_count":20},{"cell_type":"code","source":["%sh\n\nls -la /dbfs/AdultCensus_runHistory.mml/*"],"metadata":{},"outputs":[],"execution_count":21},{"cell_type":"code","source":["dbutils.notebook.exit(\"success\")"],"metadata":{},"outputs":[],"execution_count":22}],"metadata":{"name":"03b.Build_model_runHistory","notebookId":3874566296719353},"nbformat":4,"nbformat_minor":0}
|