Skip to content

Commit

Permalink
use non-interrupting scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Dec 9, 2024
1 parent 900c1ac commit 14b1829
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/scala/units/ConsensusClientDependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseabl
Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService =
Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })
NonInterruptingScheduledExecutor(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) })

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
Expand Down
80 changes: 80 additions & 0 deletions src/main/scala/units/NonInterruptingScheduledExecutor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package units

import monix.execution.schedulers.ExecutorScheduler
import monix.execution.{Cancelable, Features, UncaughtExceptionReporter, ExecutionModel as ExecModel}

import java.util.concurrent.*
import scala.concurrent.duration.TimeUnit

abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory) extends ScheduledThreadPoolExecutor(corePoolSize, factory) {
def reportFailure(t: Throwable): Unit

override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
var exception: Throwable = t

if ((exception eq null) && r.isInstanceOf[Future[?]]) {
try {
val future = r.asInstanceOf[Future[?]]
if (future.isDone) future.get()
} catch {
case ex: ExecutionException =>
exception = ex.getCause
case _: InterruptedException =>
// ignore/reset
Thread.currentThread().interrupt()
case _: CancellationException =>
() // ignore
}
}

if (exception ne null) reportFailure(exception)
}
}

class NonInterruptingScheduledExecutor(
s: ScheduledExecutorService,
r: UncaughtExceptionReporter,
override val executionModel: ExecModel,
override val features: Features
) extends ExecutorScheduler(s, r) {
override def executor: ScheduledExecutorService = s

def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
if (initialDelay <= 0) {
execute(r)
Cancelable.empty
} else {
val task = s.schedule(r, initialDelay, unit)
Cancelable(() => { task.cancel(false); () })
}
}

override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit)
Cancelable(() => { task.cancel(false); () })
}

override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleAtFixedRate(r, initialDelay, period, unit)
Cancelable(() => { task.cancel(false); () })
}
}

object NonInterruptingScheduledExecutor {
def apply(name: String, reporter: UncaughtExceptionReporter): NonInterruptingScheduledExecutor = {
val atpe = new AdaptedThreadPoolExecutor(
1,
(r: Runnable) => {
val t = new Thread(r)
t.setDaemon(true)
t.setName(name)
t.setUncaughtExceptionHandler(reporter.asJava)
t
}
) {
override def reportFailure(t: Throwable): Unit = reporter.reportFailure(t)
}
new NonInterruptingScheduledExecutor(atpe, reporter, ExecModel.AlwaysAsyncExecution, Features.empty)
}
}

0 comments on commit 14b1829

Please sign in to comment.