Skip to content

Commit

Permalink
Close underlying resources when no timeout is used (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
w3stling authored Sep 15, 2024
1 parent 821312a commit 789b1d9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 29 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ jobs:

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3
#- name: Autobuild
# uses: github/codeql-action/autobuild@v3

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -53,9 +53,7 @@ jobs:
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release
- run: ./gradlew clean build -x test

- name: Perform CodeQL analysis
uses: github/codeql-action/analyze@v3
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.Cleaner;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand Down Expand Up @@ -69,6 +70,7 @@
public abstract class AbstractRssReader<C extends Channel, I extends Item> {
private static final Logger LOGGER = Logger.getLogger("com.apptasticsoftware.rssreader");
private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("RssReaderWorker"));
private static final Cleaner CLEANER = Cleaner.create();
private final HttpClient httpClient;
private DateTimeParser dateTimeParser = new DateTime();
private String userAgent = "";
Expand Down Expand Up @@ -556,10 +558,40 @@ private void removeBadData(InputStream inputStream) {
} catch (IOException ignore) { }
}

class RssItemIterator implements Iterator<I> {
private static class CleaningAction implements Runnable {
private final XMLStreamReader xmlStreamReader;
private final List<AutoCloseable> resources;

public CleaningAction(XMLStreamReader xmlStreamReader, AutoCloseable... resources) {
this.xmlStreamReader = xmlStreamReader;
this.resources = List.of(resources);
}

@Override
public void run() {
try {
if (xmlStreamReader != null) {
xmlStreamReader.close();
}
} catch (XMLStreamException e) {
LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
}

for (AutoCloseable resource : resources) {
try {
if (resource != null) {
resource.close();
}
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to close resource. ", e);
}
}
}
}

class RssItemIterator implements Iterator<I>, AutoCloseable {
private final StringBuilder textBuilder;
private final Map<String, StringBuilder> childNodeTextBuilder;
private final InputStream is;
private final Deque<String> elementStack;
private XMLStreamReader reader;
private C channel;
Expand All @@ -569,9 +601,9 @@ class RssItemIterator implements Iterator<I> {
private boolean isItemPart = false;
private ScheduledFuture<?> parseWatchdog;
private final AtomicBoolean isClosed;
private Cleaner.Cleanable cleanable;

public RssItemIterator(InputStream is) {
this.is = is;
nextItem = null;
textBuilder = new StringBuilder();
childNodeTextBuilder = new HashMap<>();
Expand All @@ -585,6 +617,7 @@ public RssItemIterator(InputStream is) {
xmlInputFactory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, false);

reader = xmlInputFactory.createXMLStreamReader(is);
cleanable = CLEANER.register(this, new CleaningAction(reader, is));
if (!readTimeout.isZero()) {
parseWatchdog = EXECUTOR.schedule(this::close, readTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
Expand All @@ -595,15 +628,10 @@ public RssItemIterator(InputStream is) {
}

public void close() {
if (isClosed.compareAndSet(false,true)) {
try {
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
reader.close();
is.close();
} catch (XMLStreamException | IOException e) {
LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
if (isClosed.compareAndSet(false, true)) {
cleanable.clean();
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import javax.net.ssl.SSLContext;
import java.io.*;
import java.lang.ref.Reference;
import java.net.*;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand Down Expand Up @@ -492,11 +493,13 @@ private String getRssFeedAsString(String url) throws IOException, InterruptedExc
.GET()
.build();

var client = HttpClient.newBuilder()
HttpResponse<String> response;
try (var client = HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.NORMAL)
.build();
.build()) {

var response = client.send(req, HttpResponse.BodyHandlers.ofString());
response = client.send(req, HttpResponse.BodyHandlers.ofString());
}
return response.body();
}

Expand Down Expand Up @@ -605,6 +608,31 @@ void testCloseTwice() throws IOException {
}
}

@SuppressWarnings("java:S2925")
@Test
void testCloseWithCleaner() {
var fileInputSteam = fromFile("atom-feed.xml");
var stream = new RssReader().setReadTimeout(Duration.ZERO).read(fileInputSteam);
var iterator = stream.iterator();
var item = iterator.next();
assertNotNull(item);

iterator = null; // Remove this reference to iterator
Reference.reachabilityFence(iterator); // Ensure data is not over-optimitically gc'd
stream = null; // Remove this reference to stream
Reference.reachabilityFence(stream); // Ensure data is not over-optimitically gc'd

for (int retries = 10; retries > 0; retries--) {
System.gc();
try {
Thread.sleep(10L);
} catch (InterruptedException ignored) { /* ignore */ }
}

IOException thrown = assertThrows(IOException.class, fileInputSteam::available);
assertEquals("Stream closed", thrown.getMessage());
}

@SuppressWarnings("java:S5961")
@Test
void testAtomFeed() {
Expand Down
17 changes: 8 additions & 9 deletions src/test/java/com/apptasticsoftware/rssreader/RssReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void badFormattedXml() throws IOException {
"</channel>\n" +
"</rss>\n";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -78,7 +78,7 @@ void leadingCRCharacter() throws IOException {
"</channel>\n" +
"</rss>\n";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -125,7 +125,7 @@ void leadingCRLDCharacters() throws IOException {
"</channel>\n" +
"</rss>\n";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -172,7 +172,7 @@ void leadingWhitespace() throws IOException {
"</channel>\n" +
"</rss>\n";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -219,7 +219,7 @@ void Cdata() throws IOException {
"</channel>\n" +
"</rss>\n";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -248,7 +248,7 @@ void Cdata() throws IOException {
void emptyResponse() throws IOException {
String response = "";

CompletableFuture<HttpResponse<InputStream>> httpResponse = createMock(response);
var httpResponse = createMock(response);
RssReader readerMock = Mockito.spy(RssReader.class);
doReturn(httpResponse).when(readerMock).sendAsyncRequest(anyString());

Expand Down Expand Up @@ -514,9 +514,8 @@ void equalsContract() {
}


private CompletableFuture<HttpResponse<InputStream>> createMock(String response) {
HttpResponse<InputStream> httpResponse = mock(HttpResponse.class);

private CompletableFuture createMock(String response) {
var httpResponse = mock(HttpResponse.class);
InputStream responseStream = new ByteArrayInputStream(response.getBytes());
doReturn(responseStream).when(httpResponse).body();

Expand Down

0 comments on commit 789b1d9

Please sign in to comment.