Commit 5b3526f (1 parent: 6001731), showing 7 changed files with 112 additions and 36 deletions.

* Cleanup Spark shuffle data after data is consumed
* update comments

(cherry picked from commit 222e473)
Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
spark-sql-application/src/main/scala/org/apache/spark/sql/util/ShuffleCleaner.scala (new file: 57 additions, 0 deletions)

```scala
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.apache.spark.sql.util

import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener

/**
 * Clean Spark shuffle data after each microBatch.
 * https://github.com/opensearch-project/opensearch-spark/issues/302
 */
class ShuffleCleaner(spark: SparkSession) extends StreamingQueryListener with Logging {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    ShuffleCleaner.cleanUp(spark)
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

trait Cleaner {
  def cleanUp(spark: SparkSession)
}

object CleanerFactory {
  def cleaner(streaming: Boolean): Cleaner = {
    if (streaming) NoOpCleaner else ShuffleCleaner
  }
}

/**
 * No operation cleaner.
 */
object NoOpCleaner extends Cleaner {
  override def cleanUp(spark: SparkSession): Unit = {}
}

/**
 * Spark shuffle data cleaner.
 */
object ShuffleCleaner extends Cleaner with Logging {
  def cleanUp(spark: SparkSession): Unit = {
    logInfo("Before cleanUp Shuffle")
    val cleaner = spark.sparkContext.cleaner
    val masterTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
    val shuffleIds = masterTracker.shuffleStatuses.keys.toSet
    shuffleIds.foreach(shuffleId => cleaner.foreach(c => c.doCleanupShuffle(shuffleId, true)))
    logInfo("After cleanUp Shuffle")
  }
}
```
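The diff does not include the site where this listener is registered, so the following is only a minimal sketch: it assumes a driver-side entry point (the object name and app name are hypothetical) and uses the standard `StreamingQueryManager.addListener` API together with the `ShuffleCleaner` class added above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.ShuffleCleaner

// Hypothetical wiring sketch, not part of this commit.
object StreamingShuffleCleanupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-shuffle-cleanup").getOrCreate()

    // Register the listener: onQueryProgress fires after every micro-batch,
    // which in turn calls ShuffleCleaner.cleanUp on the driver.
    spark.streams.addListener(new ShuffleCleaner(spark))

    // ... start the streaming query here ...
  }
}
```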
spark-sql-application/src/test/scala/org/apache/spark/sql/util/CleanerFactoryTest.scala (new file: 23 additions, 0 deletions)

```scala
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.apache.spark.sql.util

import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite

class CleanerFactoryTest extends SparkFunSuite with Matchers {

  test("CleanerFactory should return NoOpCleaner when streaming is true") {
    val cleaner = CleanerFactory.cleaner(streaming = true)
    cleaner shouldBe NoOpCleaner
  }

  test("CleanerFactory should return ShuffleCleaner when streaming is false") {
    val cleaner = CleanerFactory.cleaner(streaming = false)
    cleaner shouldBe ShuffleCleaner
  }
}
```
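For the non-streaming mapping that the second test pins down, a hedged usage sketch (the batch job below is invented for illustration; only `CleanerFactory` and its `cleanUp` contract come from this commit) might look like:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.util.CleanerFactory

// Hypothetical batch-path sketch, not part of this commit.
object BatchShuffleCleanupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-shuffle-cleanup").getOrCreate()

    // Run a shuffle-producing batch query and consume its result first.
    spark.range(0, 1000).groupBy(col("id") % 10).count().collect()

    // streaming = false selects ShuffleCleaner, which asks the ContextCleaner
    // to drop every shuffle currently tracked by MapOutputTrackerMaster.
    CleanerFactory.cleaner(streaming = false).cleanUp(spark)

    spark.stop()
  }
}
```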