Skip to content

Commit

Permalink
Close underlying resources
Browse files Browse the repository at this point in the history
  • Loading branch information
w3stling committed Aug 24, 2024
1 parent efe8dbe commit 77526e7
Show file tree
Hide file tree
Showing 12 changed files with 2,451 additions and 63 deletions.
100 changes: 52 additions & 48 deletions src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
package com.apptasticsoftware.rssreader;

import com.apptasticsoftware.rssreader.internal.StreamUtil;
import com.apptasticsoftware.rssreader.internal.stream.AutoCloseStream;
import com.apptasticsoftware.rssreader.util.Mapper;
import com.apptasticsoftware.rssreader.util.DaemonThreadFactory;
import com.apptasticsoftware.rssreader.util.XMLInputFactorySecurity;
Expand All @@ -43,11 +45,11 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;

import static com.apptasticsoftware.rssreader.util.Mapper.mapLong;
Expand Down Expand Up @@ -363,7 +365,7 @@ public AbstractRssReader<C, I> addItemExtension(String tag, String attribute, Bi
Objects.requireNonNull(consumer, "Item consumer must not be null");

itemAttributes.computeIfAbsent(tag, k -> new HashMap<>())
.put(attribute, consumer);
.put(attribute, consumer);
return this;
}

Expand Down Expand Up @@ -394,7 +396,7 @@ public AbstractRssReader<C, I> addChannelExtension(String tag, String attribute,
Objects.requireNonNull(consumer, "Channel consumer must not be null");

channelAttributes.computeIfAbsent(tag, k -> new HashMap<>())
.put(attribute, consumer);
.put(attribute, consumer);
return this;
}

Expand Down Expand Up @@ -438,29 +440,29 @@ public Stream<Item> read(Collection<String> urls) {
initialize();
isInitialized = true;
}
return urls.stream()
.parallel()
.map(url -> {
try {
return Map.entry(url, readAsync(url));
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, () -> String.format("Failed read URL %s. Message: %s", url, e.getMessage()));
}
return null;
return AutoCloseStream.of(urls.stream()
.parallel()
.map(url -> {
try {
return Map.entry(url, readAsync(url));
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, () -> String.format("Failed read URL %s. Message: %s", url, e.getMessage()));
}
})
.filter(Objects::nonNull)
.flatMap(f -> {
try {
return f.getValue().join();
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, () -> String.format("Failed to read URL %s. Message: %s", f.getKey(), e.getMessage()));
}
return Stream.empty();
}
});
return null;
}
})
.filter(Objects::nonNull)
.flatMap(f -> {
try {
return f.getValue().join();
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, () -> String.format("Failed to read URL %s. Message: %s", f.getKey(), e.getMessage()));
}
return Stream.empty();
}
}));
}

