[CELEBORN-1725] Optimize performance of handling MapperEnd RPC in `LifecycleManager`

### What changes were proposed in this pull request?

As described in the title.

### Why are the changes needed?

Following the Databricks Scala style guide (https://github.com/databricks/scala-style-guide?tab=readme-ov-file#traversal-and-zipwithindex), this PR replaces the `exists` call with a `while` loop in performance-sensitive code.
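
For illustration, here is a minimal, self-contained sketch of the two styles side by side. The object and method names are hypothetical and not part of Celeborn. It assumes, as in the patched code, that a negative entry in `attempts` marks a mapper that has not finished.

```scala
object MapperAttemptsSketch {
  // Closure-based check, in the style used before this change.
  def allFinishedWithExists(attempts: Array[Int]): Boolean =
    !attempts.exists(_ < 0)

  // Index-based while loop, in the style the guide recommends for hot paths.
  def allFinishedWithWhile(attempts: Array[Int]): Boolean = {
    var i = 0
    while (i < attempts.length) {
      // A negative entry means that mapper has not reported an attempt yet.
      if (attempts(i) < 0) return false
      i += 1
    }
    true
  }

  def main(args: Array[String]): Unit = {
    val pending = Array(0, -1, 2)
    val done = Array(0, 1, 2)
    // Both variants should agree on every input.
    println(allFinishedWithExists(pending) == allFinishedWithWhile(pending))
    println(allFinishedWithExists(done) == allFinishedWithWhile(done))
  }
}
```

Both variants stop at the first negative entry, so they return the same result. The difference is only in how the traversal is expressed.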

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA

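Conducted a local test of a Spark job whose shuffle write stage (`Stage 0`) included 210,000 tasks. The mean processing time for `MapperEnd` RPC requests decreased from about 185 ms to about 0.02 ms (the reported means are 1.86E8 and 20155, which match these figures if the metrics are in nanoseconds).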

The `LifecycleManagerEndpoint` metrics before this PR:
```
histogram for LifecycleManagerEndpoint RPC metrics: class org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
count: 229298
min: 8732
mean: 1.8579149415564203E8
p50: 1.10197579E8
p75: 3.1664286125E8
p95: 5.927707685999998E8
p99: 7.620123359800001E8
max: 862981475
```

After this PR:
```
histogram for LifecycleManagerEndpoint RPC metrics: class org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
count: 229298
min: 6281
mean: 20155.255836575874
p50: 19623.5
p75: 23865.25
p95: 32006.749999999996
p99: 45231.44
max: 74217
```
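
The histograms do not state a unit. The sketch below assumes the values are nanoseconds, which is consistent with the roughly 185 ms mean quoted for the run before this PR, and converts both reported means to milliseconds. The object and method names are hypothetical.

```scala
object HistogramUnitsSketch {
  // Assumption: the RPC histogram values are recorded in nanoseconds.
  def nanosToMillis(nanos: Double): Double = nanos / 1e6

  def main(args: Array[String]): Unit = {
    // Mean values copied from the two histograms above.
    val meanBefore = 1.8579149415564203e8
    val meanAfter = 20155.255836575874
    println(f"mean before: ${nanosToMillis(meanBefore)}%.3f ms")
    println(f"mean after:  ${nanosToMillis(meanAfter)}%.3f ms")
    println(f"speedup:     ${meanBefore / meanAfter}%.0fx")
  }
}
```

Under that assumption, the means come to about 185.8 ms before and about 0.020 ms after.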

Count of slow `MapperEnd` requests before this PR:
```
$ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
124801
```

After this PR:
```
$ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
0
```

The flame graph before this PR:

<img width="1917" alt="Screenshot 2024-11-19 19 38 10" src="https://github.com/user-attachments/assets/16294992-0e51-402e-8da0-035a2226c7dd">

Closes #2905 from cfmcgrady/improve-mapper-end.

Authored-by: Fu Chen <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit ed52e8d)
Signed-off-by: Wang, Fei <[email protected]>
cfmcgrady authored and turboFei committed Nov 27, 2024
1 parent 4bf450d commit e8c59e5
Showing 1 changed file with 13 additions and 2 deletions.
@@ -251,10 +251,10 @@ class ReducePartitionCommitHandler(
         if (attempts(mapId) < 0) {
           attempts(mapId) = attemptId
           // Mapper with this attemptId finished, also check all other mapper finished or not.
-          (true, !attempts.exists(_ < 0))
+          (true, areAllMapperAttemptsFinished(attempts))
         } else {
           // Mapper with another attemptId finished, skip this request
-          (false, !attempts.exists(_ < 0))
+          (false, false)
         }
       }
     }
@@ -333,4 +333,15 @@ class ReducePartitionCommitHandler(

     (timeout <= 0, stageEndTimeout - timeout)
   }
+
+  private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+    var i = attempts.length - 1
+    while (i >= 0) {
+      if (attempts(i) < 0) {
+        return false
+      }
+      i -= 1
+    }
+    true
+  }
 }
