Skip to content

Commit

Permalink
Test case fix ups
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed May 31, 2022
1 parent 72dd53d commit 14187a7
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,20 @@ public TranslogManager translogManager() {
engineConfig,
shardId,
readLock,
getLocalCheckpointTracker(),
this::getLocalCheckpointTracker,
translogUUID,
this::revisitIndexDeletionPolicyOnTranslogSynced,
new TranslogManager.TranslogEventListener() {
@Override
public void onTranslogSync() {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public void onTranslogRecovery() {
flush(false, true);
translogManager.trimUnreferencedTranslogFiles();
}
},
() -> ensureOpen(null),
this::failEngine,
this::failOnTragicEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
engineConfig,
shardId,
readLock,
getLocalCheckpointTracker(),
this::getLocalCheckpointTracker,
translogUUID,
() -> {},
TranslogManager.TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER,
() -> ensureOpen(null),
this::failEngine,
(ex) -> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1692,8 +1692,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
.recoverFromTranslog(
translogRecoveryRunner,
getEngine().getProcessedLocalCheckpoint(),
globalCheckpoint,
() -> getEngine().flush(false, true)
globalCheckpoint
);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
Expand Down Expand Up @@ -1879,8 +1878,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
.recoverFromTranslog(
translogRecoveryRunner,
getEngine().getProcessedLocalCheckpoint(),
Long.MAX_VALUE,
() -> getEngine().flush(false, true)
Long.MAX_VALUE
);
}

Expand Down Expand Up @@ -3962,8 +3960,7 @@ public void close() throws IOException {
.recoverFromTranslog(
translogRunner,
newEngineReference.get().getProcessedLocalCheckpoint(),
globalCheckpoint,
() -> newEngineReference.get().flush(false, true)
globalCheckpoint
);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
Expand All @@ -45,7 +46,7 @@ public InternalTranslogManager(
EngineConfig engineConfig,
ShardId shardId,
ReleasableLock readLock,
LocalCheckpointTracker tracker,
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogManager.TranslogEventListener translogEventListener,
Runnable ensureOpen,
Expand All @@ -60,6 +61,7 @@ public InternalTranslogManager(
this.translogEventListener = translogEventListener;
final TranslogDeletionPolicy translogDeletionPolicy;
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
Translog translog;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
Expand All @@ -73,7 +75,8 @@ public InternalTranslogManager(
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
}
Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get();
assert tracker != null || getTranslog(true).isOpen() == false;
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
Expand Down Expand Up @@ -119,16 +122,15 @@ public void rollTranslogGeneration() throws TranslogException {
public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo,
Runnable flush
long recoverUpToSeqNo
) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen.run();
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo, flush);
recoverFromTranslogInternal(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -144,8 +146,7 @@ public void recoverFromTranslog(
private void recoverFromTranslogInternal(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo,
Runnable flush
long recoverUpToSeqNo
) throws IOException {
final int opsRecovered;
if (localCheckpoint < recoverUpToSeqNo) {
Expand All @@ -168,8 +169,7 @@ private void recoverFromTranslogInternal(
translog.currentFileGeneration()
)
);
flush.run();
translog.trimUnreferencedReaders();
translogEventListener.onTranslogRecovery();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public void rollTranslogGeneration() throws TranslogException {}
public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo,
Runnable flush
long recoverUpToSeqNo
) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public abstract class TranslogManager {
abstract public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo,
Runnable flush
long recoverUpToSeqNo
) throws IOException;

/**
Expand Down Expand Up @@ -98,6 +97,17 @@ abstract public int restoreLocalHistoryFromTranslog(long processedCheckpoint, Tr
public abstract void ensureCanFlush();

public interface TranslogEventListener {
void onTranslogSync();

TranslogEventListener NOOP_TRANSLOG_EVENT_LISTENER = new TranslogEventListener() {};
/**
* Invoked after translog sync operations
*/
default void onTranslogSync() {};

/**
* Invoked after recovering operations from translog
*/
default void onTranslogRecovery() {};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/***
*
Expand All @@ -27,14 +28,14 @@ public WriteOnlyTranslogManager(
EngineConfig engineConfig,
ShardId shardId,
ReleasableLock readLock,
LocalCheckpointTracker tracker,
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
Runnable ensureOpen,
BiConsumer<String, Exception> failEngine,
Function<AlreadyClosedException, Boolean> failOnTragicEvent
) throws IOException {
super(engineConfig, shardId, readLock, tracker, translogUUID, translogEventListener, ensureOpen, failEngine, failOnTragicEvent);
super(engineConfig, shardId, readLock, localCheckpointTrackerSupplier, translogUUID, translogEventListener, ensureOpen, failEngine, failOnTragicEvent);
}

@Override
Expand All @@ -46,8 +47,7 @@ public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRec
public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo,
Runnable flush
long recoverUpToSeqNo
) throws IOException {
throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
}
Expand Down
Loading

0 comments on commit 14187a7

Please sign in to comment.