diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java index e3ab264d..9d0dd81f 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -29,12 +29,15 @@ import hudson.remoting.Channel; import hudson.remoting.ChannelClosedException; import hudson.remoting.RemoteOutputStream; +import hudson.util.DaemonThreadFactory; +import hudson.util.NamingThreadFactory; import hudson.util.StreamTaskListener; import java.io.Closeable; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; +import java.lang.ref.Cleaner; import java.util.logging.Logger; import org.jenkinsci.remoting.SerializableOnlyOverRemoting; @@ -44,12 +47,27 @@ */ final class BufferedBuildListener extends OutputStreamTaskListener.Default implements BuildListener, Closeable, SerializableOnlyOverRemoting { + private static final long serialVersionUID = 1; + private static final Logger LOGGER = Logger.getLogger(BufferedBuildListener.class.getName()); + private static final Cleaner cleaner = Cleaner.create(new NamingThreadFactory(new DaemonThreadFactory(), BufferedBuildListener.class.getName() + ".cleaner")); + private final OutputStream out; + private transient final Channel channel; + private transient final Listener listener; BufferedBuildListener(OutputStream out) { this.out = out; + if (out instanceof CloseableOutputStream) { + channel = Channel.currentOrFail(); + listener = new Listener((CloseableOutputStream) out, channel); + channel.addListener(listener); + cleaner.register(this, listener); + } else { + channel = null; + listener = null; + } } @Override public OutputStream getOutputStream() { @@ -58,12 +76,32 @@ final class BufferedBuildListener extends OutputStreamTaskListener.Default imple @Override public void close() throws IOException { getLogger().close(); + if (listener != null) { + channel.removeListener(listener); + } } private Object writeReplace() { return new Replacement(this); } + private static final class Listener extends Channel.Listener implements Runnable { + private final CloseableOutputStream cos; + private final Channel channel; + Listener(CloseableOutputStream cos, Channel channel) { + this.cos = cos; + this.channel = channel; + } + @Override public void onClosed(Channel channel, IOException cause) { + LOGGER.fine(() -> "closing " + channel.getName()); + cos.close(channel, cause); + channel.removeListener(this); + } + @Override public void run() { + channel.removeListener(this); + } + } + private static final class Replacement implements SerializableOnlyOverRemoting { private static final long serialVersionUID = 1; @@ -76,15 +114,7 @@ private static final class Replacement implements SerializableOnlyOverRemoting { } private Object readResolve() { - var cos = new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))); - Channel.currentOrFail().addListener(new Channel.Listener() { - @Override public void onClosed(Channel channel, IOException cause) { - LOGGER.fine(() -> "closing " + channel.getName()); - cos.close(channel, cause); - channel.removeListener(this); - } - }); - return new BufferedBuildListener(cos); + return new BufferedBuildListener(new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)))); } }