repartition-based fallback for hash aggregate v3 #11712
base: branch-24.12
Conversation
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
…tor leak Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Signed-off-by: Firestarman <[email protected]>
…gether small buckets Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
I have not finished yet. Could you post an explanation of the changes? I see places in the code that appear to have duplicate functionality, not to mention that the old sort-based agg code completely duplicates a lot of the newer hash repartition-based code.
I really just want to understand what the workflow is supposed to be.
```scala
override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
  private[this] val dOut: DataOutputStream =
    new DataOutputStream(new BufferedOutputStream(out))

  override def writeValue[T: ClassTag](value: T): SerializationStream = {
    val start = System.nanoTime()
```
This is going to include the I/O time for writing to out, which can include compression/encryption in addition to I/O, depending on the shuffle manager used. Shuffle managers that do that processing in a background thread will not show it, but the default implementations will.
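To make that concrete, here is a minimal, self-contained sketch (illustrative only, not plugin code) where a GZIPOutputStream stands in for whatever the shuffle manager wraps around out; the timed span includes the wrapper's work:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream, OutputStream}
import java.util.zip.GZIPOutputStream

object TimedWriteDemo extends App {
  val sink = new ByteArrayOutputStream()
  // Stand-in for a shuffle manager that wraps `out` with compression/encryption.
  val wrapped: OutputStream = new GZIPOutputStream(sink)
  val dOut = new DataOutputStream(wrapped)

  val payload = Array.fill[Byte](1 << 20)(42.toByte)
  val start = System.nanoTime()
  dOut.write(payload) // compression happens synchronously inside this call...
  dOut.flush()
  val elapsed = System.nanoTime() - start
  // ...so the measured "serialization" time includes the wrapper's work.
  println(s"write + compression took ${elapsed / 1e6} ms")
}
```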
```scala
  }
}
```

```scala
def voluntaryRelease(context: TaskContext): Unit = {
```
Can we please have some docs here to explain what is going on? This feels like a totally different feature from hash repartitioning, and I would like to measure performance changes for it separately from anything with the hash aggregation changes.
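As a guess at the intent (an assumption, not the author's documented semantics), the method's shape suggests giving up the GPU semaphore at a safe point so other tasks can use the device. A hedged sketch, using the plugin's existing GpuSemaphore API:

```scala
/**
 * Voluntarily give up the GPU semaphore held by this task, e.g. before a
 * long CPU- or I/O-bound stretch, so other tasks can run GPU work.
 * Callers must re-acquire the semaphore before touching device memory.
 * (Assumed intent; the author's docs should confirm or correct this.)
 */
def voluntaryRelease(context: TaskContext): Unit = {
  GpuSemaphore.releaseIfNecessary(context)
}
```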
```scala
@@ -551,6 +551,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
    .integerConf
    .createWithDefault(2)

  val ENABLE_VOLUNTARY_GPU_RELEASE_CHECK = conf("spark.rapids.gpu.voluntaryReleaseCheck")
    .doc("If true, the plugin will check if voluntary release of GPU is forbidden, " +
      "e.g. when GpuAggregateExec still have more output batches to offer." +
```
missing a space at the end
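For reference, the fix is just a trailing space before the next fragment is concatenated (continuation shown as a placeholder, since the diff elides it):

```scala
val doc = "If true, the plugin will check if voluntary release of GPU is forbidden, " +
  "e.g. when GpuAggregateExec still have more output batches to offer. " + // trailing space added
  "<rest of the doc string>" // placeholder; the diff elides the continuation
```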
```scala
    .checkValues(Set("sort", "repartition"))
    .createWithDefault("repartition")
```

```scala
// todo: remove this
```
Do we have a follow-on issue for that?
```scala
@@ -1558,6 +1566,25 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
    .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
    .createWithDefault(1.0)

  val AGG_OUTPUT_SIZE_RATIO = conf("spark.rapids.sql.agg.outputSizeRatioToBatchSize")
    .doc("The ratio of the output size of an aggregation to the batch size. ")
```
How is this used and how is it different from spark.rapids.sql.agg.skipAggPassReductionRatio? It is not explained well at all here.
```scala
@@ -117,8 +117,8 @@ class SpillableColumnarBatchImpl (
  }

  override def getColumnarBatch(): ColumnarBatch = {
    GpuSemaphore.acquireIfNecessary(TaskContext.get())
```
Why? Is this because we can have locked the rapids buffer while blocked waiting to get on the semaphore? Did you see this show up in practice?
This feels like a bug fix and not a part of repartition-based fallback. I would much rather see this as a separate issue/PR.
```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object SBAggregateUtils {
```
nit: can we not use abbreviations? If this is "Sort Based", can we just call it SortFallbackAggregateUtils or something similar?
```diff
@@ -335,7 +513,10 @@ class AggHelper(
   // We need to merge the aggregated batches into 1 before calling post process,
   // if the aggregate code had to split on a retry
   if (aggregatedSeq.size > 1) {
-    val concatted = concatenateBatches(metrics, aggregatedSeq)
+    val concatted =
+      withResource(aggregatedSeq) { _ =>
```
I am confused by this. Was this a bug? This change feels wrong to me.
concatenateBatches has the contract that it will either close everything in toConcat, or, if there is a single item in the sequence, just return it without closing anything. By putting it within a withResource, it looks like we are going to double close the data in aggregatedSeq.
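A toy reproduction of the hazard (hypothetical types; consumeAll stands in for concatenateBatches, which closes its inputs per the contract above):

```scala
import scala.util.control.NonFatal

class Resource(val name: String) extends AutoCloseable {
  private var closed = false
  override def close(): Unit = {
    if (closed) throw new IllegalStateException(s"double close of $name")
    closed = true
  }
}

// Same shape as the plugin's withResource over a Seq: closes every element on exit.
def withResource[T <: AutoCloseable, V](rs: Seq[T])(f: Seq[T] => V): V =
  try f(rs) finally rs.foreach(_.close())

// Stand-in for concatenateBatches: closes everything it is given
// (when there is more than one element).
def consumeAll(rs: Seq[Resource]): Resource = {
  rs.foreach(_.close())
  new Resource("concatenated")
}

val inputs = Seq(new Resource("a"), new Resource("b"))
try {
  withResource(inputs)(consumeAll) // withResource re-closes the already-closed inputs
} catch {
  case NonFatal(e) => println(e.getMessage) // "double close of a"
}
```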
SpillableColumnarBatch has the nasty habit (?) of hiding double closes from us (https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala#L137). I'd like to remove this behavior with my spillable changes.
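A generic illustration of that habit (not the linked code, just the shape of a close() that swallows extra calls):

```scala
class SwallowingResource extends AutoCloseable {
  private var closed = false
  override def close(): Unit = {
    if (!closed) {
      closed = true // the first close does the real work
    }
    // extra closes fall through silently, so callers never see the bug
  }
}
```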
I will review this today
```scala
realIter = Some(ConcatIterator.apply(firstPassIter,
  (aggOutputSizeRatio * configuredTargetBatchSize).toLong
))
firstPassAggToggle.set(false)
```
I don't think this does what you think it does. The line that reads this flag is used to create an iterator; it is not evaluated inside the iterator to decide whether we should or should not do the agg. I added in some print statements and I have verified that it does indeed agg for every batch, even if the first batch set this to false. Which is a good thing, because if you disabled the initial aggregation on something where the output types do not match the input types you would get a crash or data corruption.
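A small model of the problem (hypothetical, mirroring the shape of the code above): the flag is read once when the iterator pipeline is built, so flipping it afterwards has no effect on the pipeline already constructed:

```scala
import java.util.concurrent.atomic.AtomicBoolean

val firstPassAggToggle = new AtomicBoolean(true)
val batches = Iterator("b1", "b2", "b3")

val doFirstPassAgg = firstPassAggToggle.get() // read once, when building the pipeline
val pipeline = batches.map { b =>
  if (doFirstPassAgg) s"aggregated $b" else s"passed through $b"
}
firstPassAggToggle.set(false) // like the line above: does not affect the built pipeline
pipeline.foreach(println)     // still prints "aggregated" for every batch
```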
```scala
// Handle the case of skipping second and third pass of aggregation.
// This only works when spark.rapids.sql.agg.skipAggPassReductionRatio < 1.
if (!firstBatchChecked && firstPassIter.hasNext
```
If we are doing an aggregate every time, would it be better to check each batch and skip repartitioning if the batch stayed large?
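A sketch of that alternative (hypothetical names and shapes, reusing the existing skipAggPassReductionRatio idea per batch rather than only for the first batch):

```scala
// Each first-pass output, with how much the aggregation shrank it.
final case class AggBatch(inputRows: Long, outputRows: Long)

val skipAggPassReductionRatio = 0.9

// Route per batch: batches that stayed large are emitted directly;
// batches that reduced well go on to repartition/merge.
def route(batches: Iterator[AggBatch]): Iterator[Either[AggBatch, AggBatch]] =
  batches.map { b =>
    val reduction = b.outputRows.toDouble / b.inputRows
    if (reduction >= skipAggPassReductionRatio) Right(b) // stayed large: emit as-is
    else Left(b)                                         // reduced well: repartition + merge
  }
```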
This PR replaces #11116, since it has diverged too much from #11116.