diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a660bccd2e68f..b2630b535ede3 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.atomic.AtomicLongArray
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection
@@ -39,7 +40,7 @@ import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus}
+import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus, ShuffleOutputStatus}
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
 import org.apache.spark.util._
@@ -698,6 +699,12 @@ private[spark] class MapOutputTrackerMaster(
   /** Whether to compute locality preferences for reduce tasks */
   private val shuffleLocalityEnabled = conf.get(SHUFFLE_REDUCE_LOCALITY_ENABLE)
 
+  private val enableOptimizeCompressedMapStatus = conf.get(
+    SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS)
+
+  private val enableOptimizeCompressedConvertHighly = conf.get(
+    SHUFFLE_OPTIMIZE_COMPRESSED_CONVERT_HIGHLY_MAP_STATUS)
+
   private val shuffleMigrationEnabled = conf.get(DECOMMISSION_ENABLED) &&
     conf.get(STORAGE_DECOMMISSION_ENABLED) &&
     conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
@@ -718,6 +725,8 @@ private[spark] class MapOutputTrackerMaster(
   // Exposed for testing
   val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
 
+  val mapOutputStatisticsCache = new ConcurrentHashMap[Int, AtomicLongArray]()
+
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
   // requests for MapOutputTrackerMasterMessages
@@ -815,6 +824,9 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
+    if (enableOptimizeCompressedMapStatus) {
+      mapOutputStatisticsCache.put(shuffleId, new AtomicLongArray(numReduces))
+    }
     if (pushBasedShuffleEnabled) {
       if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
         throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
@@ -839,7 +851,26 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
-    shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
+    if (enableOptimizeCompressedMapStatus && status != null) {
+      val array = mapOutputStatisticsCache.get(shuffleId)
+      val numReducer = array.length()
+      val uncompressedSizes = new Array[Long](numReducer)
+      for (i <- 0 until numReducer) {
+        val blockSize = status.getSizeForBlock(i)
+        array.getAndAdd(i, blockSize)
+        uncompressedSizes(i) = blockSize
+      }
+      if (enableOptimizeCompressedConvertHighly) {
+        // Convert to HighlyCompressedMapStatus to reduce driver memory usage
+        val highlyMapStatus =
+          HighlyCompressedMapStatus(status.location, uncompressedSizes, status.mapId)
+        shuffleStatuses(shuffleId).addMapOutput(mapIndex, highlyMapStatus)
+      } else {
+        shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
+      }
+    } else {
+      shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
+    }
   }
 
   /** Unregister map output information of the given shuffle, mapper and block manager */
@@ -1003,6 +1034,15 @@ private[spark] class MapOutputTrackerMaster(
    * Return statistics about all of the outputs for a given shuffle.
    */
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
+    if (enableOptimizeCompressedMapStatus) {
+      val numReducer = dep.partitioner.numPartitions
+      val statistics = mapOutputStatisticsCache.get(dep.shuffleId)
+      val newArray = new Array[Long](numReducer)
+      for (i <- 0 until numReducer) {
+        newArray(i) = statistics.get(i)
+      }
+      return new MapOutputStatistics(dep.shuffleId, newArray)
+    }
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
       val parallelAggThreshold = conf.get(
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c58c371da20cf..e86edc5c4ae78 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1594,6 +1594,22 @@ package object config {
     .checkValue(v => v > 0, "The threshold should be positive.")
     .createWithDefault(10000000)
 
+  private[spark] val SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS =
+    ConfigBuilder("spark.shuffle.optimize.compressed.map.status")
+      .internal()
+      .doc("Cache per-reduce block sizes from CompressedMapStatus to optimize skewed jobs.")
+      .version("2.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_OPTIMIZE_COMPRESSED_CONVERT_HIGHLY_MAP_STATUS =
+    ConfigBuilder("spark.shuffle.optimize.compressed.convert.highly.map.status")
+      .internal()
+      .doc("Convert CompressedMapStatus to HighlyCompressedMapStatus to reduce driver memory.")
+      .version("2.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
     .doc("Size limit for results.")
     .version("1.2.0")
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 26dc218c30c74..f6dc96bb40991 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -85,6 +85,31 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
   }
 
+  test("Optimize CompressedMapStatus") {
+    val newConf = conf.clone.set(SHUFFLE_OPTIMIZE_COMPRESSED_MAP_STATUS, true)
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster(newConf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+    tracker.registerShuffle(10, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+    assert(tracker.containsShuffle(10))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(1000L, 10000L), 5))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
+      Array(10000L, 1000L), 6))
+    val statuses = tracker.getMapSizesByExecutorId(10, 0)
+    assert(statuses.toSet ===
+      Seq((BlockManagerId("a", "hostA", 1000),
+        ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0))),
+        (BlockManagerId("b", "hostB", 1000),
+          ArrayBuffer((ShuffleBlockId(10, 6, 0), size10000, 1)))).toSet)
+    assert(0 == tracker.getNumCachedSerializedBroadcast)
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
+
   test("master register and unregister shuffle") {
     val rpcEnv = createRpcEnv("test")
     val tracker = newTrackerMaster()
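As a standalone illustration of the caching approach in this patch (not part of the diff itself): one AtomicLongArray per shuffle accumulates per-reduce block sizes as map outputs are registered, so statistics can later be read as a snapshot of the totals instead of walking every MapStatus. The names reducerCount and registerSizes below are illustrative only and do not appear in the patch.

    import java.util.concurrent.atomic.AtomicLongArray

    object StatisticsCacheSketch {
      def main(args: Array[String]): Unit = {
        // One counter per reduce partition of a single shuffle.
        val reducerCount = 2
        val totals = new AtomicLongArray(reducerCount)

        // Each map task reports its per-reduce output sizes; add them atomically.
        def registerSizes(sizes: Array[Long]): Unit =
          sizes.zipWithIndex.foreach { case (size, i) => totals.getAndAdd(i, size) }

        registerSizes(Array(1000L, 10000L))   // map output 0
        registerSizes(Array(10000L, 1000L))   // map output 1

        // Equivalent of getStatistics: snapshot the accumulated totals.
        val snapshot = Array.tabulate(reducerCount)(i => totals.get(i))
        println(snapshot.mkString(", "))      // prints: 11000, 11000
      }
    }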