From 1ae3678dc9930d000ae45f6666c5460c6448c67a Mon Sep 17 00:00:00 2001
From: "Corey J. Nolet"
Date: Thu, 11 Jul 2019 13:38:50 -0400
Subject: [PATCH] Making requested updates for PR

---
 README.md                          |  4 +--
 ci/gpu/build.sh                    |  2 +-
 cpp/DEVELOPER_GUIDE.md             |  3 +-
 cpp/comms/std/CMakeLists.txt       |  2 +-
 python/cuml/dask/cluster/kmeans.py | 58 ++++++++++++++++++++++++++++--
 5 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 64e0c74918..a79e81c20a 100644
--- a/README.md
+++ b/README.md
@@ -39,9 +39,9 @@ Output:
 dtype: int32
 ```
 
-Using [Dask](https://www.dask.org) cuML also features multi-GPU and multi-node-multi-GPU operation for a
+cuML also features multi-GPU and multi-node-multi-GPU operation, using [Dask](https://www.dask.org), for a
 growing list of algorithms. The following Python snippet reads input from a CSV file and performs
-a NearestNeighbors query across a cluster of Dask workers, using multiple GPUs:
+a NearestNeighbors query across a cluster of Dask workers, using multiple GPUs on a single node:
 ```python
 # Create a Dask CUDA cluster w/ one worker per device
 from dask_cuda import LocalCUDACluster
diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh
index 44d80663ce..02c6938ade 100644
--- a/ci/gpu/build.sh
+++ b/ci/gpu/build.sh
@@ -49,7 +49,7 @@ conda install -c conda-forge -c rapidsai -c rapidsai-nightly -c nvidia \
       lapack cmake==3.14.3 \
       umap-learn \
       libclang \
-      nccl=2 \
+      "nccl>=2.4" \
      dask \
      distributed \
      dask-cudf \
diff --git a/cpp/DEVELOPER_GUIDE.md b/cpp/DEVELOPER_GUIDE.md
index fa2ea4fc0d..1082264286 100644
--- a/cpp/DEVELOPER_GUIDE.md
+++ b/cpp/DEVELOPER_GUIDE.md
@@ -275,8 +275,7 @@ void ml_algo(const ML::cumlHandle& handle, ...)
 
 ## Multi GPU
 
-The multi GPU paradigm of cuML is **O**ne **P**rocess per **G**PU (OPG). Each algorithm should be implemented in a way that it can run with a single GPU without any dependencies to any communication library. A multi GPU implementation can use the methods offered by the class `MLCommon::cumlCommunicator` from [cuml_comms_int.hpp](src_prims/src/common/cuml_comms_int.hpp) for inter rank/GPU communication. It is the responsibility of the user of cuML to create an initialized instance of `MLCommon::cumlCommunicator`.
-
+The multi GPU paradigm of cuML is **O**ne **P**rocess per **G**PU (OPG). Each algorithm should be implemented in a way that it can run with a single GPU without any dependency on a particular communication library. A multi GPU implementation can use the methods offered by the class `MLCommon::cumlCommunicator` from [cuml_comms_int.hpp](src_prims/src/common/cuml_comms_int.hpp) for inter rank/GPU communication. It is the responsibility of the user of cuML to create an initialized instance of `MLCommon::cumlCommunicator`.
 
 E.g. with a CUDA-aware MPI a cuML algorithm could use code like this for collective communications:
 
diff --git a/cpp/comms/std/CMakeLists.txt b/cpp/comms/std/CMakeLists.txt
index 42466785b0..e3753bcd4e 100644
--- a/cpp/comms/std/CMakeLists.txt
+++ b/cpp/comms/std/CMakeLists.txt
@@ -14,7 +14,7 @@
 # limitations under the License.
 #
 
-cmake_minimum_required(VERSION 3.12 FATAL_ERROR)
+cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
 
 project(cuML-comms VERSION 0.9.0 LANGUAGES CXX CUDA)
 set(CUML_DIR ${PROJECT_SOURCE_DIR}/../.. CACHE STRING "Path to the cuML repo")
