Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async write for manifest file and use latch for timeout #10968

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class RemoteClusterStateService implements Closeable {

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Expand All @@ -101,6 +103,13 @@ public class RemoteClusterStateService implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.metadata_manifest.upload_timeout",
METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -157,6 +166,7 @@ public class RemoteClusterStateService implements Closeable {

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
private volatile TimeValue metadataManifestUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemotePersistenceStats remoteStateStats;
Expand Down Expand Up @@ -190,9 +200,11 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
}

Expand Down Expand Up @@ -601,14 +613,45 @@ private ClusterMetadataManifest uploadManifest(

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
AtomicReference<String> result = new AtomicReference<String>();
AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>();

final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
CLUSTER_METADATA_MANIFEST_FORMAT.write(

// latch to wait until upload is not finished
CountDownLatch latch = new CountDownLatch(1);

LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> {
// no op on response
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}, ex -> { exceptionReference.set(ex); }), latch);

CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority(
uploadManifest,
metadataManifestContainer,
fileName,
blobStoreRepository.getCompressor(),
completionListener,
FORMAT_PARAMS
);

try {
if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
MetadataManifestTransferException ex = new MetadataManifestTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete")
);
throw ex;
}
} catch (InterruptedException ex) {
MetadataManifestTransferException exception = new MetadataManifestTransferException(
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"),
ex
);
Thread.currentThread().interrupt();
throw exception;
}
if (exceptionReference.get() != null) {
throw new MetadataManifestTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
}
logger.debug(
"Metadata manifest file [{}] written during [{}] phase. ",
fileName,
Expand Down Expand Up @@ -668,6 +711,10 @@ private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTim
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) {
this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}
Expand All @@ -676,6 +723,10 @@ public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down Expand Up @@ -1112,6 +1163,20 @@ public GlobalMetadataTransferException(String errorDesc, Throwable cause) {
}
}

/**
* Exception for metadata manifest transfer failures to remote
*/
static class MetadataManifestTransferException extends RuntimeException {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

public MetadataManifestTransferException(String errorDesc) {
super(errorDesc);
}

public MetadataManifestTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}

/**
* Purges all remote cluster state against provided cluster UUIDs
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -230,10 +231,17 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);

AtomicReference<WriteContext> capturedWriteContext = new AtomicReference<>();
doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
capturedWriteContext.set(writeContextArgumentCaptor.getValue());
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
Expand Down Expand Up @@ -262,27 +270,30 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID()));

assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2);
assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3);

WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue();
byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes();
byte[] writtenBytes = capturedWriteContext.get()
.getStreamProvider(Integer.MAX_VALUE)
.provideStream(0)
.getInputStream()
.readAllBytes();
IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize(
capturedWriteContext.getFileName(),
capturedWriteContext.get().getFileName(),
blobStoreRepository.getNamedXContentRegistry(),
new BytesArray(writtenBytes)
);

assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT);
assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT);
assertEquals(writtenIndexMetadata.getNumberOfShards(), 1);
assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0);
assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index");
assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid");
long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8);
if (capturedWriteContext.doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum);
if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum);
} else {
assertEquals(capturedWriteContext.getExpectedChecksum(), null);
assertEquals(capturedWriteContext.get().getExpectedChecksum(), null);
}

}
Expand Down Expand Up @@ -311,6 +322,30 @@ public void run() {
);
}

public void testTimeoutWhileWritingManifestFile() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);

doAnswer((i) -> { // For Global Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> { // For Index Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
// For Manifest file perform No Op, so latch in code will timeout
return null;
}).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
assertThrows(
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
RemoteClusterStateService.MetadataManifestTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
);
}

public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);
Expand Down Expand Up @@ -1128,6 +1163,22 @@ public void testIndexMetadataUploadWaitTimeSetting() {
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
}

public void testMetadataManifestUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getMetadataManifestUploadTimeout()
);

// verify update metadata manifest upload timeout
int metadataManifestUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds());
}

public void testGlobalMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
Expand Down
Loading