Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jvm-packages] Support Ranker #10823

Merged
merged 5 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,73 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
}
}

test("Ranker: XGBoost-Spark should match xgboost4j") {
  // Fits the same ranking model twice on the same parquet data -- once through
  // XGBoostRanker on Spark and once through xgboost4j directly -- and checks that
  // leaf indices, contributions and predictions agree.
  withGpuSparkSession() { spark =>
    import spark.implicits._

    // The train and test fixtures share one column layout.
    val columnNames = Seq("label", "weight", "group", "c1", "c2", "c3")
    val trainPath = writeFile(Ranking.train.toDF(columnNames: _*))
    val testPath = writeFile(Ranking.test.toDF(columnNames: _*))

    val trainDf = spark.read.parquet(trainPath)
    val testDf = spark.read.parquet(testPath)

    val featureNames = Array("c1", "c2", "c3")
    val featureIndices = featureNames.map(name => trainDf.schema.fieldIndex(name))
    val labelName = "label"
    val groupName = "group"

    val rounds = 100
    val boosterParams: Map[String, Any] = Map(
      "device" -> "cuda",
      "objective" -> "rank:ndcg")

    val ranker = new XGBoostRanker(boosterParams)
      .setFeaturesCol(featureNames)
      .setLabelCol(labelName)
      .setNumRound(rounds)
      .setLeafPredictionCol("leaf")
      .setContribPredictionCol("contrib")
      .setGroupCol(groupName)
      .setDevice("cuda")

    // Reference model: trained with xgboost4j straight from the parquet file.
    val referenceModel = withResource(
      new GpuColumnBatch(Table.readParquet(new File(trainPath)))) { batch =>
      val featureTable = batch.select(featureIndices)
      val labelTable = batch.select(trainDf.schema.fieldIndex(labelName))
      val groupTable = batch.select(trainDf.schema.fieldIndex(groupName))
      val trainBatch = new CudfColumnBatch(featureTable, labelTable, null, null, groupTable)
      val trainMatrix = new QuantileDMatrix(Seq(trainBatch).iterator, ranker.getMissing,
        ranker.getMaxBins, ranker.getNthread)
      ScalaXGBoost.train(trainMatrix, boosterParams, rounds)
    }

    // Reference outputs: only the feature columns are handed to the DMatrix.
    val (expectedLeaf, expectedContrib, expectedPred) = withResource(
      new GpuColumnBatch(Table.readParquet(new File(testPath)))) { batch =>
      val testBatch = new CudfColumnBatch(batch.select(featureIndices), null, null, null, null)
      val testMatrix = new DMatrix(testBatch, ranker.getMissing, ranker.getNthread)
      val leaf = referenceModel.predictLeaf(testMatrix)
      val contrib = referenceModel.predictContrib(testMatrix)
      val pred = referenceModel.predict(testMatrix)
      (leaf, contrib, pred)
    }

    val sparkRows = ranker.fit(trainDf).transform(testDf).collect()

    // Reads a vector-typed output column as one float array per row.
    def vectorColumn(name: String): Array[Array[Float]] =
      sparkRows.map(row => row.getAs[DenseVector](name).toArray.map(_.toFloat))

    // Check Leaf
    checkEqual(expectedLeaf, vectorColumn("leaf"))

    // Check contrib
    checkEqual(expectedContrib, vectorColumn("contrib"))

    // Check prediction
    val sparkPred = sparkRows.map(row => Array(row.getAs[Double]("prediction").toFloat))
    checkEqual(expectedPred, sparkPred)
  }
}

def writeFile(df: Dataset[_]): String = {
def listFiles(directory: String): Array[String] = {
val dir = new File(directory)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright (c) 2024 by Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.spark

import org.apache.spark.ml.{PredictionModel, Predictor}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable, MLReadable, MLReader}
import org.apache.spark.ml.xgboost.SparkUtils
import org.apache.spark.sql.Dataset
import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.XGBoostRanker._uid
import ml.dmlc.xgboost4j.scala.spark.params.HasGroupCol
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams.RANKER_OBJS
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

class XGBoostRanker(override val uid: String,
private val xgboostParams: Map[String, Any])
extends Predictor[Vector, XGBoostRanker, XGBoostRankerModel]
with XGBoostEstimator[XGBoostRanker, XGBoostRankerModel] with HasGroupCol {

/** Creates a ranker with the companion-provided `_uid` and an empty XGBoost parameter map. */
def this() = this(_uid, Map[String, Any]())

/** Creates a ranker with the given `uid` and an empty XGBoost parameter map. */
def this(uid: String) = this(uid, Map[String, Any]())

/** Creates a ranker with the companion-provided `_uid` and the given XGBoost parameters. */
def this(xgboostParams: Map[String, Any]) = this(_uid, xgboostParams)

/** Sets the `groupCol` param and returns this ranker for chaining. */
def setGroupCol(value: String): XGBoostRanker = set(groupCol, value)

// Runs as part of construction with the map passed to the primary constructor.
// NOTE(review): presumably copies the raw XGBoost parameters onto the matching Spark
// params -- confirm against the definition in XGBoostEstimator.
xgboost2SparkParams(xgboostParams)

/**
 * Checks the ranker's configuration before training.
 *
 * Runs the parent validation first, then requires `groupCol` to be set and non-empty.
 * An explicitly set objective must be one of `RANKER_OBJS`; when no objective has been
 * set, it is set to "rank:ndcg" as a side effect.
 *
 * @param dataset the dataset about to be used for training, forwarded to the parent check
 * @throws IllegalArgumentException if `groupCol` is missing or the objective is not a
 *                                  ranking objective
 */
override protected[spark] def validate(dataset: Dataset[_]): Unit = {
  super.validate(dataset)

  require(isDefinedNonEmpty(groupCol), "groupCol needs to be set")

  if (!isSet(objective)) {
    // Nothing chosen by the caller: fall back to the ranking objective.
    setObjective("rank:ndcg")
  } else {
    // An explicitly chosen objective has to be a ranking one.
    require(RANKER_OBJS.contains(getObjective),
      s"Wrong objective for XGBoostRanker, supported objs: ${RANKER_OBJS.mkString(",")}")
  }
}

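/**
* Preprocess the dataset to meet the xgboost input requirement, then sort the rows of
* each partition by the group column.
*
* @param dataset the input dataset
* @return the preprocessed dataset, sorted within each partition by the group column,
*         together with the column indices returned by the parent preprocessing
*/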
override private[spark] def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndices) = {
val (output, columnIndices) = super.preprocess(dataset)
(output.sortWithinPartitions(getGroupCol), columnIndices)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this operation interact with spark-rapids plugin if enabled? Any implications on GPU memory?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this preprocess even get called if plugin is enabled? If not, partition might not be sorted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. Fixed this issue. Please help review it again. Thx very much.

}

/**
 * Builds the [[XGBoostRankerModel]] for a booster, reusing this estimator's uid.
 *
 * @param booster the booster to wrap in the model
 * @param summary the training summary; a `null` value is stored as `None`
 * @return a new model holding `booster` and the optional summary
 */
override protected def createModel(
    booster: Booster,
    summary: XGBoostTrainingSummary): XGBoostRankerModel =
  new XGBoostRankerModel(uid, booster, Option(summary))

/**
 * Derives the output schema by passing the input schema, the configured prediction
 * column name and `DoubleType` to `SparkUtils.appendColumn`.
 *
 * @param schema           the input schema
 * @param fitting          not used by this implementation
 * @param featuresDataType not used by this implementation
 * @return the schema returned by `SparkUtils.appendColumn`
 */
override protected def validateAndTransformSchema(
    schema: StructType,
    fitting: Boolean,
    featuresDataType: DataType): StructType = {
  val outputCol = $(predictionCol)
  SparkUtils.appendColumn(schema, outputCol, DoubleType)
}
}

/**
 * Companion of the `XGBoostRanker` estimator: supplies default-params loading and the
 * uid used by the constructors that are not given one explicitly.
 */
object XGBoostRanker extends DefaultParamsReadable[XGBoostRanker] {
  // Deliberately a `def`: a `val` would be computed once when this object initialises,
  // so every ranker built without an explicit uid would share one identifier. Spark
  // identifies a Param by its parent's uid, so such rankers would collide in a ParamMap.
  private def _uid: String = Identifiable.randomUID("xgbranker")
}

/**
 * Prediction model created by `XGBoostRanker`, holding the booster used for prediction.
 *
 * @param uid           identifier of this model
 * @param nativeBooster the booster held by this model
 * @param summary       optional training summary, `None` by default
 */
class XGBoostRankerModel private[ml](val uid: String,
                                     val nativeBooster: Booster,
                                     val summary: Option[XGBoostTrainingSummary] = None)
  extends PredictionModel[Vector, XGBoostRankerModel]
    with RankerRegressorBaseModel[XGBoostRankerModel] with HasGroupCol {

  /** Creates a model that only carries a uid; the booster is left as `null`. */
  def this(uid: String) = this(uid, null)

  /** Sets the `groupCol` param and returns this model for chaining. */
  def setGroupCol(value: String): XGBoostRankerModel = set(groupCol, value)

  /**
   * Creates a new model with the same uid, booster and summary, copies this model's
   * param values plus `extra` onto it, and keeps the same parent.
   */
  override def copy(extra: ParamMap): XGBoostRankerModel =
    copyValues(new XGBoostRankerModel(uid, nativeBooster, summary), extra).setParent(parent)

  /** Predicts for one feature vector and returns the first value of the result. */
  override def predict(features: Vector): Double = {
    val scores = predictSingleInstance(features)
    scores(0)
  }
}

/** Companion that provides the reader used to load a saved `XGBoostRankerModel`. */
object XGBoostRankerModel extends MLReadable[XGBoostRankerModel] {
  override def read: MLReader[XGBoostRankerModel] = new ModelReader

  private class ModelReader extends XGBoostModelReader[XGBoostRankerModel] {
    /**
     * Loads the booster and the metadata stored under `path`, builds a model with the
     * stored uid and no training summary, and applies the metadata's params to it.
     */
    override def load(path: String): XGBoostRankerModel = {
      val booster = loadBooster(path)
      val metadata = SparkUtils.loadMetadata(path, sc)
      val restored = new XGBoostRankerModel(metadata.uid, booster, None)
      metadata.getAndSetParams(restored)
      restored
    }
  }
}
Loading
Loading