From ac07080ac48500f9ff7a5b9829df045cb1a0a0e2 Mon Sep 17 00:00:00 2001 From: jjudd Date: Tue, 12 Nov 2024 12:54:30 -0700 Subject: [PATCH] Fix race condition. Also make sure to flush output stream before writing the worker response There was a race condition where a task could be cancelled with the worker output streams still set to null, which would cause a null pointer exception in the worker. We also weren't always flushing the print stream before responding to Bazel about the work request. This fixes that. --- .../common/worker/WorkerMain.scala | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala index 8adf48ba..960e4a44 100644 --- a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala +++ b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala @@ -117,12 +117,18 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // close them after the async work in the Future is all done. // If we do something synchronous with Using, then there's a race condition where the // streams can get closed before the Future is completed. - var outStream: ByteArrayOutputStream = null - var out: PrintStream = null + var maybeOutStream: Option[ByteArrayOutputStream] = None + var maybeOut: Option[PrintStream] = None + + def flushOut(): Unit = { + maybeOut.map(_.flush()) + } val workTask = CancellableTask { - outStream = new ByteArrayOutputStream - out = new PrintStream(outStream) + val outStream = new ByteArrayOutputStream() + val out = new PrintStream(outStream) + maybeOutStream = Some(outStream) + maybeOut = Some(out) try { work(ctx, args, out, sandboxDir, verbosity) 0 @@ -137,14 +143,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream .andThen { // Work task succeeded or failed in an expected way case Success(code) => - out.flush() - writeResponse(requestId, Some(outStream), Some(code)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(code)) logVerbose(s"WorkResponse $requestId sent with code $code") case Failure(e: ExecutionException) => e.getCause() match { // Task successfully cancelled case cancelError: InterruptedException => + flushOut() writeResponse(requestId, None, None, wasCancelled = true) logVerbose( s"Cancellation WorkResponse sent for request id: $requestId in response to an" + @@ -152,9 +159,9 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream ) // Work task threw a non-fatal error case e => - e.printStackTrace(out) - out.flush() - writeResponse(requestId, Some(outStream), Some(-1)) + maybeOut.map(e.printStackTrace(_)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(-1)) logVerbose( "Encountered an uncaught exception that was wrapped in an ExecutionException while" + s" proccessing the Future for WorkRequest $requestId. This usually means a non-fatal" + @@ -165,6 +172,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // Task successfully cancelled case Failure(e: CancellationException) => + flushOut() writeResponse(requestId, None, None, wasCancelled = true) logVerbose( s"Cancellation WorkResponse sent for request id: $requestId in response to a" + @@ -173,15 +181,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // Work task threw an uncaught exception case Failure(e) => - e.printStackTrace(out) - out.flush() - writeResponse(requestId, Some(outStream), Some(-1)) + maybeOut.map(e.printStackTrace(_)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(-1)) logVerbose(s"Uncaught exception in Future while proccessing WorkRequest $requestId:") e.printStackTrace(System.err) }(scala.concurrent.ExecutionContext.global) .andThen { case _ => - out.close() - outStream.close() + maybeOut.map(_.close()) + maybeOutStream.map(_.close()) }(scala.concurrent.ExecutionContext.global) // putIfAbsent will return a non-null value if there was already a value in the map