diff --git a/python/cuml/dask/cluster/kmeans.py b/python/cuml/dask/cluster/kmeans.py
index 0588d6e635..7971f392d3 100644
--- a/python/cuml/dask/cluster/kmeans.py
+++ b/python/cuml/dask/cluster/kmeans.py
@@ -27,13 +27,23 @@ class KMeans(CommsBase):
     """
 
     def __init__(self, n_clusters=8, init_method="random", verbose=0):
+        """
+        Constructor for distributed KMeans model
+        :param n_clusters: Number of clusters to fit
+        :param init_method: Method for finding initial centroids
+        :param verbose: Print useful info while executing
+        """
         super(KMeans, self).__init__(comms_p2p=False)
         self.init_(n_clusters=n_clusters, init_method=init_method,
                    verbose=verbose)
 
     def init_(self, n_clusters, init_method, verbose=0):
         """
-        Creates local kmeans instance on each worker
+        Creates a local KMeans instance on each worker
+        :param n_clusters: Number of clusters to fit
+        :param init_method: Method for finding initial centroids
+        :param verbose: Print useful info while executing
+        :return: None
         """
 
         self.init()
@@ -51,17 +61,44 @@ def init_(self, n_clusters, init_method, verbose=0):
     def func_build_kmeans_(handle, n_clusters, init_method, verbose, r):
         """
         Create local KMeans instance on worker
+        :param handle: instance of cuml.handle.Handle
+        :param n_clusters: Number of clusters to fit
+        :param init_method: Method for finding initial centroids
+        :param verbose: Print useful info while executing
+        :param r: Stops memoization caching
         """
         return cumlKMeans(handle=handle, init=init_method,
                           n_clusters=n_clusters, verbose=verbose)
 
     @staticmethod
-    def func_fit(model, df, r): return model.fit(df)
+    def func_fit(model, df, r):
+        """
+        Runs on each worker to call fit on the local KMeans instance
+        :param model: Local KMeans instance
+        :param df: cudf.DataFrame to use
+        :param r: Stops memoization caching
+        :return: The fit model
+        """
+        return model.fit(df)
 
     @staticmethod
-    def func_predict(model, df, r): return model.predict(df)
+    def func_predict(model, df, r):
+        """
+        Runs on each worker to call predict on the local KMeans instance
+        :param model: Local KMeans instance
+        :param df: cudf.DataFrame to use
+        :param r: Stops memoization caching
+        :return: cudf.Series with predictions
+        """
+        return model.predict(df)
 
     def run_model_func_on_dask_cudf(self, func, X):
+        """
+        Runs a function on the local KMeans instance on each worker
+        :param func: The function to execute on each worker
+        :param X: Input dask_cudf.DataFrame
+        :return: Futures containing results of func
+        """
         gpu_futures = self.client.sync(extract_ddf_partitions, X)
 
         worker_model_map = dict(map(lambda x: (x[0], x[1]), self.kmeans))
@@ -75,12 +112,27 @@ def run_model_func_on_dask_cudf(self, func, X):
         return f
 
     def fit(self, X):
+        """
+        Fits a distributed KMeans model
+        :param X: dask_cudf.DataFrame to fit
+        :return: This KMeans instance
+        """
         self.run_model_func_on_dask_cudf(KMeans.func_fit, X)
         return self
 
     def predict(self, X):
+        """
+        Predicts the labels using a distributed KMeans model
+        :param X: dask_cudf.DataFrame to predict
+        :return: A dask_cudf.DataFrame containing label predictions
+        """
         f = self.run_model_func_on_dask_cudf(KMeans.func_predict, X)
         return to_dask_cudf(f)
 
     def fit_predict(self, X):
+        """
+        Calls fit followed by predict using a distributed KMeans model
+        :param X: dask_cudf.DataFrame to fit & predict
+        :return: A dask_cudf.DataFrame containing label predictions
+        """
         return self.fit(X).predict(X)