Skip to content

Commit

Permalink
Fix race condition. Also make sure to flush output stream before writ…
Browse files Browse the repository at this point in the history
…ing the worker response

There was a race condition where a task could be cancelled with the
worker output streams still set to null, which would cause a null
pointer exception in the worker.

We also weren't always flushing the print stream before responding to
Bazel about the work request. This fixes that.
  • Loading branch information
jjudd committed Nov 12, 2024
1 parent 3704408 commit ac07080
Showing 1 changed file with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,18 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
// close them after the async work in the Future is all done.
// If we do something synchronous with Using, then there's a race condition where the
// streams can get closed before the Future is completed.
var outStream: ByteArrayOutputStream = null
var out: PrintStream = null
var maybeOutStream: Option[ByteArrayOutputStream] = None
var maybeOut: Option[PrintStream] = None

def flushOut(): Unit = {
maybeOut.map(_.flush())
}

val workTask = CancellableTask {
outStream = new ByteArrayOutputStream
out = new PrintStream(outStream)
val outStream = new ByteArrayOutputStream()
val out = new PrintStream(outStream)
maybeOutStream = Some(outStream)
maybeOut = Some(out)
try {
work(ctx, args, out, sandboxDir, verbosity)
0
Expand All @@ -137,24 +143,25 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
.andThen {
// Work task succeeded or failed in an expected way
case Success(code) =>
out.flush()
writeResponse(requestId, Some(outStream), Some(code))
flushOut()
writeResponse(requestId, maybeOutStream, Some(code))
logVerbose(s"WorkResponse $requestId sent with code $code")

case Failure(e: ExecutionException) =>
e.getCause() match {
// Task successfully cancelled
case cancelError: InterruptedException =>
flushOut()
writeResponse(requestId, None, None, wasCancelled = true)
logVerbose(
s"Cancellation WorkResponse sent for request id: $requestId in response to an" +
" InterruptedException",
)
// Work task threw a non-fatal error
case e =>
e.printStackTrace(out)
out.flush()
writeResponse(requestId, Some(outStream), Some(-1))
maybeOut.map(e.printStackTrace(_))
flushOut()
writeResponse(requestId, maybeOutStream, Some(-1))
logVerbose(
"Encountered an uncaught exception that was wrapped in an ExecutionException while" +
s" proccessing the Future for WorkRequest $requestId. This usually means a non-fatal" +
Expand All @@ -165,6 +172,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream

// Task successfully cancelled
case Failure(e: CancellationException) =>
flushOut()
writeResponse(requestId, None, None, wasCancelled = true)
logVerbose(
s"Cancellation WorkResponse sent for request id: $requestId in response to a" +
Expand All @@ -173,15 +181,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream

// Work task threw an uncaught exception
case Failure(e) =>
e.printStackTrace(out)
out.flush()
writeResponse(requestId, Some(outStream), Some(-1))
maybeOut.map(e.printStackTrace(_))
flushOut()
writeResponse(requestId, maybeOutStream, Some(-1))
logVerbose(s"Uncaught exception in Future while proccessing WorkRequest $requestId:")
e.printStackTrace(System.err)
}(scala.concurrent.ExecutionContext.global)
.andThen { case _ =>
out.close()
outStream.close()
maybeOut.map(_.close())
maybeOutStream.map(_.close())
}(scala.concurrent.ExecutionContext.global)

// putIfAbsent will return a non-null value if there was already a value in the map
Expand Down

0 comments on commit ac07080

Please sign in to comment.