diff --git a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala index 8adf48ba..960e4a44 100644 --- a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala +++ b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala @@ -117,12 +117,18 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // close them after the async work in the Future is all done. // If we do something synchronous with Using, then there's a race condition where the // streams can get closed before the Future is completed. - var outStream: ByteArrayOutputStream = null - var out: PrintStream = null + var maybeOutStream: Option[ByteArrayOutputStream] = None + var maybeOut: Option[PrintStream] = None + + def flushOut(): Unit = { + maybeOut.map(_.flush()) + } val workTask = CancellableTask { - outStream = new ByteArrayOutputStream - out = new PrintStream(outStream) + val outStream = new ByteArrayOutputStream() + val out = new PrintStream(outStream) + maybeOutStream = Some(outStream) + maybeOut = Some(out) try { work(ctx, args, out, sandboxDir, verbosity) 0 @@ -137,14 +143,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream .andThen { // Work task succeeded or failed in an expected way case Success(code) => - out.flush() - writeResponse(requestId, Some(outStream), Some(code)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(code)) logVerbose(s"WorkResponse $requestId sent with code $code") case Failure(e: ExecutionException) => e.getCause() match { // Task successfully cancelled case cancelError: InterruptedException => + flushOut() writeResponse(requestId, None, None, wasCancelled = true) logVerbose( s"Cancellation WorkResponse sent for request id: $requestId in response to an" + @@ -152,9 +159,9 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream ) // Work task threw a non-fatal error case e => - e.printStackTrace(out) - out.flush() - writeResponse(requestId, Some(outStream), Some(-1)) + maybeOut.map(e.printStackTrace(_)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(-1)) logVerbose( "Encountered an uncaught exception that was wrapped in an ExecutionException while" + s" proccessing the Future for WorkRequest $requestId. This usually means a non-fatal" + @@ -165,6 +172,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // Task successfully cancelled case Failure(e: CancellationException) => + flushOut() writeResponse(requestId, None, None, wasCancelled = true) logVerbose( s"Cancellation WorkResponse sent for request id: $requestId in response to a" + @@ -173,15 +181,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // Work task threw an uncaught exception case Failure(e) => - e.printStackTrace(out) - out.flush() - writeResponse(requestId, Some(outStream), Some(-1)) + maybeOut.map(e.printStackTrace(_)) + flushOut() + writeResponse(requestId, maybeOutStream, Some(-1)) logVerbose(s"Uncaught exception in Future while proccessing WorkRequest $requestId:") e.printStackTrace(System.err) }(scala.concurrent.ExecutionContext.global) .andThen { case _ => - out.close() - outStream.close() + maybeOut.map(_.close()) + maybeOutStream.map(_.close()) }(scala.concurrent.ExecutionContext.global) // putIfAbsent will return a non-null value if there was already a value in the map