Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repartition-based fallback for hash aggregate #11116

Open
wants to merge 11 commits into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.fallbackAlgorithm"></a>spark.rapids.sql.agg.fallbackAlgorithm|When agg cannot be done in a single pass, use sort-based fallback or repartition-based fallback.|sort|Runtime
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood we make sort default to understand regression. Can we set this by default on to gain more exercises? And eventually, we should deprecate sort based fallback within aggregation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per our offline discussion , this should be unnecessary now, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Particular for this one, I still felt to make it default ON and this is more like an alternative to sort-based approach.

<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
Expand Down
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})

_float_conf_repartition_fallback = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.fallbackAlgorithm': 'repartition'})

_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

Expand Down Expand Up @@ -225,7 +228,9 @@ def get_params(init_list, marked_params=[]):


# Run these tests with in 5 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]
_confs = [_float_conf, _float_smallbatch_conf,
_float_conf_skipagg, _float_conf_repartition_fallback,
_float_conf_final, _float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
Expand Down
16 changes: 15 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.ControlThrowable

import com.nvidia.spark.rapids.RapidsPluginImplicits._
Expand Down Expand Up @@ -134,6 +134,20 @@ object Arm extends ArmScalaSpecificImpl {
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
try {
block(r)
} catch {
case t: ControlThrowable =>
// Don't close for these cases..
throw t
case t: Throwable =>
r.safeClose(t)
throw t
}
}


/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: mutable.Queue[T])(block: mutable.Queue[T] => V): V = {
Expand Down
Loading