Skip to content

Commit

Permalink
[CELEBORN-1725][FOLLOWUP] Optimize isAllMapTasksEnd performance
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Follow-up to #2905: use the same logic to optimize the `isAllMapTasksEnd` method.

### Why are the changes needed?
Addresses review comments on #2905 (review).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Uses the same logic as #2905.

Closes #2959 from turboFei/celeborn_1725_follow.

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
turboFei authored and RexXiong committed Nov 29, 2024
1 parent 3bf9192 commit c84733f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
40 changes: 40 additions & 0 deletions client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.client

object ClientUtils {

  /**
   * Tells whether every mapper has a finished attempt recorded.
   *
   * The array is walked from the last index down to the first, so the scan stops early when an
   * unfinished mapper sits near the end of the array.
   *
   * @param attempts finished attempt id per mapper; a negative value (e.g. -1) marks a mapper
   *                 that has not finished yet
   * @return true when no entry is negative (an empty array yields true), false otherwise
   */
  def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
    var idx = attempts.length - 1
    // Step backwards while entries are finished; the loop halts on the first negative entry.
    while (idx >= 0 && attempts(idx) >= 0) {
      idx -= 1
    }
    // idx drops below zero only when the scan passed every element.
    idx < 0
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
return
}

def isAllMaptaskEnd(shuffleId: Int): Boolean = {
!commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
def areAllMapTasksEnd(shuffleId: Int): Boolean = {
ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(shuffleId))
}

shuffleIds.synchronized {
Expand Down Expand Up @@ -912,7 +912,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
} else {
shuffleIds.values.filter(v => v._2).map(v => v._1).toSeq.reverse.find(
isAllMaptaskEnd) match {
areAllMapTasksEnd) match {
case Some(shuffleId) =>
val pbGetShuffleIdResponse = {
logDebug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable

import com.google.common.cache.{Cache, CacheBuilder}

import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.{ClientUtils, ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers}
import org.apache.celeborn.common.CelebornConf
Expand Down Expand Up @@ -251,7 +251,7 @@ class ReducePartitionCommitHandler(
if (attempts(mapId) < 0) {
attempts(mapId) = attemptId
// Mapper with this attemptId finished, also check all other mapper finished or not.
(true, areAllMapperAttemptsFinished(attempts))
(true, ClientUtils.areAllMapperAttemptsFinished(attempts))
} else {
// Mapper with another attemptId finished, skip this request
(false, false)
Expand Down Expand Up @@ -336,15 +336,4 @@ class ReducePartitionCommitHandler(

(timeout <= 0, stageEndTimeout - timeout)
}

/**
 * Reports whether every entry of the attempts array is non-negative.
 *
 * @param attempts per-mapper attempt ids; a negative entry is treated as "not finished"
 * @return true when no entry is negative (including an empty array), false otherwise
 */
private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
  // Scan from the last index towards the first and stop at the first negative entry.
  var remaining = attempts.length
  var allFinished = true
  while (allFinished && remaining > 0) {
    remaining -= 1
    allFinished = attempts(remaining) >= 0
  }
  allFinished
}
}

0 comments on commit c84733f

Please sign in to comment.