Library to turn Azure ML Compute into Ray and Dask cluster
Published Dec 30 2021 04:17 PM
Microsoft

Ray and Dask are two of the most popular frameworks for parallelizing and scaling Python computation. They help speed up data processing, hyperparameter tuning, reinforcement learning, model serving, and many other scenarios.

On an Azure ML compute instance, we can easily install Ray and Dask to take advantage of parallel computing across all cores within the node. However, there is not yet an easy way in Azure Machine Learning to extend this to a multi-node cluster when the computing and ML problems require the power of more than one node. One would need to set up a separate environment using VMs or K8s outside Azure ML to run multi-node Ray/Dask, which would mean losing all the capabilities of Azure ML.

To address this gap, we have developed a library that can easily turn an Azure ML compute instance and compute cluster into a Ray and Dask cluster. The library handles all the complex wiring and setup of a Ray cluster with Dask behind the scenes while exposing a simple Ray context object that users can use to perform parallel Python computing tasks. In addition, it ships with high-performance PyArrow APIs to access Azure storage and a simple interface to install additional libraries.

The library also supports both interactive mode and job mode. Data scientists can work interactively with the cluster during the exploratory phase, then turn the code into a job with minimal changes.

Check out the library repo at https://github.com/microsoft/ray-on-aml for details.

In this post, we'll walk through the steps to set up and use the library.

 


 

Installation of the library

1. Prepare compute environment

For interactive use from your compute instance, create a compute cluster in the same virtual network (vnet) as the compute instance.

Checklist

[ ] Azure Machine Learning Workspace

[ ] Virtual network/Subnet

[ ] Create Compute Instance in the Virtual Network

[ ] Create Compute Cluster in the same Virtual Network

Use the azureml_py38 conda environment from a (Jupyter) notebook in Azure Machine Learning Studio.

Again, don't forget to create the virtual network for the compute instance and compute cluster. Without it, they cannot communicate with each other.

   

2. Install library

 

 

 

 

 

 

 

pip install --upgrade ray-on-aml

 

 

 

 

 

 

 

Installing this library will also install ray[default]==1.9.1, pyarrow>=5.0.0, dask[complete]==2021.12.0, adlfs==2021.10.0, and fsspec==2021.10.1.
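To confirm what actually got installed in your environment, a quick check along these lines may help. This is a minimal sketch using only the Python standard library; the helper name installed_versions is hypothetical and not part of ray-on-aml.

```python
from importlib.metadata import PackageNotFoundError, version

# Distributions the post lists as installed alongside ray-on-aml.
EXPECTED = ("ray", "pyarrow", "dask", "adlfs", "fsspec")


def installed_versions(packages=EXPECTED):
    """Return {package: version string, or None if the package is not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Comparing the printed versions with the pins listed above is a cheap way to spot an environment where another package has overridden them.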

 

3. Use cluster in interactive mode

Run in interactive mode from a notebook on your compute instance. Note the ci_is_head option, which makes your current compute instance (CI) the head node.

 

 

 

 

 

 

from azureml.core import Workspace
from ray_on_aml.core import Ray_On_AML

ws = Workspace.from_config()
# Replace NAME_OF_COMPUTE_CLUSTER with the name of your compute cluster (a string).
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster=NAME_OF_COMPUTE_CLUSTER,
                        additional_pip_packages=['torch==1.10.0', 'torchvision', 'sklearn'],
                        maxnode=4)
ray = ray_on_aml.getRay()
# By default, ci_is_head=True: the compute instance is the head node and all nodes in the remote compute cluster are workers.
# To use one of the nodes in the remote AML compute cluster as the head node and the remaining nodes as workers,
# specify ray = ray_on_aml.getRay(ci_is_head=False) instead.
# To install additional libraries, use the additional_pip_packages and additional_conda_packages parameters.

 

 

 

 

 

 

At this point, you have the ray object, which you can use to perform various parallel computing tasks with the Ray API.
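As an illustration of what a parallel task looks like, here is a minimal sketch that fans a function out across the cluster. It assumes the ray argument is the object returned by getRay(); the helper name parallel_squares is hypothetical, and only the standard ray.remote and ray.get calls are used.

```python
def parallel_squares(ray, values):
    """Square each value as a separate Ray task and gather the results in order."""

    @ray.remote
    def square(x):
        return x * x

    # .remote() schedules the task and returns a future immediately.
    futures = [square.remote(v) for v in values]
    # ray.get blocks until every task has finished.
    return ray.get(futures)


# Usage in the notebook, after ray = ray_on_aml.getRay():
# print(parallel_squares(ray, range(8)))
```

Passing the ray object in as a parameter keeps the helper usable in both interactive mode and job mode, since in both cases the object comes from getRay().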

 

* Advanced usage: Ray_On_AML() accepts two arguments, base_conda_dep and base_pip_dep, that specify the base configuration for the library. Their default values are shown below. Although it is possible to change them, doing so may break the package, so change them only when you need to customize the cluster's default configuration, such as the Ray version.

 

 

 

 

 

 

Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster",
           base_conda_dep=['adlfs==2021.10.0', 'pip'],
           base_pip_dep=['ray[tune]==1.9.1', 'xgboost_ray==0.1.5', 'dask==2021.12.0', 'pyarrow >= 5.0.0', 'fsspec==2021.10.1'])
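If you do need to change a single pin, one way to avoid retyping the whole list is to derive it from the defaults. The sketch below is plain Python; the helper with_pin is hypothetical, the replacement version is only a placeholder, and whether a given Ray version works with the library is not verified here.

```python
import re

# Defaults copied from the Ray_On_AML call shown above.
DEFAULT_BASE_PIP_DEP = [
    "ray[tune]==1.9.1",
    "xgboost_ray==0.1.5",
    "dask==2021.12.0",
    "pyarrow >= 5.0.0",
    "fsspec==2021.10.1",
]


def with_pin(deps, package, new_spec):
    """Return a copy of deps in which the entry for `package` is replaced by new_spec."""
    result = []
    replaced = False
    for dep in deps:
        # The package name is everything before the first version operator or space.
        name = re.split(r"[=<>!~ ]", dep, maxsplit=1)[0]
        if name == package:
            result.append(new_spec)
            replaced = True
        else:
            result.append(dep)
    if not replaced:
        raise ValueError(f"{package!r} is not in the dependency list")
    return result


# Placeholder version, for illustration only.
custom_pip_dep = with_pin(DEFAULT_BASE_PIP_DEP, "ray[tune]", "ray[tune]==1.9.2")
# Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster", base_pip_dep=custom_pip_dep)
```

The helper returns a new list and leaves the defaults untouched, so it is easy to fall back if the customized cluster fails to start.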

 

 

 

 

 

 

4. Use the cluster in job mode

To use the library in an AML job, include ray_on_aml as a pip dependency, then get the ray object inside your script as follows:

 

 

 

 

 

 

 

from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:  # on the head node
    # Logic that uses Ray for distributed ML training, tuning, or distributed data transformation with Dask
    pass
else:
    print("in worker node")
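For the pip dependency mentioned above, a conda specification file is one place to declare it. The sketch below generates a hypothetical minimal specification as text. The Python version is an assumption based on the azureml_py38 environment, and the actual conda_env.yml in the repo may list more packages.

```python
def conda_spec(name="rayEnv", python="3.8", pip_packages=("ray-on-aml",)):
    """Build the text of a minimal conda environment specification."""
    lines = [
        f"name: {name}",
        "dependencies:",
        f"  - python={python}",
        "  - pip",
        "  - pip:",
    ]
    # Each pip package becomes one entry under the nested pip section.
    lines += [f"      - {pkg}" for pkg in pip_packages]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(conda_spec())
```

Writing the returned text to a .yml file gives you something that Environment.from_conda_specification can load, as in the job submission example later in this post.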

 

 

 

 

 

 

 

Example scenarios

1. Perform big data analysis with Dask on Ray

 

 

 

 

 

 

from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="azureopendatastorage",  container_name="isdweatherdatacontainer")
data = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2012/"], filesystem=abfs)
data1 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2015/"], filesystem=abfs)
data2 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2010/"], filesystem=abfs)
data3 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2009/"], filesystem=abfs)
data4 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2011/"], filesystem=abfs)
data5 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2013/"], filesystem=abfs)
data6 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2014/"], filesystem=abfs)
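all_data = data.union(data1).union(data2).union(data3).union(data4).union(data5).union(data6)
print(all_data.count())
# Note: this summarizes only `data` (year 2012); call all_data.to_dask() to summarize the full union.
all_data_dask = data.to_dask().describe().compute()
print(all_data_dask)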
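The seven near-identical read calls above can also be written as a loop. The sketch below makes the same read_parquet and union calls as the example; the helper names isd_paths and read_isd_years are hypothetical, and ray and filesystem are assumed to be the objects created earlier (the result of getRay() and the AzureBlobFileSystem).

```python
from functools import reduce

BASE = "az://isdweatherdatacontainer/ISDWeather"


def isd_paths(years):
    """Build one partition path per requested year."""
    return [f"{BASE}/year={year}/" for year in years]


def read_isd_years(ray, filesystem, years):
    """Read each year partition as its own Ray Dataset, then union them into one."""
    paths = isd_paths(years)
    if not paths:
        raise ValueError("at least one year is required")
    datasets = [ray.data.read_parquet([p], filesystem=filesystem) for p in paths]
    return reduce(lambda left, right: left.union(right), datasets)


# Usage, with ray and abfs defined as in the example above:
# all_data = read_isd_years(ray, abfs, range(2009, 2016))
# print(all_data.count())
```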

 

 

 

 

 

 

 

2. Distributed hyperparameter tuning with ray.tune

 

 

 

 

 

 

import sklearn.datasets
import sklearn.metrics
from sklearn.model_selection import train_test_split
import xgboost as xgb

from ray import tune


def train_breast_cancer(config):
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)
    # Train the classifier
    results = {}
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False)
    # Return prediction accuracy
    accuracy = 1. - results["eval"]["error"][-1]
    tune.report(mean_accuracy=accuracy, done=True)


config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),
    "min_child_weight": tune.choice([1, 2, 3]),
    "subsample": tune.uniform(0.5, 1.0),
    "eta": tune.loguniform(1e-4, 1e-1)
}
analysis = tune.run(
    train_breast_cancer,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=10)
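Once tune.run returns, the analysis object can be queried for the winning trial. The following is a minimal sketch assuming analysis is the object returned above and that trials report mean_accuracy, as train_breast_cancer does; the helper name summarize_best is hypothetical.

```python
def summarize_best(analysis, metric="mean_accuracy", mode="max"):
    """Collect the best hyperparameters and the score they achieved."""
    best_config = analysis.get_best_config(metric=metric, mode=mode)
    best_trial = analysis.get_best_trial(metric=metric, mode=mode)
    return {
        "best_config": best_config,
        # last_result holds the most recent metrics reported by the trial.
        "best_score": best_trial.last_result[metric],
    }


# Usage, after analysis = tune.run(...):
# print(summarize_best(analysis))
```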

 

 

 

 

 

 

 

3. Distributed XGBoost

 

 

 

 

 

 

from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=42
)

clf = RayXGBClassifier(
    n_jobs=4,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.

clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to fit/predict/predict_proba methods - will override
# n_jobs set during initialization

clf.fit(X_train, y_train, ray_params=RayParams(num_actors=4))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=4))
print(pred_ray.shape)
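The example above prints only the shape of the predictions. To see how good they are, you can compare them with the held-out labels. Below is a minimal sketch in plain Python; the helper name accuracy is hypothetical, and scikit-learn's accuracy_score would serve the same purpose.

```python
def accuracy(predictions, labels):
    """Return the fraction of predictions that match the true labels."""
    predictions = list(predictions)
    labels = list(labels)
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    if not labels:
        raise ValueError("at least one label is required")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


# Usage, with pred_ray and y_test from the example above:
# print(accuracy(pred_ray, y_test))
```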

 

 

 

 

 

 

 

4. Use in job mode with an AML job

 

 

 

 

 

 

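from azureml.core import Workspace, Experiment, Environment, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config()

compute_cluster = 'worker-cpu-v3'
maxnode = 5
vm_size = 'STANDARD_DS3_V2'
vnet = 'rayvnet'
subnet = 'default'
exp = 'ray_on_aml_job'
ws_detail = ws.get_details()
# The resource group name is the fifth segment of the workspace resource ID.
ws_rg = ws_detail['id'].split("/")[4]
# Set vnet_rg if the virtual network lives in a different resource group than the workspace.
vnet_rg = None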
try:
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)

    print('Found existing cluster, use it.')
except ComputeTargetException:
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                        min_nodes=0, max_nodes=maxnode,
                                                        vnet_resourcegroup_name=vnet_rg,
                                                        vnet_name=vnet,
                                                        subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)

    ray_cluster.wait_for_completion(show_output=True)


rayEnv = Environment.from_conda_specification(name = "rayEnv",
                                             file_path = "../examples/conda_env.yml")

# rayEnv = Environment.get(ws, "rayEnv", version=19)


src = ScriptRunConfig(source_directory='../examples/job',
                      script='aml_job.py',
                      environment=rayEnv,
                      compute_target=ray_cluster,
                      distributed_job_config=PyTorchConfiguration(node_count=maxnode),
                      # arguments=["--master_ip", master_ip]
                      )
run = Experiment(ws, exp).submit(src)
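After submission you will usually want to follow the job until it finishes. This is a minimal sketch assuming run is the Run object returned by submit above; the helper name wait_for_run is hypothetical, while wait_for_completion and get_status are methods of the Azure ML SDK Run class.

```python
def wait_for_run(run, show_output=True):
    """Block until the submitted run finishes, then return its final status."""
    # Streams the job logs to the console when show_output is True.
    run.wait_for_completion(show_output=show_output)
    return run.get_status()


# Usage, after run = Experiment(ws, exp).submit(src):
# print(wait_for_run(run))
```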

 

 

 

 

 

 

 

This is the code inside aml_job.py with details omitted for brevity 

 

 

 

 

 

 

 


if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
    ray_on_aml =Ray_On_AML()
    ray = ray_on_aml.getRay()

    if ray: #in the headnode
        print("head node detected")

        datasets.MNIST("~/data", train=True, download=True)

        analysis = tune.run(train_mnist, config=search_space)
        print(ray.cluster_resources())
        print("data count result", get_data_count(account_key))

    else:
        print("in worker node")

 

 

 

 

 

 

5. View Ray dashboard

The easiest way to view the Ray dashboard is through the VS Code connection for Azure ML. Connect VS Code to your compute instance, open a terminal window, type http://127.0.0.1:8265/, then Ctrl+click the link to open the Ray dashboard on your local computer.

 

Figure: VS Code terminal

This trick tells VS Code to forward the port to your local machine, so you don't have to set up SSH port forwarding through a VS Code extension on the CI.

 

Figure: Ray Dashboard
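If the dashboard does not open, it can help to check first whether anything is listening on the dashboard port. The sketch below uses only the standard library; the helper name dashboard_reachable is hypothetical, and the default host and port are taken from the URL above.

```python
import socket


def dashboard_reachable(host="127.0.0.1", port=8265, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds, False otherwise."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    print("Ray dashboard reachable:", dashboard_reachable())
```

Run it on the compute instance to check the dashboard itself, or on your local machine to check that the port forwarding is in place.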

 

 

 

        See more examples at quick start examples

        In partnership with Hyun Suk Shin.

Last update: Apr 29 2022 01:58 PM