diff --git a/build.gradle b/build.gradle index f8f618e..66c9186 100644 --- a/build.gradle +++ b/build.gradle @@ -37,10 +37,10 @@ allprojects { } ext { - fulcrumVersion = "0.14.0" - yconfVersion = "0.3.1" - zerologVersion = "0.14.0" - hazelcastVersion = "3.10" + fulcrumVersion = "0.15.1" + yconfVersion = "0.4.0" + zerologVersion = "0.15.0" + hazelcastVersion = "3.10.2" } dependencies { diff --git a/core/src/main/java/com/obsidiandynamics/meteor/DefaultPublisher.java b/core/src/main/java/com/obsidiandynamics/meteor/DefaultPublisher.java index d96b7ac..b5ff343 100644 --- a/core/src/main/java/com/obsidiandynamics/meteor/DefaultPublisher.java +++ b/core/src/main/java/com/obsidiandynamics/meteor/DefaultPublisher.java @@ -1,5 +1,7 @@ package com.obsidiandynamics.meteor; +import static com.obsidiandynamics.retry.Retry.*; + import java.util.*; import java.util.concurrent.*; @@ -45,7 +47,7 @@ private static class AsyncRecord { final StreamConfig streamConfig = config.getStreamConfig(); final Retry retry = new Retry() - .withExceptionClass(HazelcastException.class) + .withExceptionMatcher(isA(HazelcastException.class)) .withAttempts(Integer.MAX_VALUE) .withBackoff(100) .withFaultHandler(config.getZlg()::w) diff --git a/core/src/main/java/com/obsidiandynamics/meteor/DefaultSubscriber.java b/core/src/main/java/com/obsidiandynamics/meteor/DefaultSubscriber.java index 8841924..9d12808 100644 --- a/core/src/main/java/com/obsidiandynamics/meteor/DefaultSubscriber.java +++ b/core/src/main/java/com/obsidiandynamics/meteor/DefaultSubscriber.java @@ -1,5 +1,7 @@ package com.obsidiandynamics.meteor; +import static com.obsidiandynamics.retry.Retry.*; + import java.util.*; import java.util.concurrent.*; @@ -34,6 +36,8 @@ public final class DefaultSubscriber implements Subscriber, Joinable { private final WorkerThread keeperThread; + private final int readBatchSize; + private volatile long nextReadOffset; private volatile long lastReadOffset; @@ -58,12 +62,13 @@ public final class DefaultSubscriber implements Subscriber, Joinable { final StreamConfig streamConfig = config.getStreamConfig(); final Retry retry = new Retry() - .withExceptionClass(HazelcastException.class) + .withExceptionMatcher(isA(HazelcastException.class)) .withAttempts(Integer.MAX_VALUE) .withBackoff(100) .withFaultHandler(config.getZlg()::w) .withErrorHandler(config.getZlg()::e); buffer = new RetryableRingbuffer<>(retry, StreamHelper.getRingbuffer(instance, streamConfig)); + readBatchSize = Math.min(1_000, streamConfig.getHeapCapacity()); if (config.hasGroup()) { // checks for IllegalArgumentException; no initial assignment is made until poll() is called @@ -134,7 +139,7 @@ public RecordBatch poll(long timeoutMillis) throws InterruptedException { lastReadOffset = nextReadOffset - 1; } - final ICompletableFuture> f = buffer.readManyAsync(nextReadOffset, 1, 1_000, StreamHelper::isNotNull); + final ICompletableFuture> f = buffer.readManyAsync(nextReadOffset, 1, readBatchSize, StreamHelper::isNotNull); final long waitMillis = computeWait(wake, Long.MAX_VALUE); try { diff --git a/core/src/test/java/com/obsidiandynamics/meteor/PublisherTest.java b/core/src/test/java/com/obsidiandynamics/meteor/PublisherTest.java index c486687..71dfb20 100644 --- a/core/src/test/java/com/obsidiandynamics/meteor/PublisherTest.java +++ b/core/src/test/java/com/obsidiandynamics/meteor/PublisherTest.java @@ -209,10 +209,12 @@ private static void publish(int numMessages, Publisher publisher, List r */ private static List readRemaining(Ringbuffer buffer, long startSequence) throws InterruptedException, ExecutionException { long adjStartSequence = startSequence; + final List items = new ArrayList<>(); for (;;) { final ReadResultSet results; try { - results = buffer.readManyAsync(adjStartSequence, 0, 1000, null).get(); + final int toRead = (int) Math.min(1_000, buffer.capacity()); + results = buffer.readManyAsync(adjStartSequence, 0, toRead, null).get(); } catch (ExecutionException e) { if (e.getCause() instanceof StaleSequenceException) { System.out.format("SSE: fast-forwarding start sequence to %d\n", buffer.headSequence()); @@ -222,9 +224,13 @@ private static List readRemaining(Ringbuffer buffer, long startS throw e; } } - final List items = new ArrayList<>(results.size()); - results.forEach(items::add); - return items; + + if (results.size() > 0) { + results.forEach(items::add); + adjStartSequence += results.size(); + } else { + return items; + } } } diff --git a/elect/src/main/java/com/obsidiandynamics/meteor/Election.java b/elect/src/main/java/com/obsidiandynamics/meteor/Election.java index 8637a62..6e43028 100644 --- a/elect/src/main/java/com/obsidiandynamics/meteor/Election.java +++ b/elect/src/main/java/com/obsidiandynamics/meteor/Election.java @@ -1,5 +1,7 @@ package com.obsidiandynamics.meteor; +import static com.obsidiandynamics.retry.Retry.*; + import java.util.*; import com.hazelcast.core.*; @@ -31,7 +33,7 @@ public Election(ElectionConfig config, IMap leases) { this.config = config; final Retry retry = new Retry() - .withExceptionClass(HazelcastException.class) + .withExceptionMatcher(isA(HazelcastException.class)) .withAttempts(Integer.MAX_VALUE) .withBackoff(100) .withFaultHandler(config.getZlg()::w)