Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50332][CORE] Optimize accurate CompressedMapStatus #48896

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark
import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicLongArray
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus}
import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus, ShuffleOutputStatus}
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
import org.apache.spark.util._
Expand Down Expand Up @@ -698,6 +699,12 @@ private[spark] class MapOutputTrackerMaster(
/** Whether to compute locality preferences for reduce tasks */
private val shuffleLocalityEnabled = conf.get(SHUFFLE_REDUCE_LOCALITY_ENABLE)

/**
 * Whether per-reducer output-size totals are maintained in `mapOutputStatisticsCache`.
 * When true, `registerShuffle` allocates the totals array, `registerMapOutput` adds each
 * map output's block sizes to it, and `getStatistics` answers from it directly.
 */
private val enableOptimizeCompressedMapStatus = conf.get(
SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS)

/**
 * Whether `registerMapOutput` replaces the incoming status with a `HighlyCompressedMapStatus`
 * built from the same per-block sizes. Only consulted when
 * `enableOptimizeCompressedMapStatus` is also true.
 */
private val enableOptimizeCompressedConvertHighly = conf.get(
SHUFFLE_OPTIMIZE_COMPRESSED_CONVERT_HIGHLY_MAP_STATUS)

private val shuffleMigrationEnabled = conf.get(DECOMMISSION_ENABLED) &&
conf.get(STORAGE_DECOMMISSION_ENABLED) && conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)

Expand All @@ -718,6 +725,8 @@ private[spark] class MapOutputTrackerMaster(
// Exposed for testing
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

// Shuffle id -> running total of registered map output sizes, one slot per reduce partition.
// Entries are created in registerShuffle and updated in registerMapOutput, both only when
// enableOptimizeCompressedMapStatus is set; getStatistics reads them.
// NOTE(review): no removal of entries is visible in this excerpt -- confirm that unregistering
// a shuffle or a map output also updates this cache.
val mapOutputStatisticsCache = new ConcurrentHashMap[Int, AtomicLongArray]()

private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)

// requests for MapOutputTrackerMasterMessages
Expand Down Expand Up @@ -815,6 +824,9 @@ private[spark] class MapOutputTrackerMaster(
}

def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
if (enableOptimizeCompressedMapStatus) {
mapOutputStatisticsCache.put(shuffleId, new AtomicLongArray(numReduces))
}
if (pushBasedShuffleEnabled) {
if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
Expand All @@ -839,7 +851,26 @@ private[spark] class MapOutputTrackerMaster(
}

/**
 * Registers the output of one map task for the given shuffle.
 *
 * When `enableOptimizeCompressedMapStatus` is set and `status` is non-null, the size of every
 * block in `status` is also added to the per-reducer totals kept in
 * `mapOutputStatisticsCache`. If `enableOptimizeCompressedConvertHighly` is set as well, the
 * status stored in the tracker is a `HighlyCompressedMapStatus` rebuilt from those sizes, to
 * reduce driver memory; otherwise the original `status` is stored unchanged.
 *
 * @param shuffleId id of a shuffle previously passed to `registerShuffle`
 * @param mapIndex  index of the map task whose output is being registered
 * @param status    the map task's output status; may be null, in which case it is registered
 *                  as-is and the totals are left untouched
 */
def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
  // Open review question from the PR discussion: can MapStatusesSerDeserBenchmark be used to
  // demonstrate the performance improvement?
  val shuffleStatus = shuffleStatuses(shuffleId)

  // The totals array is absent when the optimization is off or the shuffle has no cache
  // entry; in both cases fall back to plain registration instead of dereferencing null.
  val perReducerTotals =
    if (enableOptimizeCompressedMapStatus && status != null) {
      Option(mapOutputStatisticsCache.get(shuffleId))
    } else {
      None
    }

  val statusToRegister: MapStatus = perReducerTotals match {
    case Some(totals) =>
      // Take the reducer count from the totals array itself: it is allocated with
      // `numReduces` in registerShuffle, so the loop bounds always match the array being
      // updated. (The previous source, `mergeStatuses.length`, is presumably only sized by
      // `numReduces` on the push-based shuffle path -- verify against ShuffleStatus.)
      val numReducers = totals.length()
      val blockSizes = new Array[Long](numReducers)
      var reduceId = 0
      while (reduceId < numReducers) {
        val blockSize = status.getSizeForBlock(reduceId)
        totals.getAndAdd(reduceId, blockSize)
        blockSizes(reduceId) = blockSize
        reduceId += 1
      }
      // NOTE(review): registering the same mapIndex again (e.g. a retried task) adds its
      // sizes a second time; confirm whether the totals need to be adjusted in that case.
      if (enableOptimizeCompressedConvertHighly) {
        // Store the more compact representation to reduce driver memory.
        HighlyCompressedMapStatus(status.location, blockSizes, status.mapId)
      } else {
        status
      }
    case None =>
      status
  }

  // Register exactly once, whichever representation was chosen.
  shuffleStatus.addMapOutput(mapIndex, statusToRegister)
}

