Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasvb90 committed Dec 10, 2024
1 parent 80a0654 commit ee2ccde
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1072,15 +1072,10 @@ protected void doRun() {

ShardRouting primary = null;
if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(request.shardId().id())) {
if (state.version() < request.routedBasedOnClusterVersion()) {
// This will get retried on coordinator. Entire request will be re-driven on respective child shards.
// Since, we are throwing a custom exception, coordinator will re-drive it explicitly on child shards
// even if it is also stale and yet to receive update from cluster manager.
throw new PrimaryShardSplitException("Primary shard is already split. Cannot perform replication operation on parent primary.");
} else {
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
return;
}
// This will get retried on coordinator. Entire request will be re-driven on respective child shards.
// Since, we are throwing a custom exception, coordinator will re-drive it explicitly on child shards
// even if coordinator is also stale and yet to receive update from cluster manager.
throw new PrimaryShardSplitException("Primary shard is already split. Cannot perform replication operation on parent primary.");
} else {
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(request.shardId().getIndex());
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(request.shardId().id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,18 +537,6 @@ protected Releasable drainSync() {
}
}

@Override
protected void copyTranslogToTarget(Translog translog) {
assert translog instanceof RemoteFsTranslog;
RemoteFsTranslog targetTranslog = (RemoteFsTranslog) translog;
translogTransferManager.copyTranslogToTarget(targetTranslog.translogTransferManager);
}

public Releasable acquireRemoteDeletionPermits() throws InterruptedException {
remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS);
return Releasables.releaseOnce(() -> remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS));
}

@Override
public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ public GatedCloseable<Long> acquireRetentionLockWithMinGen() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final long viewGen = getMinFileGeneration();
logger.info("Minimum translog with gen " + viewGen + " seq number " + current.getCheckpoint().minSeqNo);
Closeable closeable = acquireTranslogGenFromDeletionPolicy(viewGen);
return new GatedCloseable<>(viewGen, closeable::close);
}
Expand Down Expand Up @@ -1832,12 +1833,6 @@ protected void setMinSeqNoToKeep(long seqNo) {}

protected void onDelete() {}

protected void copyTranslogToTarget(Translog translog) {}

protected Releasable acquireRemoteDeletionPermits() throws InterruptedException {
return Releasables.releaseOnce(() -> {});
}

/**
* Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
initiateTracking();

final long endingSeqNo = sourceShard.seqNoStats().getMaxSeqNo();
// Syncing here because sequence number can be greater than local checkpoint and operations may not yet be
// present in translog.
sourceShard.sync();
final Translog.Snapshot phase2Snapshot;
if (startingSeqNo > endingSeqNo) {
phase2Snapshot = new EmptySnapshot();
Expand Down

0 comments on commit ee2ccde

Please sign in to comment.