Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pyspark] support gpu transform #9542

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python-package/xgboost/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ def is_cudf_available() -> bool:
return False


def is_cupy_available() -> bool:
    """Return True when the cupy package is installed and importable.

    A successful ``find_spec`` alone is not enough: cupy can be installed yet
    fail at import time (e.g. a mismatched CUDA runtime), so we attempt the
    import as well.
    """
    if importlib.util.find_spec("cupy") is None:
        return False
    try:
        importlib.import_module("cupy")
    except ImportError:
        return False
    return True
Comment on lines +95 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test for cuDF availability originally had the comment:

    # Checking by `importing` instead of check `importlib.util.find_spec("cudf") is not None`
WeichenXu123 marked this conversation as resolved.
    # because user might install cudf successfully but importing cudf raises issues (e.g. saying
    # running on mismatched cuda version)

which seems odd to me: that is a mismanaged environment, and I'm not sure it's necessary for xgboost to work around it (or even a good idea to work around anything, since users with cuDF installed might expect the GPU to be used).



try:
import scipy.sparse as scipy_sparse
from scipy.sparse import csr_matrix as scipy_csr
Expand Down
85 changes: 81 additions & 4 deletions python-package/xgboost/spark/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@

import xgboost
from xgboost import XGBClassifier
from xgboost.compat import is_cudf_available
from xgboost.compat import is_cudf_available, is_cupy_available
from xgboost.core import Booster, _check_distributed_params
from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel, _can_use_qdm
from xgboost.training import train as worker_train
Expand Down Expand Up @@ -242,6 +242,12 @@ class _SparkXGBParams(
TypeConverters.toList,
)

def set_device(self, value: str) -> "_SparkXGBParams":
    """Set the device used for prediction.

    Parameters
    ----------
    value :
        Device name; one of ``"cpu"``, ``"cuda"`` or ``"gpu"``.

    Returns
    -------
    ``self``, to allow call chaining.

    Raises
    ------
    ValueError
        If ``value`` is not a recognized device name.
    """
    valid_devices = ("cpu", "cuda", "gpu")
    # Raise an explicit, descriptive error instead of a bare assert: asserts
    # are stripped under `python -O` and carry no message for the user.
    if value not in valid_devices:
        raise ValueError(
            f"Invalid argument for `device`: {value!r}. "
            f"Expected one of {valid_devices}."
        )
    self.set(self.device, value)
    return self

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a check here:

