repartition-based fallback for hash aggregate v3 #11712

Draft
wants to merge 50 commits into base: branch-24.12
Conversation

@binmahone (Collaborator) commented Nov 8, 2024

This PR replaces #11116, since it has diverged too much from #11116.

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
binmahone and others added 20 commits August 20, 2024 16:35
…gether small buckets

@binmahone binmahone marked this pull request as draft November 8, 2024 07:21
@binmahone binmahone changed the title from "240821 repartition agg v3" to "repartition-based fallback for hash aggregate v3" Nov 8, 2024
@abellina abellina requested review from abellina and revans2 and removed request for revans2 November 8, 2024 14:33
@revans2 (Collaborator) left a comment


I have not finished yet. Could you post an explanation of the changes? I see places in the code that appear to have duplicate functionality, not to mention that the old sort-based agg code completely duplicates a lot of the newer hash repartition-based code.

I really just want to understand what the workflow is supposed to be.


override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
  private[this] val dOut: DataOutputStream =
    new DataOutputStream(new BufferedOutputStream(out))

  override def writeValue[T: ClassTag](value: T): SerializationStream = {
    val start = System.nanoTime()

This is going to include the I/O time for writing to out, which can include compression/encryption in addition to I/O depending on the shuffle manager used. Some that do the processing in a background thread will not show this, but the default implementations will show it.

  }
}
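To make the concern above concrete: one way to separate serialization cost from downstream I/O would be to time at the raw stream boundary instead of around writeValue. A minimal sketch, not plugin code — the ioTimeNs accumulator and the wrapping point are assumptions:

import java.io.OutputStream
import java.util.concurrent.atomic.AtomicLong

// Hypothetical accumulator; in the plugin this would presumably be a metric.
val ioTimeNs = new AtomicLong(0L)

// Wraps the stream handed over by the shuffle manager, so time spent inside
// write() (compression, encryption, disk) is attributed to I/O. The timer
// around writeValue then approximates pure serialization cost by subtraction.
class TimedOutputStream(delegate: OutputStream) extends OutputStream {
  override def write(b: Int): Unit = {
    val t0 = System.nanoTime()
    delegate.write(b)
    ioTimeNs.addAndGet(System.nanoTime() - t0)
  }
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    val t0 = System.nanoTime()
    delegate.write(b, off, len)
    ioTimeNs.addAndGet(System.nanoTime() - t0)
  }
  override def flush(): Unit = delegate.flush()
  override def close(): Unit = delegate.close()
}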

def voluntaryRelease(context: TaskContext): Unit = {

Can we please have some docs here to explain what is going on? This feels like a totally different feature from hash re-partitioning, and I would like to measure performance changes for it separate from anything with hash aggregation changes.
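A rough sketch of the kind of behavior and documentation being asked for here, assuming the method's job is to give up the GPU semaphore between output batches via GpuSemaphore.releaseIfNecessary as used elsewhere in the plugin; the releaseForbidden guard flag is hypothetical:

/**
 * Voluntarily releases the GPU semaphore held by this task so other tasks can
 * make progress while this one does CPU-side work. Skipped when release has
 * been forbidden, e.g. while GpuAggregateExec still has output batches to offer.
 */
def voluntaryRelease(context: TaskContext): Unit = {
  if (!releaseForbidden.get()) { // hypothetical guard set by operators
    GpuSemaphore.releaseIfNecessary(context)
  }
}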

@@ -551,6 +551,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
  .integerConf
  .createWithDefault(2)

val ENABLE_VOLUNTARY_GPU_RELEASE_CHECK = conf("spark.rapids.gpu.voluntaryReleaseCheck")
  .doc("If true, the plugin will check if voluntary release of GPU is forbidden, " +
    "e.g. when GpuAggregateExec still have more output batches to offer." +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing a space at the end

  .checkValues(Set("sort", "repartition"))
  .createWithDefault("repartition")

// todo: remove this

Do we have a follow-on issue for that?

@@ -1558,6 +1566,25 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
  .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
  .createWithDefault(1.0)

val AGG_OUTPUT_SIZE_RATIO = conf("spark.rapids.sql.agg.outputSizeRatioToBatchSize")
  .doc("The ratio of the output size of an aggregation to the batch size. ")

How is this used and how is it different from spark.rapids.sql.agg.skipAggPassReductionRatio? It is not explained well at all here.
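From the ConcatIterator call further down in this diff, the ratio appears to scale the target batch size used when concatenating aggregated output. A sketch of the apparent effect — the variable names are taken from that snippet, the stub values here are purely illustrative:

// Names taken from the ConcatIterator snippet later in this PR.
val aggOutputSizeRatio: Double = 1.0            // spark.rapids.sql.agg.outputSizeRatioToBatchSize
val configuredTargetBatchSize: Long = 1L << 30  // normal target batch size, for illustration

// The concatenation target is the configured batch size scaled by the ratio,
// e.g. ratio 1.0 keeps the normal target; 0.5 makes the agg emit ~half-size batches.
val concatTargetSizeBytes: Long =
  (aggOutputSizeRatio * configuredTargetBatchSize).toLong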

@@ -117,8 +117,8 @@ class SpillableColumnarBatchImpl (
}

override def getColumnarBatch(): ColumnarBatch = {
  GpuSemaphore.acquireIfNecessary(TaskContext.get())

Why? Is this because we can have locked the rapids buffer while blocked waiting to get on the semaphore? Did you see this show up in practice?

This feels like a bug fix and not a part of repartition-based fallback. I would much rather see this as a separate issue/PR.

import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object SBAggregateUtils {

nit: can we not use abbreviations? If this is Sort Based, can we just call it SortFallbackAggregateUtils or something similar?

@@ -335,7 +513,10 @@ class AggHelper(
  // We need to merge the aggregated batches into 1 before calling post process,
  // if the aggregate code had to split on a retry
  if (aggregatedSeq.size > 1) {
    val concatted = concatenateBatches(metrics, aggregatedSeq)
    val concatted =
      withResource(aggregatedSeq) { _ =>

I am confused by this. Was this a bug? This change feels wrong to me.

concatenateBatches has the contract that it will either close everything in toConcat, or, if there is a single item in the sequence, just return it without closing anything. By putting it within a withResource it looks like we are going to double close the data in aggregatedSeq.


SpillableColumnarBatch has the nasty habit (?) of hiding double closes from us (https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala#L137). I'd like to remove this behavior with my spillable changes.
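A toy model of the contract described above (stand-in types, not plugin code) showing how the withResource wrapper produces the double close:

// Models the ownership contract: concatenateBatches consumes (closes) all
// inputs when it merges, or returns the single input unclosed.
class Batch(val id: Int) extends AutoCloseable {
  private var closes = 0
  override def close(): Unit = {
    closes += 1
    assert(closes == 1, s"double close of batch $id")
  }
}

def withResource[T](rs: Seq[Batch])(block: Seq[Batch] => T): T =
  try block(rs) finally rs.foreach(_.close())

def concatenateBatches(toConcat: Seq[Batch]): Batch =
  if (toConcat.size == 1) toConcat.head
  else { toConcat.foreach(_.close()); new Batch(-1) }

// The pattern from this diff: the wrapper closes the inputs again after
// concatenateBatches already consumed them, tripping the assertion.
val concatted = withResource(Seq(new Batch(1), new Batch(2))) { seq =>
  concatenateBatches(seq)
}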

@abellina (Collaborator) commented Nov 8, 2024

I will review this today

realIter = Some(ConcatIterator.apply(firstPassIter,
  (aggOutputSizeRatio * configuredTargetBatchSize).toLong
))
firstPassAggToggle.set(false)

I don't think that this does what you think it does. The line that reads this is used to create an iterator; it is not inside an iterator deciding per batch whether we should or should not do the agg. I added in some print statements and verified that it does indeed agg for every batch, even if the first batch set this to false. Which is a good thing, because if you disabled the initial aggregation on something where the output types do not match the input types you would get a crash or data corruption.
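In other words, the toggle is read once when the iterator is built, not once per batch. A minimal sketch of the two shapes, with hypothetical names and Int standing in for a batch:

import java.util.concurrent.atomic.AtomicBoolean

def agg(batch: Int): Int = batch // stand-in for the first-pass aggregation

val firstPassAggToggle = new AtomicBoolean(true)

// Shape in this PR: the toggle is consulted exactly once, at construction
// time, so setting it to false after the first batch changes nothing.
def builtOnce(input: Iterator[Int]): Iterator[Int] =
  if (firstPassAggToggle.get()) input.map(agg) else input

// Per-batch shape: the decision is re-evaluated inside the iterator, so a
// mid-stream toggle actually takes effect (subject to the type-mismatch
// caveat noted above).
def perBatch(input: Iterator[Int]): Iterator[Int] =
  input.map(b => if (firstPassAggToggle.get()) agg(b) else b)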


// Handle the case of skipping the second and third passes of aggregation.
// This only works when spark.rapids.sql.agg.skipAggPassReductionRatio < 1.
if (!firstBatchChecked && firstPassIter.hasNext

If we are doing an aggregate every time, would it be better to check each batch and skip repartitioning if the batch stayed large?
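A sketch of that suggestion, assuming a per-batch reduction check against something like spark.rapids.sql.agg.skipAggPassReductionRatio; all names here are hypothetical:

case class AggBatch(inputRows: Long, outputRows: Long)

def repartitionAndMerge(b: AggBatch): AggBatch = b // stand-in for passes 2 and 3

// Route each first-pass batch individually: only repartition batches that the
// first aggregation pass actually shrank; emit unreduced batches as-is, since
// repartitioning them is unlikely to help.
def routeBatches(firstPass: Iterator[AggBatch],
    skipAggPassReductionRatio: Double): Iterator[AggBatch] =
  firstPass.map { b =>
    if (b.outputRows.toDouble / b.inputRows < skipAggPassReductionRatio) {
      repartitionAndMerge(b)
    } else {
      b // batch stayed large: skip repartitioning for this one
    }
  }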
