Azure ML & Azure Databricks notebooks by Parashar Shah.

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

This notebook uses image from ACI notebook for deploying to AKS.

In [None]:
import azureml.core

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

In [None]:
# Set auth to be used by workspace related APIs.
# For automation or CI/CD ServicePrincipalAuthentication can be used.
# https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.authentication.serviceprincipalauthentication?view=azure-ml-py
auth = None

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config(auth = auth)
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

In [None]:
#Register the model
import os
from azureml.core.model import Model

model_name = "AdultCensus_runHistory_aks.mml" # 
model_name_dbfs = os.path.join("/dbfs", model_name)

print("copy model from dbfs to local")
model_local = "file:" + os.getcwd() + "/" + model_name
dbutils.fs.cp(model_name, model_local, True)

mymodel = Model.register(model_path = model_name, # this points to a local file
                       model_name = model_name, # this is the name the model is registered as, am using same name for both path and name.                 
                       description = "ADB trained model by Parashar",
                       workspace = ws)

print(mymodel.name, mymodel.description, mymodel.version)

In [None]:
#%%writefile score_sparkml.py
score_sparkml = """
 
import json
 
def init():
    # One-time initialization of PySpark and predictive model
    import pyspark
    from azureml.core.model import Model
    from pyspark.ml import PipelineModel
 
    global trainedModel
    global spark
 
    spark = pyspark.sql.SparkSession.builder.appName("ADB and AML notebook by Parashar").getOrCreate()
    model_name = "{model_name}" #interpolated
    model_path = Model.get_model_path(model_name)
    trainedModel = PipelineModel.load(model_path)
    
def run(input_json):
    if isinstance(trainedModel, Exception):
        return json.dumps({{"trainedModel":str(trainedModel)}})
      
    try:
        sc = spark.sparkContext
        input_list = json.loads(input_json)
        input_rdd = sc.parallelize(input_list)
        input_df = spark.read.json(input_rdd)
    
        # Compute prediction
        prediction = trainedModel.transform(input_df)
        #result = prediction.first().prediction
        predictions = prediction.collect()
 
        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        result = ",".join(preds)
        # you can return any data type as long as it is JSON-serializable
        return result.tolist()
    except Exception as e:
        result = str(e)
        return result
    
""".format(model_name=model_name)
 
exec(score_sparkml)
 
with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)

In [None]:
from azureml.core.conda_dependencies import CondaDependencies 

myacienv = CondaDependencies.create(conda_packages=['scikit-learn','numpy','pandas']) #showing how to add libs as an eg. - not needed for this model.

with open("mydeployenv.yml","w") as f:
    f.write(myacienv.serialize_to_string())

In [None]:
#create AKS compute
#it may take 20-25 minutes to create a new cluster

from azureml.core.compute import AksCompute, ComputeTarget

# Use the default configuration (can also provide parameters to customize)
prov_config = AksCompute.provisioning_configuration()

aks_name = 'ps-aks-demo2' 

# Create the cluster
aks_target = ComputeTarget.create(workspace = ws, 
                                  name = aks_name, 
                                  provisioning_configuration = prov_config)

aks_target.wait_for_completion(show_output = True)

print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)

In [None]:
#deploy to AKS
from azureml.core.webservice import AksWebservice, Webservice
from azureml.core.model import InferenceConfig

aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)

inference_config = InferenceConfig(runtime = 'spark-py', 
                                   entry_script ='score_sparkml.py',
                                   conda_file ='mydeployenv.yml')

aks_service = Model.deploy(ws, 'ps-aks-service', [mymodel], inference_config, aks_config, aks_target)
aks_service.wait_for_deployment(show_output=True)

In [None]:
aks_service.deployment_status

In [None]:
#for using the Web HTTP API 
print(aks_service.scoring_uri)
print(aks_service.get_keys())

In [None]:
import json

#get the some sample data
test_data_path = "AdultCensusIncomeTest"
test = spark.read.parquet(test_data_path).limit(5)

test_json = json.dumps(test.toJSON().collect())

print(test_json)

In [None]:
#using data defined above predict if income is >50K (1) or <=50K (0)
aks_service.run(input_data=test_json)

In [None]:
#comment to not delete the web service
aks_service.delete()
#model.delete()
aks_target.delete() 

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-databricks/amlsdk/deploy-to-aks-existingimage-05.png)