diff --git a/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java b/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java
index 16d8dbb..0cc144a 100644
--- a/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java
+++ b/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java
@@ -23,6 +23,8 @@
*/
package com.apptasticsoftware.rssreader;
+import com.apptasticsoftware.rssreader.internal.StreamUtil;
+import com.apptasticsoftware.rssreader.internal.stream.AutoCloseStream;
import com.apptasticsoftware.rssreader.util.Mapper;
import com.apptasticsoftware.rssreader.util.DaemonThreadFactory;
import com.apptasticsoftware.rssreader.util.XMLInputFactorySecurity;
@@ -43,11 +45,11 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;
import static com.apptasticsoftware.rssreader.util.Mapper.mapLong;
@@ -363,7 +365,7 @@ public AbstractRssReader addItemExtension(String tag, String attribute, Bi
Objects.requireNonNull(consumer, "Item consumer must not be null");
itemAttributes.computeIfAbsent(tag, k -> new HashMap<>())
- .put(attribute, consumer);
+ .put(attribute, consumer);
return this;
}
@@ -394,7 +396,7 @@ public AbstractRssReader addChannelExtension(String tag, String attribute,
Objects.requireNonNull(consumer, "Channel consumer must not be null");
channelAttributes.computeIfAbsent(tag, k -> new HashMap<>())
- .put(attribute, consumer);
+ .put(attribute, consumer);
return this;
}
@@ -438,29 +440,29 @@ public Stream read(Collection urls) {
initialize();
isInitialized = true;
}
- return urls.stream()
- .parallel()
- .map(url -> {
- try {
- return Map.entry(url, readAsync(url));
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, () -> String.format("Failed read URL %s. Message: %s", url, e.getMessage()));
- }
- return null;
+ return AutoCloseStream.of(urls.stream()
+ .parallel()
+ .map(url -> {
+ try {
+ return Map.entry(url, readAsync(url));
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, () -> String.format("Failed read URL %s. Message: %s", url, e.getMessage()));
}
- })
- .filter(Objects::nonNull)
- .flatMap(f -> {
- try {
- return f.getValue().join();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, () -> String.format("Failed to read URL %s. Message: %s", f.getKey(), e.getMessage()));
- }
- return Stream.empty();
- }
- });
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .flatMap(f -> {
+ try {
+ return f.getValue().join();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, () -> String.format("Failed to read URL %s. Message: %s", f.getKey(), e.getMessage()));
+ }
+ return Stream.empty();
+ }
+ }));
}
/**
@@ -479,8 +481,7 @@ public Stream read(InputStream inputStream) {
inputStream = new BufferedInputStream(inputStream);
removeBadData(inputStream);
var itemIterator = new RssItemIterator(inputStream);
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(itemIterator, Spliterator.ORDERED), false)
- .onClose(itemIterator::close);
+ return AutoCloseStream.of(StreamUtil.asStream(itemIterator).onClose(itemIterator::close));
}
/**
@@ -505,7 +506,7 @@ public CompletableFuture> readAsync(String url) {
*/
protected CompletableFuture> sendAsyncRequest(String url) {
var builder = HttpRequest.newBuilder(URI.create(url))
- .header("Accept-Encoding", "gzip");
+ .header("Accept-Encoding", "gzip");
if (requestTimeout.toMillis() > 0) {
builder.timeout(requestTimeout);
}
@@ -533,8 +534,7 @@ private Function, Stream> processResponse() {
inputStream = new BufferedInputStream(inputStream);
removeBadData(inputStream);
var itemIterator = new RssItemIterator(inputStream);
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(itemIterator, Spliterator.ORDERED), false)
- .onClose(itemIterator::close);
+ return AutoCloseStream.of(StreamUtil.asStream(itemIterator).onClose(itemIterator::close));
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -568,6 +568,7 @@ class RssItemIterator implements Iterator {
private boolean isChannelPart = false;
private boolean isItemPart = false;
private ScheduledFuture> parseWatchdog;
+ private final AtomicBoolean isClosed;
public RssItemIterator(InputStream is) {
this.is = is;
@@ -575,6 +576,7 @@ public RssItemIterator(InputStream is) {
textBuilder = new StringBuilder();
childNodeTextBuilder = new HashMap<>();
elementStack = new ArrayDeque<>();
+ isClosed = new AtomicBoolean(false);
try {
// disable XML external entity (XXE) processing
@@ -595,15 +597,17 @@ public RssItemIterator(InputStream is) {
}
public void close() {
- try {
- if (parseWatchdog != null) {
- parseWatchdog.cancel(false);
- }
- reader.close();
- is.close();
- } catch (XMLStreamException | IOException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
+ if (isClosed.compareAndSet(false,true)) {
+ try {
+ if (parseWatchdog != null) {
+ parseWatchdog.cancel(false);
+ }
+ reader.close();
+ is.close();
+ } catch (XMLStreamException | IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed to close XML stream. ", e);
+ }
}
}
}
@@ -669,16 +673,16 @@ private void collectChildNodes(int type) {
// Add namespaces to start tag
for (int i = 0; i < reader.getNamespaceCount(); ++i) {
startTagBuilder.append(" ")
- .append(toNamespacePrefix(reader.getNamespacePrefix(i)))
- .append("=")
- .append(reader.getNamespaceURI(i));
+ .append(toNamespacePrefix(reader.getNamespacePrefix(i)))
+ .append("=")
+ .append(reader.getNamespaceURI(i));
}
// Add attributes to start tag
for (int i = 0; i < reader.getAttributeCount(); ++i) {
startTagBuilder.append(" ")
- .append(toNsName(reader.getAttributePrefix(i), reader.getAttributeLocalName(i)))
- .append("=")
- .append(reader.getAttributeValue(i));
+ .append(toNsName(reader.getAttributePrefix(i), reader.getAttributeLocalName(i)))
+ .append("=")
+ .append(reader.getAttributeValue(i));
}
startTagBuilder.append(">");
var startTag = startTagBuilder.toString();
@@ -699,9 +703,9 @@ private void collectChildNodes(int type) {
var nsTagName = toNsName(reader.getPrefix(), reader.getLocalName());
var endTag = "" + nsTagName + ">";
childNodeTextBuilder.entrySet()
- .stream()
- .filter(e -> !e.getKey().equals(nsTagName))
- .forEach(e -> e.getValue().append(endTag));
+ .stream()
+ .filter(e -> !e.getKey().equals(nsTagName))
+ .forEach(e -> e.getValue().append(endTag));
}
}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/StreamUtil.java b/src/main/java/com/apptasticsoftware/rssreader/internal/StreamUtil.java
new file mode 100644
index 0000000..8f87e7a
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/StreamUtil.java
@@ -0,0 +1,52 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Internal utility class for working with streams.
+ */
+public class StreamUtil {
+
+ private StreamUtil() { }
+
+ /**
+ * Creates a Stream from an Iterator.
+ *
+ * @param iterator The Iterator to create the Stream from.
+ * @param The type of the elements in the Stream.
+ * @return A Stream created from the Iterator.
+ */
+ public static Stream asStream(Iterator iterator) {
+ requireNonNull(iterator);
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
+ }
+}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AbstractAutoCloseStream.java b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AbstractAutoCloseStream.java
new file mode 100644
index 0000000..0c32753
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AbstractAutoCloseStream.java
@@ -0,0 +1,80 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal.stream;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.*;
+import java.util.stream.*;
+
+public class AbstractAutoCloseStream> implements AutoCloseable {
+ private final S stream;
+ private final AtomicBoolean isClosed;
+
+ AbstractAutoCloseStream(S stream) {
+ this.stream = Objects.requireNonNull(stream);
+ this.isClosed = new AtomicBoolean();
+ }
+
+ protected S stream() {
+ return stream;
+ }
+
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false,true)) {
+ stream().close();
+ }
+ }
+
+ R autoClose(Function function) {
+ try (S s = stream()) {
+ return function.apply(s);
+ }
+ }
+
+ Stream asAutoCloseStream(Stream stream) {
+ return asAutoCloseStream(stream, AutoCloseStream::new);
+ }
+
+ IntStream asAutoCloseStream(IntStream stream) {
+ return asAutoCloseStream(stream, AutoCloseIntStream::new);
+ }
+
+ LongStream asAutoCloseStream(LongStream stream) {
+ return asAutoCloseStream(stream, AutoCloseLongStream::new);
+ }
+
+ DoubleStream asAutoCloseStream(DoubleStream stream) {
+ return asAutoCloseStream(stream, AutoCloseDoubleStream::new);
+ }
+
+ private U asAutoCloseStream(U stream, Function wrapper) {
+ if (stream instanceof AbstractAutoCloseStream) {
+ return stream;
+ }
+ return wrapper.apply(stream);
+ }
+
+}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseDoubleStream.java b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseDoubleStream.java
new file mode 100644
index 0000000..8fb4dae
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseDoubleStream.java
@@ -0,0 +1,227 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal.stream;
+
+import java.util.DoubleSummaryStatistics;
+import java.util.OptionalDouble;
+import java.util.PrimitiveIterator;
+import java.util.Spliterator;
+import java.util.function.*;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+public class AutoCloseDoubleStream extends AbstractAutoCloseStream implements DoubleStream {
+
+ AutoCloseDoubleStream(DoubleStream stream) {
+ super(stream);
+ }
+
+ @Override
+ public DoubleStream filter(DoublePredicate predicate) {
+ return asAutoCloseStream(stream().filter(predicate));
+ }
+
+ @Override
+ public DoubleStream map(DoubleUnaryOperator mapper) {
+ return asAutoCloseStream(stream().map(mapper));
+ }
+
+ @Override
+ public Stream mapToObj(DoubleFunction extends U> mapper) {
+ return asAutoCloseStream(stream().mapToObj(mapper));
+ }
+
+ @Override
+ public IntStream mapToInt(DoubleToIntFunction mapper) {
+ return asAutoCloseStream(stream().mapToInt(mapper));
+ }
+
+ @Override
+ public LongStream mapToLong(DoubleToLongFunction mapper) {
+ return asAutoCloseStream(stream().mapToLong(mapper));
+ }
+
+ @Override
+ public DoubleStream flatMap(DoubleFunction extends DoubleStream> mapper) {
+ return asAutoCloseStream(stream().flatMap(mapper));
+ }
+
+ @Override
+ public DoubleStream distinct() {
+ return asAutoCloseStream(stream().distinct());
+ }
+
+ @Override
+ public DoubleStream sorted() {
+ return asAutoCloseStream(stream().sorted());
+ }
+
+ @Override
+ public DoubleStream peek(DoubleConsumer action) {
+ return asAutoCloseStream(stream().peek(action));
+ }
+
+ @Override
+ public DoubleStream limit(long maxSize) {
+ return asAutoCloseStream(stream().limit(maxSize));
+ }
+
+ @Override
+ public DoubleStream skip(long n) {
+ return asAutoCloseStream(stream().skip(n));
+ }
+
+ @Override
+ public void forEach(DoubleConsumer action) {
+ autoClose(stream -> {
+ stream.forEach(action);
+ return null;
+ });
+ }
+
+ @Override
+ public void forEachOrdered(DoubleConsumer action) {
+ autoClose(stream -> {
+ stream.forEachOrdered(action);
+ return null;
+ });
+ }
+
+ @Override
+ public double[] toArray() {
+ return autoClose(DoubleStream::toArray);
+ }
+
+ @Override
+ public double reduce(double identity, DoubleBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(identity, op));
+ }
+
+ @Override
+ public OptionalDouble reduce(DoubleBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(op));
+ }
+
+ @Override
+ public R collect(Supplier supplier, ObjDoubleConsumer accumulator, BiConsumer combiner) {
+ return autoClose(stream -> stream.collect(supplier, accumulator, combiner));
+ }
+
+ @Override
+ public double sum() {
+ return autoClose(DoubleStream::sum);
+ }
+
+ @Override
+ public OptionalDouble min() {
+ return autoClose(DoubleStream::min);
+ }
+
+ @Override
+ public OptionalDouble max() {
+ return autoClose(DoubleStream::max);
+ }
+
+ @Override
+ public long count() {
+ return autoClose(DoubleStream::count);
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return autoClose(DoubleStream::average);
+ }
+
+ @Override
+ public DoubleSummaryStatistics summaryStatistics() {
+ return autoClose(DoubleStream::summaryStatistics);
+ }
+
+ @Override
+ public boolean anyMatch(DoublePredicate predicate) {
+ return autoClose(stream -> stream.anyMatch(predicate));
+ }
+
+ @Override
+ public boolean allMatch(DoublePredicate predicate) {
+ return autoClose(stream -> stream.allMatch(predicate));
+ }
+
+ @Override
+ public boolean noneMatch(DoublePredicate predicate) {
+ return autoClose(stream -> stream.noneMatch(predicate));
+ }
+
+ @Override
+ public OptionalDouble findFirst() {
+ return autoClose(DoubleStream::findFirst);
+ }
+
+ @Override
+ public OptionalDouble findAny() {
+ return autoClose(DoubleStream::findAny);
+ }
+
+ @Override
+ public Stream boxed() {
+ return asAutoCloseStream(stream().boxed());
+ }
+
+ @Override
+ public DoubleStream sequential() {
+ return asAutoCloseStream(stream().sequential());
+ }
+
+ @Override
+ public DoubleStream parallel() {
+ return asAutoCloseStream(stream().parallel());
+ }
+
+ @Override
+ public PrimitiveIterator.OfDouble iterator() {
+ return stream().iterator();
+ }
+
+ @Override
+ public Spliterator.OfDouble spliterator() {
+ return stream().spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return stream().isParallel();
+ }
+
+ @Override
+ public DoubleStream unordered() {
+ return asAutoCloseStream(stream().unordered());
+ }
+
+ @Override
+ public DoubleStream onClose(Runnable closeHandler) {
+ return asAutoCloseStream(stream().onClose(closeHandler));
+ }
+}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseIntStream.java b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseIntStream.java
new file mode 100644
index 0000000..46e51f3
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseIntStream.java
@@ -0,0 +1,234 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal.stream;
+
+import java.util.*;
+import java.util.function.*;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+public class AutoCloseIntStream extends AbstractAutoCloseStream implements IntStream {
+
+ AutoCloseIntStream(IntStream stream) {
+ super(stream);
+ }
+
+ @Override
+ public IntStream filter(IntPredicate predicate) {
+ return asAutoCloseStream(stream().filter(predicate));
+ }
+
+ @Override
+ public IntStream map(IntUnaryOperator mapper) {
+ return asAutoCloseStream(stream().map(mapper));
+ }
+
+ @Override
+ public Stream mapToObj(IntFunction extends U> mapper) {
+ return asAutoCloseStream(stream().mapToObj(mapper));
+ }
+
+ @Override
+ public LongStream mapToLong(IntToLongFunction mapper) {
+ return asAutoCloseStream(stream().mapToLong(mapper));
+ }
+
+ @Override
+ public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
+ return asAutoCloseStream(stream().mapToDouble(mapper));
+ }
+
+ @Override
+ public IntStream flatMap(IntFunction extends IntStream> mapper) {
+ return asAutoCloseStream(stream().flatMap(mapper));
+ }
+
+ @Override
+ public IntStream distinct() {
+ return asAutoCloseStream(stream().distinct());
+ }
+
+ @Override
+ public IntStream sorted() {
+ return asAutoCloseStream(stream().sorted());
+ }
+
+ @Override
+ public IntStream peek(IntConsumer action) {
+ return asAutoCloseStream(stream().peek(action));
+ }
+
+ @Override
+ public IntStream limit(long maxSize) {
+ return asAutoCloseStream(stream().limit(maxSize));
+ }
+
+ @Override
+ public IntStream skip(long n) {
+ return asAutoCloseStream(stream().skip(n));
+ }
+
+ @Override
+ public void forEach(IntConsumer action) {
+ autoClose(stream -> {
+ stream.forEach(action);
+ return null;
+ });
+ }
+
+ @Override
+ public void forEachOrdered(IntConsumer action) {
+ autoClose(stream -> {
+ stream.forEachOrdered(action);
+ return null;
+ });
+ }
+
+ @Override
+ public int[] toArray() {
+ return autoClose(IntStream::toArray);
+ }
+
+ @Override
+ public int reduce(int identity, IntBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(identity, op));
+ }
+
+ @Override
+ public OptionalInt reduce(IntBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(op));
+ }
+
+ @Override
+ public R collect(Supplier supplier, ObjIntConsumer accumulator, BiConsumer combiner) {
+ return autoClose(stream -> stream.collect(supplier, accumulator, combiner));
+ }
+
+ @Override
+ public int sum() {
+ return autoClose(IntStream::sum);
+ }
+
+ @Override
+ public OptionalInt min() {
+ return autoClose(IntStream::min);
+ }
+
+ @Override
+ public OptionalInt max() {
+ return autoClose(IntStream::max);
+ }
+
+ @Override
+ public long count() {
+ return autoClose(IntStream::count);
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return autoClose(IntStream::average);
+ }
+
+ @Override
+ public IntSummaryStatistics summaryStatistics() {
+ return autoClose(IntStream::summaryStatistics);
+ }
+
+ @Override
+ public boolean anyMatch(IntPredicate predicate) {
+ return autoClose(stream -> stream.anyMatch(predicate));
+ }
+
+ @Override
+ public boolean allMatch(IntPredicate predicate) {
+ return autoClose(stream -> stream.allMatch(predicate));
+ }
+
+ @Override
+ public boolean noneMatch(IntPredicate predicate) {
+ return autoClose(stream -> stream.noneMatch(predicate));
+ }
+
+ @Override
+ public OptionalInt findFirst() {
+ return autoClose(IntStream::findFirst);
+ }
+
+ @Override
+ public OptionalInt findAny() {
+ return autoClose(IntStream::findAny);
+ }
+
+ @Override
+ public LongStream asLongStream() {
+ return asAutoCloseStream(stream().asLongStream());
+ }
+
+ @Override
+ public DoubleStream asDoubleStream() {
+ return asAutoCloseStream(stream().asDoubleStream());
+ }
+
+ @Override
+ public Stream boxed() {
+ return asAutoCloseStream(stream().boxed());
+ }
+
+ @Override
+ public IntStream sequential() {
+ return asAutoCloseStream(stream().sequential());
+ }
+
+ @Override
+ public IntStream parallel() {
+ return asAutoCloseStream(stream().parallel());
+ }
+
+ @Override
+ public PrimitiveIterator.OfInt iterator() {
+ return stream().iterator();
+ }
+
+ @Override
+ public Spliterator.OfInt spliterator() {
+ return stream().spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return stream().isParallel();
+ }
+
+ @Override
+ public IntStream unordered() {
+ return asAutoCloseStream(stream().unordered());
+ }
+
+ @Override
+ public IntStream onClose(Runnable closeHandler) {
+ return asAutoCloseStream(stream().onClose(closeHandler));
+ }
+}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseLongStream.java b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseLongStream.java
new file mode 100644
index 0000000..ed4ee16
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseLongStream.java
@@ -0,0 +1,230 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal.stream;
+
+import java.util.*;
+import java.util.function.*;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+public class AutoCloseLongStream extends AbstractAutoCloseStream implements LongStream {
+
+ AutoCloseLongStream(LongStream stream) {
+ super(stream);
+ }
+
+ @Override
+ public LongStream filter(LongPredicate predicate) {
+ return asAutoCloseStream(stream().filter(predicate));
+ }
+
+ @Override
+ public LongStream map(LongUnaryOperator mapper) {
+ return asAutoCloseStream(stream().map(mapper));
+ }
+
+ @Override
+ public Stream mapToObj(LongFunction extends U> mapper) {
+ return asAutoCloseStream(stream().mapToObj(mapper));
+ }
+
+ @Override
+ public IntStream mapToInt(LongToIntFunction mapper) {
+ return asAutoCloseStream(stream().mapToInt(mapper));
+ }
+
+ @Override
+ public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
+ return asAutoCloseStream(stream().mapToDouble(mapper));
+ }
+
+ @Override
+ public LongStream flatMap(LongFunction extends LongStream> mapper) {
+ return asAutoCloseStream(stream().flatMap(mapper));
+ }
+
+ @Override
+ public LongStream distinct() {
+ return asAutoCloseStream(stream().distinct());
+ }
+
+ @Override
+ public LongStream sorted() {
+ return asAutoCloseStream(stream().sorted());
+ }
+
+ @Override
+ public LongStream peek(LongConsumer action) {
+ return asAutoCloseStream(stream().peek(action));
+ }
+
+ @Override
+ public LongStream limit(long maxSize) {
+ return asAutoCloseStream(stream().limit(maxSize));
+ }
+
+ @Override
+ public LongStream skip(long n) {
+ return asAutoCloseStream(stream().skip(n));
+ }
+
+ @Override
+ public void forEach(LongConsumer action) {
+ autoClose(stream -> {
+ stream.forEach(action);
+ return null;
+ });
+ }
+
+ @Override
+ public void forEachOrdered(LongConsumer action) {
+ autoClose(stream -> {
+ stream.forEachOrdered(action);
+ return null;
+ });
+ }
+
+ @Override
+ public long[] toArray() {
+ return autoClose(LongStream::toArray);
+ }
+
+ @Override
+ public long reduce(long identity, LongBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(identity, op));
+ }
+
+ @Override
+ public OptionalLong reduce(LongBinaryOperator op) {
+ return autoClose(stream -> stream.reduce(op));
+ }
+
+ @Override
+ public R collect(Supplier supplier, ObjLongConsumer accumulator, BiConsumer combiner) {
+ return autoClose(stream -> stream.collect(supplier, accumulator, combiner));
+ }
+
+ @Override
+ public long sum() {
+ return autoClose(LongStream::sum);
+ }
+
+ @Override
+ public OptionalLong min() {
+ return autoClose(LongStream::min);
+ }
+
+ @Override
+ public OptionalLong max() {
+ return autoClose(LongStream::max);
+ }
+
+ @Override
+ public long count() {
+ return autoClose(LongStream::count);
+ }
+
+ @Override
+ public OptionalDouble average() {
+ return autoClose(LongStream::average);
+ }
+
+ @Override
+ public LongSummaryStatistics summaryStatistics() {
+ return autoClose(LongStream::summaryStatistics);
+ }
+
+ @Override
+ public boolean anyMatch(LongPredicate predicate) {
+ return autoClose(stream -> stream.anyMatch(predicate));
+ }
+
+ @Override
+ public boolean allMatch(LongPredicate predicate) {
+ return autoClose(stream -> stream.allMatch(predicate));
+ }
+
+ @Override
+ public boolean noneMatch(LongPredicate predicate) {
+ return autoClose(stream -> stream.noneMatch(predicate));
+ }
+
+ @Override
+ public OptionalLong findFirst() {
+ return autoClose(LongStream::findFirst);
+ }
+
+ @Override
+ public OptionalLong findAny() {
+ return autoClose(LongStream::findAny);
+ }
+
+ @Override
+ public DoubleStream asDoubleStream() {
+ return asAutoCloseStream(stream().asDoubleStream());
+ }
+
+ @Override
+ public Stream boxed() {
+ return asAutoCloseStream(stream().boxed());
+ }
+
+ @Override
+ public LongStream sequential() {
+ return asAutoCloseStream(stream().sequential());
+ }
+
+ @Override
+ public LongStream parallel() {
+ return asAutoCloseStream(stream().parallel());
+ }
+
+ @Override
+ public PrimitiveIterator.OfLong iterator() {
+ return stream().iterator();
+ }
+
+ @Override
+ public Spliterator.OfLong spliterator() {
+ return stream().spliterator();
+ }
+
+ @Override
+ public boolean isParallel() {
+ return stream().isParallel();
+ }
+
+ @Override
+ public LongStream unordered() {
+ return asAutoCloseStream(stream().unordered());
+ }
+
+ @Override
+ public LongStream onClose(Runnable closeHandler) {
+ return asAutoCloseStream(stream().onClose(closeHandler));
+ }
+
+}
diff --git a/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseStream.java b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseStream.java
new file mode 100644
index 0000000..2108c59
--- /dev/null
+++ b/src/main/java/com/apptasticsoftware/rssreader/internal/stream/AutoCloseStream.java
@@ -0,0 +1,255 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024, Apptastic Software
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.apptasticsoftware.rssreader.internal.stream;
+
+import java.util.*;
+import java.util.function.*;
+import java.util.stream.*;
+
+/**
+ * A Stream that automatically calls its {@link #close()} method after a terminating operation, such as limit(), forEach(), or collect(), has been executed.
+ *
+ * This class is useful for working with streams that have resources that need to be closed, such as file streams or network connections.
+ *
+ * If the {@link #close()} method is called manually, the stream will be closed and any subsequent operations will throw an {@link IllegalStateException}.
+ *
+]]>
+
+
+ NASA to decide Saturday whether astronauts will ride Boeing’s Starliner home — or use SpaceX’s Dragon instead
+ https://techcrunch.com/2024/08/22/nasa-to-decide-saturday-whether-astronauts-will-ride-boeings-starliner-home-or-use-spacexs-dragon-instead/
+
+ Thu, 22 Aug 2024 22:14:04 +0000
+
+
+ https://techcrunch.com/?p=2841608
+ NASA officials will announce their final decision on Saturday as to whether two NASA astronauts — Butch Wilmore and Sunita Williams — will return to Earth on Boeing’s Starliner spacecraft or hitch a ride home with SpaceX instead — a decision that could have a huge impact across the rapidly evolving space industry. Here’s the […]
+]]>
+
+
+ Gimbal Space takes on legacy suppliers with fast-paced component supply chain
+ https://techcrunch.com/2024/08/22/gimbal-space-takes-on-legacy-suppliers-with-fast-paced-component-supply-chain/
+
+ Thu, 22 Aug 2024 21:15:16 +0000
+
+
+ https://techcrunch.com/?p=2840801
+ America’s space industry seems mature, but the supply chain that provides all the parts and components for rockets, satellites, and other spacecraft is considerably less so. Gimbal Space is aiming to change that, starting with components in the crucial subsystem that enables a spacecraft to orient itself in space — but delivered cheaper and a whole […]
+
+]]>
+
+
+ Waymo wants to chauffeur your kids
+ https://techcrunch.com/2024/08/22/waymo-wants-to-chauffeur-your-kids/
+
+ Thu, 22 Aug 2024 20:36:06 +0000
+
+
+
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841545
+ Soon, parents in range of Waymo robotaxis might not have to worry about picking up their kids from after-school activities — or any time, really. The San Francisco Standard reports that Waymo, the Alphabet subsidiary, is considering a subscription program that would let teens hail one of its cars solo and send pickup and drop-off […]
+
+]]>
+
+
+ Synapse’s collapse has frozen nearly $160M from fintech users — here’s how it happened
+ https://techcrunch.com/2024/08/22/synapses-collapse-has-frozen-nearly-160m-from-fintech-users-heres-how-it-happened/
+
+ Thu, 22 Aug 2024 20:33:27 +0000
+
+
+
+
+
+ https://techcrunch.com/?p=2807823
+ Here is a timeline of Synapse’s troubles and the ongoing impact it is having on banking consumers.
+
+]]>
+
+
+ Founder of failed fintech Synapse says he’s raised $11M for new robotics startup
+ https://techcrunch.com/2024/08/22/founder-of-failed-fintech-synapse-says-hes-raised-11m-for-new-robotics-startup/
+
+ Thu, 22 Aug 2024 20:21:36 +0000
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841538
+ Tens of millions of customer dollars remain unaccounted for at his previous startup, fintech Synapse. But that’s not deterring Sankaet Pathak from forging full steam ahead with his new robotics venture. Foundation is a robotics startup with a self-proclaimed mission “to create advanced humanoid robots that can operate in complex environments” to address the labor […]
+
+]]>
+
+
+ Cruise’s robotaxis are coming to the Uber app in 2025
+ https://techcrunch.com/2024/08/22/cruises-robotaxis-are-coming-to-the-uber-app-in-2025/
+
+ Thu, 22 Aug 2024 20:04:47 +0000
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841539
+ Cruise, General Motors’ self-driving subsidiary, said it has signed a multi-year partnership with ride-hailing giant Uber to bring its robotaxis to the ride-hailing platform in 2025. Cruise didn’t say when exactly customers would see its vehicles on Uber’s platform, but a spokesperson told TechCrunch that this partnership will follow the re-launch of Cruise’s own driverless […]
+
+]]>
+
+
+ Boston Dynamics’ new electric Atlas can do push-ups
+ https://techcrunch.com/2024/08/22/boston-dynamics-new-electric-atlas-can-do-push-ups/
+
+ Thu, 22 Aug 2024 19:28:43 +0000
+
+
+
+ https://techcrunch.com/?p=2841507
+ Until today, we’ve seen exactly 40 seconds of Boston Dynamics’ new electric Atlas in action. The Hyundai-owned robotics stalwart is very much still in the early stages of commercializing the biped for factory floors. For now, however, it’s doing the thing Boston Dynamics does second best after building robots: showing off in viral video form. […]
+
+]]>
+
+
+ AI sales rep startups are booming. So why are VCs wary?
+ https://techcrunch.com/2024/08/22/ai-sdr-startups-are-booming-so-why-are-vcs-wary/
+
+ Thu, 22 Aug 2024 18:23:41 +0000
+
+
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841348
+ When you really probe venture capitalists about investing in AI startups, they’ll tell you that businesses are experimenting wildly but are very slow to add AI solutions into their ongoing business processes. But there are some exceptions. And one of them appears to be an area known as AI sales development representatives, or AI SDRs. […]
+
+]]>
+
+
+ Former Alphabet X spinout Mineral sells technology to John Deere
+ https://techcrunch.com/2024/08/22/former-alphabet-x-spinout-mineral-sells-technology-to-john-deere/
+
+ Thu, 22 Aug 2024 17:29:59 +0000
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841333
+ Citing a crowded market and profit concerns, Mineral ceased operation and pivoted to technology licensing.
+
+]]>
+
+
+ Relive the Myspace days by adding a favorite song to your Instagram profile
+ https://techcrunch.com/2024/08/22/relive-the-myspace-days-by-adding-a-favorite-song-to-your-instagram-profile/
+
+ Thu, 22 Aug 2024 17:11:15 +0000
+
+
+
+ https://techcrunch.com/?p=2841261
+ A nod to the Myspace era, Instagram just launched a new feature that allows you to add music to your profile. The feature is in collaboration with singer Sabrina Carpenter, who released a teaser of her new song “Taste” on her Instagram profile ahead of her album release on Friday. To add a song to […]
+
+]]>
+
+
+ Ford pivots on EVs, Waymo doubles its robotaxi ridership and Canoo leaves California
+ https://techcrunch.com/2024/08/22/ford-pivots-on-evs-waymo-doubles-its-robotaxi-ridership-and-canoo-leaves-california/
+
+ Thu, 22 Aug 2024 17:05:00 +0000
+
+
+
+
+
+ https://techcrunch.com/?p=2840820
+ Welcome back to TechCrunch Mobility — your central hub for news and insights on the future of transportation. Sign up here for free — just click TechCrunch Mobility! Monterey Car Week is a wrap. And while it is still steeped in tradition, the collection of parties, auctions and car shows that were held throughout the […]
+
+]]>
+
+
+ Ecovacs says it will fix bugs that can be abused to spy on robot owners
+ https://techcrunch.com/2024/08/22/ecovacs-says-it-will-fix-bugs-that-can-be-abused-to-spy-on-robot-owners/
+
+ Thu, 22 Aug 2024 16:52:22 +0000
+
+
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841191
+ After saying users "do not need to worry excessively" about a series of security flaws, Ecovacs said it will — in fact — roll out fixes.
+
+]]>
+
+
+ Alphabet X’s latest spinout brings computer vision and AI to salmon farms
+ https://techcrunch.com/2024/08/22/alphabet-xs-latest-spinout-brings-computer-vision-and-ai-to-salmon-farms/
+
+ Thu, 22 Aug 2024 16:32:24 +0000
+
+
+
+
+
+ https://techcrunch.com/?p=2841276
+ Tidal, which quietly spun out of the department in mid-July, has its own grand ambitions to “feed humanity sustainably.”
+
+]]>
+
+
+ India’s open commerce network expands into digital lending
+ https://techcrunch.com/2024/08/22/indias-open-commerce-network-expands-into-digital-lending/
+
+ Thu, 22 Aug 2024 16:21:32 +0000
+
+
+
+
+ https://techcrunch.com/?p=2841078
+ India's ONDC has launched digital lending on its network as it expands into financial services after powering e-commerce, mobility and logistics with its standardized framework.
+
+]]>
+
+
+ Under DMA probe, Apple tweaks design of EU browser choice screens, expands app default settings
+ https://techcrunch.com/2024/08/22/under-dma-probe-apple-tweaks-design-of-eu-browser-choice-screens-expands-app-default-settings/
+
+ Thu, 22 Aug 2024 16:02:40 +0000
+
+
+
+
+ https://techcrunch.com/?p=2841174
+ Apple continues to adjust its approach to compliance with the European Union’s Digital Markets Act (DMA): Announcing another batch of changes Thursday, the iPhone maker showed off redesigned browser choice screens it said would be coming to iOS and iPadOS “later this year,” with version 18 of its mobile software platforms. The tweaked browser choice […]
+
+]]>
+
+
+ TechCrunch Minute: AI could help design and test F1 cars faster
+ https://techcrunch.com/video/techcrunch-minute-ai-could-help-design-and-test-f1-cars-faster/
+
+ Thu, 22 Aug 2024 16:00:00 +0000
+
+
+
+
+ https://techcrunch.com/?post_type=tc_video&p=2841127
+ Formula One teams are looking at a startup called BeyondMath to bring their car construction to the next level. BeyondMath is working in the field of computational fluid dynamics, which attempts to digitally model how an object moves through water or air. But that’s a lot easier said than done — even with excessive computing […]
+
+]]>
+
+
+ Threads spotted exploring ads, but says ‘no immediate timeline’ toward monetization
+ https://techcrunch.com/2024/08/22/threads-explores-ads-but-says-no-immediate-timeline-toward-monetization/
+
+ Thu, 22 Aug 2024 15:36:56 +0000
+
+
+
+
+ https://techcrunch.com/?p=2841142
+ These findings indicate that Threads engineers are exploring ad technology, but that doesn't mean Threads will debut ads anytime soon, as some suspect.
+
+]]>
+
+
+ Cruise recalls robotaxi fleet to resolve federal safety probe
+ https://techcrunch.com/2024/08/22/cruise-recall-av-fleet-nhtsa-probe-closed/
+
+ Thu, 22 Aug 2024 15:33:05 +0000
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2841150
+ The recall and the conclusion of the probe takes one worry off of Cruise's plate at a time when the company is under great scrutiny.
+
+]]>
+
+
+ Cache Energy’s mysterious white pellets could help kill coal and natural gas
+ https://techcrunch.com/2024/08/22/cache-energys-mysterious-white-pellets-could-help-kill-coal-and-natural-gas/
+
+ Thu, 22 Aug 2024 15:30:00 +0000
+
+
+
+
+
+
+
+ https://techcrunch.com/?p=2839682
+ The pellets can be stored in piles or silos, moved around using conveyor belts, and transported via rail cars.
+