/** Unregister map output information of the given shuffle, mapper and block manager */
Expand Down Expand Up @@ -1003,6 +1034,15 @@ private[spark] class MapOutputTrackerMaster(
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
if (enableOptimizeCompressedMapStatus) {
val numReducer = dep.partitioner.numPartitions
val statistics = mapOutputStatisticsCache.get(dep.shuffleId)
val newArray = new Array[Long](dep.partitioner.numPartitions)
for (i <- 0 until numReducer) {
newArray(i) = statistics.get(i)
}
return new MapOutputStatistics(dep.shuffleId, newArray)
}
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
val parallelAggThreshold = conf.get(
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,22 @@ package object config {
.checkValue(v => v > 0, "The threshold should be positive.")
.createWithDefault(10000000)

// Internal switch (off by default) for the optimized CompressedMapStatus handling.
// `version` names the release that introduces the entry; this is a new entry, so it
// cannot be 2.3.0 (4.0.0 per review feedback on the change).
private[spark] val SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS =
  ConfigBuilder("spark.shuffle.optimize.compressed.map.status")
    .internal()
    .doc("Using CompressedMapStatus optimize skewed job.")
    .version("4.0.0")
    .booleanConf
    .createWithDefault(false)

// Internal switch (off by default) for converting a registered map status into a
// HighlyCompressedMapStatus. `version` names the release that introduces the entry; the
// reviewer's suggestion of 4.0.0 is applied here in place of the original 2.3.0.
private[spark] val SHUFFLE_OPTIMIZE_COMPRESSED_CONVERT_HIGHLY_MAP_STATUS =
  ConfigBuilder("spark.shuffle.optimize.compressed.convert.highly.map.status")
    .internal()
    .doc("CompressedMapStatus Convert to HighlyCompressedMapStatus for reduce Driver memory")
    .version("4.0.0")
    .booleanConf
    .createWithDefault(false)

private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
.doc("Size limit for results.")
.version("1.2.0")
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,31 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
rpcEnv.shutdown()
}

test("Optimize CompressedMapStatus") {
  // MapOutputTrackerMaster reads this flag into a val when it is constructed, so it has to be
  // set before the tracker is created (assumes newTrackerMaster() builds the tracker from the
  // suite-level `conf`, as the endpoint below does -- TODO confirm).
  conf.set(SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS.key, "true")
  try {
    val rpcEnv = createRpcEnv("test")
    val tracker = newTrackerMaster()
    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
    tracker.registerShuffle(10, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
    assert(tracker.containsShuffle(10))

    // With the optimization enabled, registerShuffle allocates one totals slot per reducer.
    val perReducerTotals = tracker.mapOutputStatisticsCache.get(10)
    assert(perReducerTotals != null)
    assert(perReducerTotals.length() === MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)

    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
    val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
      Array(1000L, 10000L), 5))
    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
      Array(10000L, 1000L), 6))
    val statuses = tracker.getMapSizesByExecutorId(10, 0)
    assert(statuses.toSet ===
      Seq((BlockManagerId("a", "hostA", 1000),
        ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0))),
        (BlockManagerId("b", "hostB", 1000),
          ArrayBuffer((ShuffleBlockId(10, 6, 0), size10000, 1)))).toSet)
    assert(0 == tracker.getNumCachedSerializedBroadcast)
    tracker.stop()
    rpcEnv.shutdown()
  } finally {
    // `conf` is shared with the other tests in this suite; do not leak the flag into them.
    conf.remove(SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS.key)
  }
}

test("master register and unregister shuffle") {
val rpcEnv = createRpcEnv("test")
val tracker = newTrackerMaster()
Expand Down