Skip to content

Commit

Permalink
sleep and then check timeout and config the timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 12, 2024
1 parent 6e3f43e commit f0ce882
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package org.apache.celeborn.service.deploy.master

import java.io.IOException
import java.net.BindException
import java.util.concurrent.TimeUnit

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}

trait MasterClusterFeature extends Logging {
var masterInfo: (Master, Thread) = _

val maxRetries = 3
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)

class RunnerWrap[T](code: => T) extends Thread {
override def run(): Unit = {
Expand Down Expand Up @@ -106,7 +109,6 @@ trait MasterClusterFeature extends Logging {
}

def setUpMaster(masterConf: Map[String, String] = null): Master = {
val timeout = 30000
val master = createMaster(masterConf)
val masterStartedSignal = Array(false)
val masterThread = new RunnerWrap({
Expand All @@ -122,13 +124,13 @@ trait MasterClusterFeature extends Logging {
masterThread.start()
masterInfo = (master, masterThread)
var masterStartWaitingTime = 0
Thread.sleep(3000)
while (!masterStartedSignal.head) {
logInfo("waiting for master node starting")
masterStartWaitingTime += 3000
if (masterStartWaitingTime >= timeout) {
if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
throw new BindException("cannot start master rpc endpoint")
}
Thread.sleep(3000)
masterStartWaitingTime += 3000
}
master
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.celeborn.tests.spark

import java.util.concurrent.TimeUnit

import scala.util.Random

import org.apache.spark.SPARK_VERSION
Expand All @@ -34,6 +36,8 @@ import org.apache.celeborn.service.deploy.MiniClusterFeature
trait SparkTestBase extends AnyFunSuite
with Logging with MiniClusterFeature with BeforeAndAfterAll with BeforeAndAfterEach {

override val workersWaitingTimeoutMs: Long = TimeUnit.MINUTES.toMillis(1)

val Spark3OrNewer = SPARK_VERSION >= "3.0"
println(s"Spark version is $SPARK_VERSION, Spark3OrNewer: $Spark3OrNewer")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ trait MiniClusterFeature extends Logging {
val workerInfos = new mutable.HashMap[Worker, Thread]()
var workerConfForAdding: Map[String, String] = _

val maxRetries = 3
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)
val workersWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)

class RunnerWrap[T](code: => T) extends Thread {

override def run(): Unit = {
Expand Down Expand Up @@ -94,7 +98,7 @@ trait MiniClusterFeature extends Logging {
_.isInstanceOf[BindException]) =>
logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e)
retryCount += 1
if (retryCount == 3) {
if (retryCount == maxRetries) {
logError("failed to setup mini cluster, reached the max retry count", e)
throw e
}
Expand Down Expand Up @@ -161,7 +165,6 @@ trait MiniClusterFeature extends Logging {
}

def setUpMaster(masterConf: Map[String, String] = null): Master = {
val timeout = 30000
val master = createMaster(masterConf)
val masterStartedSignal = Array(false)
val masterThread = new RunnerWrap({
Expand All @@ -177,21 +180,20 @@ trait MiniClusterFeature extends Logging {
masterThread.start()
masterInfo = (master, masterThread)
var masterStartWaitingTime = 0
Thread.sleep(5000)
while (!masterStartedSignal.head) {
logInfo("waiting for master node starting")
masterStartWaitingTime += 5000
if (masterStartWaitingTime >= timeout) {
if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
throw new BindException("cannot start master rpc endpoint")
}
Thread.sleep(5000)
masterStartWaitingTime += 5000
}
master
}

def setUpWorkers(
workerConf: Map[String, String] = null,
workerNum: Int = 3): collection.Set[Worker] = {
val timeout = 30000
val workers = new Array[Worker](workerNum)
val flagUpdateLock = new ReentrantLock()
val threads = (1 to workerNum).map { i =>
Expand All @@ -214,7 +216,7 @@ trait MiniClusterFeature extends Logging {
workerStarted = false
workerStartRetry += 1
logError(s"cannot start worker $i, retrying: ", ex)
if (workerStartRetry == 3) {
if (workerStartRetry == maxRetries) {
logError(s"cannot start worker $i, reached to max retrying", ex)
throw ex
} else {
Expand Down Expand Up @@ -246,12 +248,12 @@ trait MiniClusterFeature extends Logging {
} catch {
case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
Thread.sleep(5000)
workersWaitingTime += 5000
if (workersWaitingTime >= timeout) {
logError(s"cannot start all workers after $timeout ms", ex)
if (workersWaitingTime >= workersWaitingTimeoutMs) {
logError(s"cannot start all workers after $workersWaitingTimeoutMs ms", ex)
throw ex
}
Thread.sleep(5000)
workersWaitingTime += 5000
}
}
workerInfos.keySet
Expand Down

0 comments on commit f0ce882

Please sign in to comment.