Skip to content

Commit

Permalink
Making requested updated for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
cjnolet committed Jul 11, 2019
1 parent a375a69 commit 1ae3678
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 9 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ Output:
dtype: int32
```

Using [Dask](https://www.dask.org) cuML also features multi-GPU and multi-node-multi-GPU operation for a
cuML also features multi-GPU and multi-node-multi-GPU operation, using [Dask](https://www.dask.org), for a
growing list of algorithms. The following Python snippet reads input from a CSV file and performs
a NearestNeighbors query across a cluster of Dask workers, using multiple GPUs:
a NearestNeighbors query across a cluster of Dask workers, using multiple GPUs on a single node:
```python
# Create a Dask CUDA cluster w/ one worker per device
from dask_cuda import LocalCUDACluster
Expand Down
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ conda install -c conda-forge -c rapidsai -c rapidsai-nightly -c nvidia \
lapack cmake==3.14.3 \
umap-learn \
libclang \
nccl=2 \
nccl>=2.4 \
dask \
distributed \
dask-cudf \
Expand Down
3 changes: 1 addition & 2 deletions cpp/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,7 @@ void ml_algo(const ML::cumlHandle& handle, ...)

## Multi GPU

The multi GPU paradigm of cuML is **O**ne **P**rocess per **G**PU (OPG). Each algorithm should be implemented in a way that it can run with a single GPU without any dependencies to any communication library. A multi GPU implementation can use the methods offered by the class `MLCommon::cumlCommunicator` from [cuml_comms_int.hpp](src_prims/src/common/cuml_comms_int.hpp) for inter rank/GPU communication. It is the responsibility of the user of cuML to create an initialized instance of `MLCommon::cumlCommunicator`.

The multi GPU paradigm of cuML is **O**ne **P**rocess per **G**PU (OPG). Each algorithm should be implemented in a way that it can run with a single GPU without any specific dependencies to a particular communication library. A multi GPU implementation can use the methods offered by the class `MLCommon::cumlCommunicator` from [cuml_comms_int.hpp](src_prims/src/common/cuml_comms_int.hpp) for inter rank/GPU communication. It is the responsibility of the user of cuML to create an initialized instance of `MLCommon::cumlCommunicator`.

E.g. with a CUDA-aware MPI a cuML algorithm could use code like this for
collective communications:
Expand Down
2 changes: 1 addition & 1 deletion cpp/comms/std/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

cmake_minimum_required(VERSION 3.12 FATAL_ERROR)
cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
project(cuML-comms VERSION 0.9.0 LANGUAGES CXX CUDA)

set(CUML_DIR ${PROJECT_SOURCE_DIR}/../.. CACHE STRING "Path to the cuML repo")
Expand Down
58 changes: 55 additions & 3 deletions python/cuml/dask/cluster/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,23 @@ class KMeans(CommsBase):
"""

def __init__(self, n_clusters=8, init_method="random", verbose=0):
"""
Constructor for distributed KMeans model
:param n_clusters: Number of clusters to fit
:param init_method: Method for finding initial centroids
:param verbose: Print useful info while executing
"""
super(KMeans, self).__init__(comms_p2p=False)
self.init_(n_clusters=n_clusters, init_method=init_method,
verbose=verbose)

def init_(self, n_clusters, init_method, verbose=0):
"""
Creates local kmeans instance on each worker
Creates a local KMeans instance on each worker
:param n_clusters: Number of clusters to fit
:param init_method: Method for finding initial centroids
:param verbose: Print useful info while executing
:return:
"""
self.init()

Expand All @@ -51,17 +61,44 @@ def init_(self, n_clusters, init_method, verbose=0):
def func_build_kmeans_(handle, n_clusters, init_method, verbose, r):
"""
Create local KMeans instance on worker
:param handle: instance of cuml.handle.Handle
:param n_clusters: Number of clusters to fit
:param init_method: Method for finding initial centroids
:param verbose: Print useful info while executing
:param r: Stops memoization caching
"""
return cumlKMeans(handle=handle, init=init_method,
n_clusters=n_clusters, verbose=verbose)

@staticmethod
def func_fit(model, df, r): return model.fit(df)
def func_fit(model, df, r):
"""
Runs on each worker to call fit on local KMeans instance
:param model: Local KMeans instance
:param df: cudf.Dataframe to use
:param r: Stops memoizatiion caching
:return: The fit model
"""
return model.fit(df)

@staticmethod
def func_predict(model, df, r): return model.predict(df)
def func_predict(model, df, r):
"""
Runs on each worker to call fit on local KMeans instance
:param model: Local KMeans instance
:param df: cudf.Dataframe to use
:param r: Stops memoization caching
:return: cudf.Series with predictions
"""
return model.predict(df)

def run_model_func_on_dask_cudf(self, func, X):
"""
Runs a function on a local KMeans instance on each worker
:param func: The function to execute on each worker
:param X: Input dask_cudf.Dataframe
:return: Futures containing results of func
"""
gpu_futures = self.client.sync(extract_ddf_partitions, X)

worker_model_map = dict(map(lambda x: (x[0], x[1]), self.kmeans))
Expand All @@ -75,12 +112,27 @@ def run_model_func_on_dask_cudf(self, func, X):
return f

def fit(self, X):
"""
Fits a distributed KMeans model
:param X: dask_cudf.Dataframe to fit
:return: This KMeans instance
"""
self.run_model_func_on_dask_cudf(KMeans.func_fit, X)
return self

def predict(self, X):
"""
Predicts the labels using a distributed KMeans model
:param X: dask_cudf.Dataframe to predict
:return: A dask_cudf.Dataframe containing label predictions
"""
f = self.run_model_func_on_dask_cudf(KMeans.func_predict, X)
return to_dask_cudf(f)

def fit_predict(self, X):
"""
Calls fit followed by predict using a distributed KMeans model
:param X: dask_cudf.Dataframe to fit & predict
:return: A dask_cudf.Dataframe containing label predictions
"""
return self.fit(X).predict(X)

0 comments on commit 1ae3678

Please sign in to comment.