Skip to content

Commit

Permalink
flushing outstream take 2
Browse files Browse the repository at this point in the history
  • Loading branch information
jjudd committed Nov 12, 2024
1 parent 1ce90c3 commit 18aa2de
Showing 1 changed file with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,18 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
// close them after the async work in the Future is all done.
// If we do something synchronous with Using, then there's a race condition where the
// streams can get closed before the Future is completed.
var outStream: ByteArrayOutputStream = null
var out: PrintStream = null
var maybeOutStream: Option[ByteArrayOutputStream] = None
var maybeOut: Option[PrintStream] = None

def flushOut(): Unit = {
maybeOut.map(_.flush())
}

val workTask = CancellableTask {
outStream = new ByteArrayOutputStream
out = new PrintStream(outStream)
val outStream = new ByteArrayOutputStream()
val out = new PrintStream(outStream)
maybeOutStream = Some(outStream)
maybeOut = Some(out)
try {
work(ctx, args, out, sandboxDir, verbosity)
0
Expand All @@ -137,25 +143,25 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
.andThen {
// Work task succeeded or failed in an expected way
case Success(code) =>
out.flush()
writeResponse(requestId, Some(outStream), Some(code))
flushOut()
writeResponse(requestId, maybeOutStream, Some(code))
logVerbose(s"WorkResponse $requestId sent with code $code")

case Failure(e: ExecutionException) =>
e.getCause() match {
// Task successfully cancelled
case cancelError: InterruptedException =>
out.flush()
flushOut()
writeResponse(requestId, None, None, wasCancelled = true)
logVerbose(
s"Cancellation WorkResponse sent for request id: $requestId in response to an" +
" InterruptedException",
)
// Work task threw a non-fatal error
case e =>
e.printStackTrace(out)
out.flush()
writeResponse(requestId, Some(outStream), Some(-1))
maybeOut.map(e.printStackTrace(_))
flushOut()
writeResponse(requestId, maybeOutStream, Some(-1))
logVerbose(
"Encountered an uncaught exception that was wrapped in an ExecutionException while" +
s" proccessing the Future for WorkRequest $requestId. This usually means a non-fatal" +
Expand All @@ -166,7 +172,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream

// Task successfully cancelled
case Failure(e: CancellationException) =>
out.flush()
flushOut()
writeResponse(requestId, None, None, wasCancelled = true)
logVerbose(
s"Cancellation WorkResponse sent for request id: $requestId in response to a" +
Expand All @@ -175,15 +181,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream

// Work task threw an uncaught exception
case Failure(e) =>
e.printStackTrace(out)
out.flush()
writeResponse(requestId, Some(outStream), Some(-1))
maybeOut.map(e.printStackTrace(_))
flushOut()
writeResponse(requestId, maybeOutStream, Some(-1))
logVerbose(s"Uncaught exception in Future while proccessing WorkRequest $requestId:")
e.printStackTrace(System.err)
}(scala.concurrent.ExecutionContext.global)
.andThen { case _ =>
out.close()
outStream.close()
maybeOut.map(_.close())
maybeOutStream.map(_.close())
}(scala.concurrent.ExecutionContext.global)

// putIfAbsent will return a non-null value if there was already a value in the map
Expand Down

0 comments on commit 18aa2de

Please sign in to comment.