def _check_distributed_params(kwargs: Dict[str, Any]) -> None:
.
StringView msg{R"(Invalid argument for `device`. Expected to be one of the following:
Not sure if it's a good idea to override existing checks, which creates new error messages. In this case, an assertion error with no error message.

Copy link
Contributor Author

@wbo4958 wbo4958 Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems `_check_distributed_params` only checks the type of the value; we also need to restrict it to one of ("cpu", "gpu", "cuda").

@classmethod
def _xgb_cls(cls) -> Type[XGBModel]:
"""
Expand Down Expand Up @@ -1193,6 +1199,31 @@ def _post_transform(self, dataset: DataFrame, pred_col: Column) -> DataFrame:
dataset = dataset.drop(pred_struct_col)
return dataset

def _gpu_transform(self) -> bool:
    """Return True when the prediction should be run on the GPU.

    Local mode trusts the model's "device" parameter directly; cluster mode
    additionally requires that Spark GPU task resources are configured.
    """
    device_is_cuda = use_cuda(self.getOrDefault(self.device))

    # Local mode: no task-level resource scheduling, so the "device"
    # parameter alone decides.
    if _is_local(_get_spark_session().sparkContext):
        return device_is_cuda

    spark_conf = _get_spark_session().sparkContext.getConf()
    gpu_per_task = spark_conf.get("spark.task.resource.gpu.amount")

    if gpu_per_task is None:
        # No GPU resources configured on the cluster: fall back to CPU,
        # warning the user if they explicitly asked for CUDA.
        if device_is_cuda:
            get_logger("XGBoost-PySpark").warning(
                "Do the prediction on the CPUs since "
                "no gpu configurations are set"
            )
        return False

    # GPU resources are configured; honor the model's "device" parameter.
    return device_is_cuda

def _transform(self, dataset: DataFrame) -> DataFrame:
# pylint: disable=too-many-statements, too-many-locals
# Save xgb_sklearn_model and predict_params to be local variable
Expand All @@ -1216,21 +1247,67 @@ def _transform(self, dataset: DataFrame) -> DataFrame:

_, schema = self._out_schema()

is_local = _is_local(_get_spark_session().sparkContext)
run_on_gpu = self._gpu_transform()

@pandas_udf(schema) # type: ignore
def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
assert xgb_sklearn_model is not None
model = xgb_sklearn_model

from pyspark import TaskContext
trivialfis marked this conversation as resolved.
Show resolved Hide resolved

context = TaskContext.get()
assert context is not None

dev_ordinal = -1
if is_local:
if run_on_gpu and is_cupy_available():
import cupy as cp # pylint: disable=import-error

total_gpus = cp.cuda.runtime.getDeviceCount()
if total_gpus > 0:
partition_id = context.partitionId()
# For transform local mode, default the gpu_id to (partition id) % gpus.
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
dev_ordinal = partition_id % total_gpus
elif run_on_gpu:
dev_ordinal = _get_gpu_id(context)

if dev_ordinal >= 0:
device = "cuda:" + str(dev_ordinal)
get_logger("XGBoost-PySpark").info(
"Do the inference with device: %s", device
)
model.set_params(device=device)
else:
get_logger("XGBoost-PySpark").info("Do the inference on the CPUs")

def to_gpu_if_possible(data: ArrayLike) -> ArrayLike:
"""Move the data to gpu if possible"""
if dev_ordinal >= 0:
import cudf # pylint: disable=import-error
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
import cupy as cp # pylint: disable=import-error

# We must set the device after import cudf, which will change the device id to 0
# See https://github.com/rapidsai/cudf/issues/11386
cp.cuda.runtime.setDevice(dev_ordinal) # pylint: disable=I1101
df = cudf.DataFrame(data)
del data
return df
return data

for data in iterator:
if enable_sparse_data_optim:
X = _read_csr_matrix_from_unwrapped_spark_vec(data)
else:
if feature_col_names is not None:
X = data[feature_col_names]
tmp = data[feature_col_names]
else:
X = stack_series(data[alias.data])
tmp = stack_series(data[alias.data])
X = to_gpu_if_possible(tmp)

if has_base_margin:
base_margin = data[alias.margin].to_numpy()
base_margin = to_gpu_if_possible(data[alias.margin].to_numpy())
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
else:
base_margin = None

Expand Down
4 changes: 2 additions & 2 deletions python-package/xgboost/spark/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any, Callable, Dict, Optional, Set, Type

import pyspark
from pyspark import BarrierTaskContext, SparkContext, SparkFiles
from pyspark import BarrierTaskContext, SparkContext, SparkFiles, TaskContext
from pyspark.sql.session import SparkSession

from xgboost import Booster, XGBModel, collective
Expand Down Expand Up @@ -129,7 +129,7 @@ def _is_local(spark_context: SparkContext) -> bool:
return spark_context._jsc.sc().isLocal()


def _get_gpu_id(task_context: BarrierTaskContext) -> int:
def _get_gpu_id(task_context: TaskContext) -> int:
"""Get the gpu id from the task resources"""
if task_context is None:
# This is a safety check.
Expand Down
29 changes: 28 additions & 1 deletion tests/test_distributed/test_gpu_with_spark/test_gpu_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession

from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor
from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel, SparkXGBRegressor

gpu_discovery_script_path = "tests/test_distributed/test_gpu_with_spark/discover_gpu.sh"

Expand Down Expand Up @@ -242,3 +242,30 @@ def test_sparkxgb_regressor_feature_cols_with_gpu(spark_diabetes_dataset_feature
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(pred_result_df)
assert rmse <= 65.0


def test_gpu_transform(spark_iris_dataset) -> None:
    """GPU transform follows the model's device setting in cluster mode."""
    train_df, test_df = spark_iris_dataset

    gpu_classifier = SparkXGBClassifier(device="cuda", num_workers=num_workers)
    gpu_model: SparkXGBClassifierModel = gpu_classifier.fit(train_df)

    # The model trained with GPUs, and transform with GPU configurations.
    assert gpu_model._gpu_transform()

    # Switching the device to CPU disables GPU transform; it must still run.
    gpu_model.set_device("cpu")
    assert not gpu_model._gpu_transform()
    gpu_model.transform(test_df).collect()

    cpu_classifier = SparkXGBClassifier(device="cpu", num_workers=num_workers)
    cpu_model: SparkXGBClassifierModel = cpu_classifier.fit(train_df)

    # The model trained with CPUs. Even with GPU configurations,
    # still prefer transforming with CPUs.
    assert not cpu_model._gpu_transform()

    # Set gpu transform explicitly; it must run without error.
    cpu_model.set_device("cuda")
    assert cpu_model._gpu_transform()
    cpu_model.transform(test_df).collect()
28 changes: 28 additions & 0 deletions tests/test_distributed/test_with_spark/test_spark_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,34 @@ def test_device_param(self, reg_data: RegData, clf_data: ClfData) -> None:
clf = SparkXGBClassifier(device="cuda")
clf._validate_params()

def test_gpu_transform(self, clf_data: ClfData) -> None:
    """Local mode: device setting drives GPU transform, including after reload."""
    classifier = SparkXGBClassifier(device="cpu")
    model: SparkXGBClassifierModel = classifier.fit(clf_data.cls_df_train)

    with tempfile.TemporaryDirectory() as tmpdir:
        path = "file:" + tmpdir
        model.write().overwrite().save(path)

        # The model trained with CPU, transform defaults to cpu and
        # runs without error.
        assert not model._gpu_transform()
        model.transform(clf_data.cls_df_test).collect()

        # Opting into CUDA flips the flag.
        model.set_device("cuda")
        assert model._gpu_transform()

        reloaded = SparkXGBClassifierModel.load(path)

        # A freshly loaded CPU-trained model also defaults to cpu and
        # transforms without error.
        assert not reloaded._gpu_transform()
        reloaded.transform(clf_data.cls_df_test).collect()

        reloaded.set_device("cuda")
        assert reloaded._gpu_transform()


class XgboostLocalTest(SparkTestCase):
def setUp(self):
Expand Down
Loading