Skip to content

Commit

Permalink
Updated Fulcrum to 0.15.0, YConf to 0.4.0, Zerolog to 0.15.0, Hazelca…
Browse files Browse the repository at this point in the history
…st to 3.10.2
  • Loading branch information
ekoutanov committed Jun 21, 2018
1 parent 3935e05 commit f06a29e
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 12 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ allprojects {
}

ext {
fulcrumVersion = "0.14.0"
yconfVersion = "0.3.1"
zerologVersion = "0.14.0"
hazelcastVersion = "3.10"
fulcrumVersion = "0.15.1"
yconfVersion = "0.4.0"
zerologVersion = "0.15.0"
hazelcastVersion = "3.10.2"
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.obsidiandynamics.meteor;

import static com.obsidiandynamics.retry.Retry.*;

import java.util.*;
import java.util.concurrent.*;

Expand Down Expand Up @@ -45,7 +47,7 @@ private static class AsyncRecord {
final StreamConfig streamConfig = config.getStreamConfig();

final Retry retry = new Retry()
.withExceptionClass(HazelcastException.class)
.withExceptionMatcher(isA(HazelcastException.class))
.withAttempts(Integer.MAX_VALUE)
.withBackoff(100)
.withFaultHandler(config.getZlg()::w)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.obsidiandynamics.meteor;

import static com.obsidiandynamics.retry.Retry.*;

import java.util.*;
import java.util.concurrent.*;

Expand Down Expand Up @@ -34,6 +36,8 @@ public final class DefaultSubscriber implements Subscriber, Joinable {

private final WorkerThread keeperThread;

private final int readBatchSize;

private volatile long nextReadOffset;

private volatile long lastReadOffset;
Expand All @@ -58,12 +62,13 @@ public final class DefaultSubscriber implements Subscriber, Joinable {

final StreamConfig streamConfig = config.getStreamConfig();
final Retry retry = new Retry()
.withExceptionClass(HazelcastException.class)
.withExceptionMatcher(isA(HazelcastException.class))
.withAttempts(Integer.MAX_VALUE)
.withBackoff(100)
.withFaultHandler(config.getZlg()::w)
.withErrorHandler(config.getZlg()::e);
buffer = new RetryableRingbuffer<>(retry, StreamHelper.getRingbuffer(instance, streamConfig));
readBatchSize = Math.min(1_000, streamConfig.getHeapCapacity());

if (config.hasGroup()) {
// checks for IllegalArgumentException; no initial assignment is made until poll() is called
Expand Down Expand Up @@ -134,7 +139,7 @@ public RecordBatch poll(long timeoutMillis) throws InterruptedException {
lastReadOffset = nextReadOffset - 1;
}

final ICompletableFuture<ReadResultSet<byte[]>> f = buffer.readManyAsync(nextReadOffset, 1, 1_000, StreamHelper::isNotNull);
final ICompletableFuture<ReadResultSet<byte[]>> f = buffer.readManyAsync(nextReadOffset, 1, readBatchSize, StreamHelper::isNotNull);

final long waitMillis = computeWait(wake, Long.MAX_VALUE);
try {
Expand Down
14 changes: 10 additions & 4 deletions core/src/test/java/com/obsidiandynamics/meteor/PublisherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,12 @@ private static void publish(int numMessages, Publisher publisher, List<Record> r
*/
private static List<byte[]> readRemaining(Ringbuffer<byte[]> buffer, long startSequence) throws InterruptedException, ExecutionException {
long adjStartSequence = startSequence;
final List<byte[]> items = new ArrayList<>();
for (;;) {
final ReadResultSet<byte[]> results;
try {
results = buffer.readManyAsync(adjStartSequence, 0, 1000, null).get();
final int toRead = (int) Math.min(1_000, buffer.capacity());
results = buffer.readManyAsync(adjStartSequence, 0, toRead, null).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof StaleSequenceException) {
System.out.format("SSE: fast-forwarding start sequence to %d\n", buffer.headSequence());
Expand All @@ -222,9 +224,13 @@ private static List<byte[]> readRemaining(Ringbuffer<byte[]> buffer, long startS
throw e;
}
}
final List<byte[]> items = new ArrayList<>(results.size());
results.forEach(items::add);
return items;

if (results.size() > 0) {
results.forEach(items::add);
adjStartSequence += results.size();
} else {
return items;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.obsidiandynamics.meteor;

import static com.obsidiandynamics.retry.Retry.*;

import java.util.*;

import com.hazelcast.core.*;
Expand Down Expand Up @@ -31,7 +33,7 @@ public Election(ElectionConfig config, IMap<String, byte[]> leases) {
this.config = config;

final Retry retry = new Retry()
.withExceptionClass(HazelcastException.class)
.withExceptionMatcher(isA(HazelcastException.class))
.withAttempts(Integer.MAX_VALUE)
.withBackoff(100)
.withFaultHandler(config.getZlg()::w)
Expand Down

0 comments on commit f06a29e

Please sign in to comment.