/**
Expand All @@ -479,8 +481,7 @@ public Stream<I> read(InputStream inputStream) {
inputStream = new BufferedInputStream(inputStream);
removeBadData(inputStream);
var itemIterator = new RssItemIterator(inputStream);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(itemIterator, Spliterator.ORDERED), false)
.onClose(itemIterator::close);
return AutoCloseStream.of(StreamUtil.asStream(itemIterator).onClose(itemIterator::close));
}

/**
Expand All @@ -505,7 +506,7 @@ public CompletableFuture<Stream<I>> readAsync(String url) {
*/
protected CompletableFuture<HttpResponse<InputStream>> sendAsyncRequest(String url) {
var builder = HttpRequest.newBuilder(URI.create(url))
.header("Accept-Encoding", "gzip");
.header("Accept-Encoding", "gzip");
if (requestTimeout.toMillis() > 0) {
builder.timeout(requestTimeout);
}
Expand Down Expand Up @@ -533,8 +534,7 @@ private Function<HttpResponse<InputStream>, Stream<I>> processResponse() {
inputStream = new BufferedInputStream(inputStream);
removeBadData(inputStream);
var itemIterator = new RssItemIterator(inputStream);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(itemIterator, Spliterator.ORDERED), false)
.onClose(itemIterator::close);
return AutoCloseStream.of(StreamUtil.asStream(itemIterator).onClose(itemIterator::close));
} catch (IOException e) {
throw new CompletionException(e);
}
Expand Down Expand Up @@ -568,13 +568,15 @@ class RssItemIterator implements Iterator<I> {
private boolean isChannelPart = false;
private boolean isItemPart = false;
private ScheduledFuture<?> parseWatchdog;
private final AtomicBoolean isClosed;

public RssItemIterator(InputStream is) {
this.is = is;
nextItem = null;
textBuilder = new StringBuilder();
childNodeTextBuilder = new HashMap<>();
elementStack = new ArrayDeque<>();
isClosed = new AtomicBoolean(false);

try {
// disable XML external entity (XXE) processing
Expand All @@ -595,15 +597,17 @@ public RssItemIterator(InputStream is) {
}

public void close() {
try {
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
reader.close();
is.close();
} catch (XMLStreamException | IOException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
if (isClosed.compareAndSet(false,true)) {
try {
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
reader.close();
is.close();
} catch (XMLStreamException | IOException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
}
}
}
}
Expand Down Expand Up @@ -669,16 +673,16 @@ private void collectChildNodes(int type) {
// Add namespaces to start tag
for (int i = 0; i < reader.getNamespaceCount(); ++i) {
startTagBuilder.append(" ")
.append(toNamespacePrefix(reader.getNamespacePrefix(i)))
.append("=")
.append(reader.getNamespaceURI(i));
.append(toNamespacePrefix(reader.getNamespacePrefix(i)))
.append("=")
.append(reader.getNamespaceURI(i));
}
// Add attributes to start tag
for (int i = 0; i < reader.getAttributeCount(); ++i) {
startTagBuilder.append(" ")
.append(toNsName(reader.getAttributePrefix(i), reader.getAttributeLocalName(i)))
.append("=")
.append(reader.getAttributeValue(i));
.append(toNsName(reader.getAttributePrefix(i), reader.getAttributeLocalName(i)))
.append("=")
.append(reader.getAttributeValue(i));
}
startTagBuilder.append(">");
var startTag = startTagBuilder.toString();
Expand All @@ -699,9 +703,9 @@ private void collectChildNodes(int type) {
var nsTagName = toNsName(reader.getPrefix(), reader.getLocalName());
var endTag = "</" + nsTagName + ">";
childNodeTextBuilder.entrySet()
.stream()
.filter(e -> !e.getKey().equals(nsTagName))
.forEach(e -> e.getValue().append(endTag));
.stream()
.filter(e -> !e.getKey().equals(nsTagName))
.forEach(e -> e.getValue().append(endTag));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2024, Apptastic Software
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.apptasticsoftware.rssreader.internal;

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Objects.requireNonNull;

/**
* Internal utility class for working with streams.
*/
public class StreamUtil {

private StreamUtil() { }

/**
* Creates a Stream from an Iterator.
*
* @param iterator The Iterator to create the Stream from.
* @param <T> The type of the elements in the Stream.
* @return A Stream created from the Iterator.
*/
public static <T> Stream<T> asStream(Iterator<T> iterator) {
requireNonNull(iterator);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* MIT License
*
* Copyright (c) 2024, Apptastic Software
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.apptasticsoftware.rssreader.internal.stream;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.stream.*;

public class AbstractAutoCloseStream<T, S extends BaseStream<T, S>> implements AutoCloseable {
private final S stream;
private final AtomicBoolean isClosed;

AbstractAutoCloseStream(S stream) {
this.stream = Objects.requireNonNull(stream);
this.isClosed = new AtomicBoolean();
}

protected S stream() {
return stream;
}

@Override
public void close() {
if (isClosed.compareAndSet(false,true)) {
stream().close();
}
}

<R> R autoClose(Function<S, R> function) {
try (S s = stream()) {
return function.apply(s);
}
}

<U> Stream<U> asAutoCloseStream(Stream<U> stream) {
return asAutoCloseStream(stream, AutoCloseStream::new);
}

IntStream asAutoCloseStream(IntStream stream) {
return asAutoCloseStream(stream, AutoCloseIntStream::new);
}

LongStream asAutoCloseStream(LongStream stream) {
return asAutoCloseStream(stream, AutoCloseLongStream::new);
}

DoubleStream asAutoCloseStream(DoubleStream stream) {
return asAutoCloseStream(stream, AutoCloseDoubleStream::new);
}

private <U> U asAutoCloseStream(U stream, Function<U, U> wrapper) {
if (stream instanceof AbstractAutoCloseStream) {
return stream;
}
return wrapper.apply(stream);
}

}
Loading

0 comments on commit 77526e7

Please sign in to comment.