Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async write for manifest file and use latch for timeout #10968

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Expand All @@ -101,6 +103,13 @@
Setting.Property.NodeScope
);

public static final Setting<TimeValue> METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.metadata_manifest.upload_timeout",
METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -157,6 +166,7 @@

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
private volatile TimeValue metadataManifestUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemotePersistenceStats remoteStateStats;
Expand Down Expand Up @@ -190,9 +200,11 @@
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
}

Expand Down Expand Up @@ -401,21 +413,21 @@
try {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
RemoteStateTransferException ex = new RemoteStateTransferException(

Check warning on line 416 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L416

Added line #L416 was not covered by tests
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
);
throw ex;
}
} catch (InterruptedException ex) {
GlobalMetadataTransferException exception = new GlobalMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(

Check warning on line 422 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L422

Added line #L422 was not covered by tests
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"),
ex
);
Thread.currentThread().interrupt();
throw exception;
}
if (exceptionReference.get() != null) {
throw new GlobalMetadataTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
}
return result.get();
}
Expand All @@ -440,7 +452,7 @@
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof IndexMetadataTransferException;
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
Expand All @@ -457,7 +469,7 @@

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
RemoteStateTransferException ex = new RemoteStateTransferException(

Check warning on line 472 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L472

Added line #L472 was not covered by tests
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
Expand All @@ -469,7 +481,7 @@
}
} catch (InterruptedException ex) {
exceptionList.forEach(ex::addSuppressed);
IndexMetadataTransferException exception = new IndexMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(

Check warning on line 484 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L484

Added line #L484 was not covered by tests
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
Expand All @@ -481,7 +493,7 @@
throw exception;
}
if (exceptionList.size() > 0) {
IndexMetadataTransferException exception = new IndexMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(
Locale.ROOT,
"Exception during transfer of IndexMetadata to Remote %s",
Expand Down Expand Up @@ -520,7 +532,7 @@
indexMetadataContainer.path().buildAsString() + indexMetadataFilename
)
),
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex))
);

INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority(
Expand Down Expand Up @@ -601,14 +613,45 @@

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
AtomicReference<String> result = new AtomicReference<String>();
AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>();

final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
CLUSTER_METADATA_MANIFEST_FORMAT.write(

// latch to wait until upload is not finished
CountDownLatch latch = new CountDownLatch(1);

LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> {
logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully."));
}, ex -> { exceptionReference.set(ex); }), latch);

CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority(
uploadManifest,
metadataManifestContainer,
fileName,
blobStoreRepository.getCompressor(),
completionListener,
FORMAT_PARAMS
);

try {
if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete")
);
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
throw ex;
}
} catch (InterruptedException ex) {
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"),

Check warning on line 646 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L644-L646

Added lines #L644 - L646 were not covered by tests
ex
);
Thread.currentThread().interrupt();
throw exception;

Check warning on line 650 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L649-L650

Added lines #L649 - L650 were not covered by tests
}
if (exceptionReference.get() != null) {
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());

Check warning on line 653 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L653

Added line #L653 was not covered by tests
}
logger.debug(
"Metadata manifest file [{}] written during [{}] phase. ",
fileName,
Expand Down Expand Up @@ -668,6 +711,10 @@
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) {
this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}
Expand All @@ -676,6 +723,10 @@
return this.globalMetadataUploadTimeout;
}

public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down Expand Up @@ -1085,29 +1136,15 @@
}

/**
* Exception for IndexMetadata transfer failures to remote
*/
static class IndexMetadataTransferException extends RuntimeException {

public IndexMetadataTransferException(String errorDesc) {
super(errorDesc);
}

public IndexMetadataTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}

/**
* Exception for GlobalMetadata transfer failures to remote
* Exception for Remote state transfer.
*/
static class GlobalMetadataTransferException extends RuntimeException {
static class RemoteStateTransferException extends RuntimeException {

public GlobalMetadataTransferException(String errorDesc) {
public RemoteStateTransferException(String errorDesc) {
super(errorDesc);
}

public GlobalMetadataTransferException(String errorDesc, Throwable cause) {
public RemoteStateTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -230,10 +231,17 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);

AtomicReference<WriteContext> capturedWriteContext = new AtomicReference<>();
doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
capturedWriteContext.set(writeContextArgumentCaptor.getValue());
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
Expand Down Expand Up @@ -262,27 +270,30 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID()));

assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2);
assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3);

WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue();
byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes();
byte[] writtenBytes = capturedWriteContext.get()
.getStreamProvider(Integer.MAX_VALUE)
.provideStream(0)
.getInputStream()
.readAllBytes();
IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize(
capturedWriteContext.getFileName(),
capturedWriteContext.get().getFileName(),
blobStoreRepository.getNamedXContentRegistry(),
new BytesArray(writtenBytes)
);

assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT);
assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT);
assertEquals(writtenIndexMetadata.getNumberOfShards(), 1);
assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0);
assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index");
assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid");
long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8);
if (capturedWriteContext.doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum);
if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum);
} else {
assertEquals(capturedWriteContext.getExpectedChecksum(), null);
assertEquals(capturedWriteContext.get().getExpectedChecksum(), null);
}

}
Expand All @@ -306,11 +317,44 @@ public void run() {

remoteClusterStateService.start();
assertThrows(
RemoteClusterStateService.GlobalMetadataTransferException.class,
RemoteClusterStateService.RemoteStateTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
);
}

public void testTimeoutWhileWritingManifestFile() throws IOException {
// verify update metadata manifest upload timeout
int metadataManifestUploadTimeout = 2;
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);

final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);

doAnswer((i) -> { // For Global Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> { // For Index Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
// For Manifest file perform No Op, so latch in code will timeout
return null;
}).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
try {
remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10));
} catch (Exception e) {
assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException);
assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete"));
}
}

public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);
Expand All @@ -327,7 +371,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx

remoteClusterStateService.start();
assertThrows(
RemoteClusterStateService.IndexMetadataTransferException.class,
RemoteClusterStateService.RemoteStateTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
);
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
Expand Down Expand Up @@ -1128,6 +1172,22 @@ public void testIndexMetadataUploadWaitTimeSetting() {
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
}

public void testMetadataManifestUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getMetadataManifestUploadTimeout()
);

// verify update metadata manifest upload timeout
int metadataManifestUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds());
}

public void testGlobalMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
Expand Down
Loading