Skip to content

Commit

Permalink
CXF-7396: CachedOutputStream doesn't delete temp files (#2048)
Browse files Browse the repository at this point in the history
* CXF-7396: CachedOutputStream doesn't delete temp files

* Refactor the cleaner implementation and add guardrails for cleaner delay

(cherry picked from commit 03a85a5)
  • Loading branch information
reta committed Sep 22, 2024
1 parent 2cf863e commit 0a97dde
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 4 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@
<artifactId>saaj-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/cxf/io/CachedConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public final class CachedConstants {
public static final String CIPHER_TRANSFORMATION_BUS_PROP =
"bus.io.CachedOutputStream.CipherTransformation";

/**
* The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes
* is specified by default, the minimum value is 2 seconds. If the value of the delay is set to
* 0 (or is negative), the cleaner will be deactivated.
*/
public static final String CLEANER_DELAY_BUS_PROP =
"bus.io.CachedOutputStreamCleaner.Delay";

private CachedConstants() {
// complete
}
Expand Down
22 changes: 21 additions & 1 deletion core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class CachedOutputStream extends OutputStream {
private List<CachedOutputStreamCallback> callbacks;

private List<Object> streamList = new ArrayList<>();
private CachedOutputStreamCleaner cachedOutputStreamCleaner;

public CachedOutputStream() {
this(defaultThreshold);
Expand Down Expand Up @@ -127,6 +129,8 @@ private void readBusProperties() {
outputDir = f;
}
}

cachedOutputStreamCleaner = b.getExtension(CachedOutputStreamCleaner.class);
}
}

Expand Down Expand Up @@ -279,6 +283,9 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio
}
} finally {
streamList.remove(currentStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(currentStream);
}
deleteTempFile();
inmem = true;
}
Expand Down Expand Up @@ -481,6 +488,9 @@ private void createFileOutputStream() throws IOException {
bout.writeTo(currentStream);
inmem = false;
streamList.add(currentStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.register(this);
}
} catch (Exception ex) {
//Could be IOException or SecurityException or other issues.
//Don't care what, just keep it in memory.
Expand Down Expand Up @@ -512,6 +522,10 @@ public InputStream getInputStream() throws IOException {
try {
InputStream fileInputStream = new TransferableFileInputStream(tempFile);
streamList.add(fileInputStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.register(fileInputStream);
}

if (cipherTransformation != null) {
fileInputStream = new CipherInputStream(fileInputStream, ciphers.getDecryptor()) {
boolean closed;
Expand All @@ -537,7 +551,7 @@ private synchronized void deleteTempFile() {
FileUtils.delete(file);
}
}
private boolean maybeDeleteTempFile(Object stream) {
private boolean maybeDeleteTempFile(Closeable stream) {
boolean postClosedInvoked = false;
streamList.remove(stream);
if (!inmem && tempFile != null && streamList.isEmpty() && allowDeleteOfFile) {
Expand All @@ -549,6 +563,9 @@ private boolean maybeDeleteTempFile(Object stream) {
//ignore
}
postClosedInvoked = true;
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(this);
}
}
deleteTempFile();
currentStream = new LoadingByteArrayOutputStream(1024);
Expand Down Expand Up @@ -665,6 +682,9 @@ public void close() throws IOException {
if (!closed) {
super.close();
maybeDeleteTempFile(this);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(this);
}
}
closed = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.io;

import java.io.Closeable;

/**
* The {@link Bus} extension to clean up unclosed {@link CachedOutputStream} instances (and alike) backed by
* temporary files (leading to disk fill, see https://issues.apache.org/jira/browse/CXF-7396.
*/
public interface CachedOutputStreamCleaner {
/**
* Run the clean up
*/
void clean();

/**
* Register the stream instance for the clean up
*/
void unregister(Closeable closeable);

/**
* Unregister the stream instance from the clean up (closed properly)
*/
void register(Closeable closeable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import jakarta.annotation.Resource;
import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.logging.LogUtils;

public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
private static final long MIN_DELAY = 2000; /* 2 seconds */
private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() {
// NOOP
};

private DelayedCleaner cleaner = NOOP_CLEANER;

private interface DelayedCleaner extends CachedOutputStreamCleaner, Closeable {
@Override
default void register(Closeable closeable) {
}

@Override
default void unregister(Closeable closeable) {
}

@Override
default void close() {
}

@Override
default void clean() {
}

default void forceClean() {
}
}

private static final class DelayedCleanerImpl implements DelayedCleaner {
private final long delay; /* default is 30 minutes, in milliseconds */
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
private final Timer timer;

DelayedCleanerImpl(final long delay) {
this.delay = delay;
this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
clean();
}
}, 0, Math.max(MIN_DELAY, delay >> 1));
}

@Override
public void register(Closeable closeable) {
queue.put(new DelayedCloseable(closeable, delay));
}

@Override
public void unregister(Closeable closeable) {
queue.remove(new DelayedCloseable(closeable, delay));
}

@Override
public void clean() {
final Collection<DelayedCloseable> closeables = new ArrayList<>();
queue.drainTo(closeables);
clean(closeables);
}

@Override
public void forceClean() {
clean(queue);
}

@Override
public void close() {
timer.cancel();
queue.clear();
}

private void clean(Collection<DelayedCloseable> closeables) {
final Iterator<DelayedCloseable> iterator = closeables.iterator();
while (iterator.hasNext()) {
final DelayedCloseable next = iterator.next();
try {
iterator.remove();
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
next.closeable.close();
} catch (final IOException | RuntimeException ex) {
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
}
}
}
}

private static final class DelayedCloseable implements Delayed {
private final Closeable closeable;
private final long expireAt;

DelayedCloseable(final Closeable closeable, final long delay) {
this.closeable = closeable;
this.expireAt = System.nanoTime() + delay;
}

@Override
public int compareTo(Delayed o) {
return Long.compare(expireAt, ((DelayedCloseable) o).expireAt);
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireAt - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int hashCode() {
return Objects.hash(closeable);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

final DelayedCloseable other = (DelayedCloseable) obj;
return Objects.equals(closeable, other.closeable);
}
}

@Resource
public void setBus(Bus bus) {
Number delayValue = null;
BusLifeCycleManager busLifeCycleManager = null;

if (bus != null) {
delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
}

if (cleaner != null) {
cleaner.close();
}

if (delayValue == null) {
// Default delay is set to 30 mins
cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES));
} else {
final long value = delayValue.longValue();
if (value > 0 && value >= MIN_DELAY) {
cleaner = new DelayedCleanerImpl(value); /* already in milliseconds */
} else {
cleaner = NOOP_CLEANER;
if (value != 0) {
throw new IllegalArgumentException("The value of " + CachedConstants.CLEANER_DELAY_BUS_PROP
+ " property is invalid: " + value + " (should be >= " + MIN_DELAY + ", 0 to deactivate)");
}
}
}

if (busLifeCycleManager != null) {
busLifeCycleManager.registerLifeCycleListener(this);
}
}

@Override
public void register(Closeable closeable) {
cleaner.register(closeable);
}

@Override
public void unregister(Closeable closeable) {
cleaner.unregister(closeable);
}

@Override
public void clean() {
cleaner.clean();
}

@Override
public void initComplete() {
}

@Override
public void postShutdown() {
}

@Override
public void preShutdown() {
cleaner.close();
}

public void forceClean() {
cleaner.forceClean();
}
}
Loading

0 comments on commit 0a97dde

Please sign in to comment.