From 10ec23a42cc009a877f39b3176eaa273e8296368 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 23 Feb 2024 19:51:04 +1100 Subject: [PATCH] Add metrics for retries by S3RetryingInputStream (#105600) This PR exposes retries in S3RetryingInputStream as metrics for easier observability. At the class API level, retries can happen either when opening an input stream or when reading from an input stream. Retrying a read from an input stream can internally also retry re-opening the input stream. All these retries are counted under the retries for reading, since the higher-level API usage is a read rather than an open. The new metrics are: * `es.repositories.s3.input_stream.retry.event.total` - Number of times a retry cycle has been triggered. * `es.repositories.s3.input_stream.retry.success.total` - Number of times a retry cycle has been successfully completed. This should match the above metric in numbers. Otherwise it indicates there are threads stuck in infinite retries. * `es.repositories.s3.input_stream.retry.attempts.histogram` - Number of attempts to complete a retry cycle successfully. 
Relates: https://github.com/elastic/elasticsearch/pull/103300#discussion_r1444125047 Relates: ES-7666 --- .../repositories/azure/AzureRepository.java | 4 +- .../gcs/GoogleCloudStorageRepository.java | 4 +- .../s3/S3BlobStoreRepositoryTests.java | 5 +- .../s3/S3RepositoryThirdPartyTests.java | 3 +- .../repositories/s3/S3BlobStore.java | 31 ++++-- .../s3/S3RepositoriesMetrics.java | 37 +++++++ .../repositories/s3/S3Repository.java | 11 +- .../repositories/s3/S3RepositoryPlugin.java | 9 +- .../s3/S3RetryingInputStream.java | 39 ++++++- .../s3/RepositoryCredentialsTests.java | 5 +- .../s3/S3BlobContainerRetriesTests.java | 103 +++++++++++++++++- .../repositories/s3/S3RepositoryTests.java | 3 +- .../repositories/RepositoriesMetrics.java | 2 + .../blobstore/MeteredBlobStoreRepository.java | 6 +- .../RepositoriesServiceTests.java | 6 +- .../telemetry/RecordingMeterRegistry.java | 2 +- 16 files changed, 216 insertions(+), 54 deletions(-) create mode 100644 modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index f58611cb0567a..388474acc75ea 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -108,8 +107,7 @@ public AzureRepository( bigArrays, recoverySettings, buildBasePath(metadata), - 
buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 94d0abe17909f..e2338371cf837 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.storageService = storageService; diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 248ccc119794e..4080a47c7dabe 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; 
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -460,9 +459,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override public BlobStore blobStore() { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f182b54b0c696..b8fea485c6276 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.fixtures.minio.MinioTestContainer; @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() { ClusterServiceUtils.createClusterService(threadpool), BigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(node().settings(), 
node().injector().getInstance(ClusterService.class).getClusterSettings()), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) ) { repository.start(); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 78b1e2dba98b3..6b9937b01a433 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore { private final ThreadPool threadPool; private final Executor snapshotExecutor; - private final RepositoriesMetrics repositoriesMetrics; + private final S3RepositoriesMetrics s3RepositoriesMetrics; private final StatsCollectors statsCollectors = new StatsCollectors(); @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore { RepositoryMetadata repositoryMetadata, BigArrays bigArrays, ThreadPool threadPool, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { this.service = service; this.bigArrays = bigArrays; @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore { this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - this.repositoriesMetrics = repositoriesMetrics; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -174,19 +173,19 @@ public final void collectMetrics(Request request, Response response) { .map(List::size) 
.orElse(0); - repositoriesMetrics.operationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes); if (numberOfAwsErrors == requestCount) { - repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes); } - repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes); + s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes); if (exceptionCount > 0) { - repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes); - repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes); } if (throttleCount > 0) { - repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes); - repositoriesMetrics.throttleHistogram().record(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes); } maybeRecordHttpRequestTime(request); } @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request request) { if (totalTimeInMicros == 0) { logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request); } else { - repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); + s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); } } @@ -293,6 +292,14 @@ public long bufferSizeInBytes() { return bufferSize.getBytes(); } + public RepositoryMetadata getRepositoryMetadata() { + return repositoryMetadata; + } + + public S3RepositoriesMetrics getS3RepositoriesMetrics() { + return 
s3RepositoriesMetrics; + } + @Override public BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java new file mode 100644 index 0000000000000..e025214998d5b --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.s3; + +import org.elasticsearch.repositories.RepositoriesMetrics; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; + +public record S3RepositoriesMetrics( + RepositoriesMetrics common, + LongCounter retryStartedCounter, + LongCounter retryCompletedCounter, + LongHistogram retryHistogram +) { + + public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP); + + public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total"; + public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total"; + public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram"; + + public S3RepositoriesMetrics(RepositoriesMetrics common) { + this( + common, + common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"), + 
common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"), + common.meterRegistry() + .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit") + ); + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 624867a2f0c41..26b1b1158dea0 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -31,7 +31,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.FinalizeSnapshotContext; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final Executor snapshotExecutor; + private final S3RepositoriesMetrics s3RepositoriesMetrics; + /** * Constructs an s3 backed repository */ @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { super( metadata, @@ -214,10 +215,10 @@ class S3Repository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - repositoriesMetrics + buildLocation(metadata) ); this.service = service; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = 
threadPool().executor(ThreadPool.Names.SNAPSHOT); // Parse and validate the user's S3 Storage Class setting @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() { metadata, bigArrays, threadPool, - repositoriesMetrics + s3RepositoriesMetrics ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 83668cc271922..26047c3b416a7 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -78,9 +78,9 @@ protected S3Repository createRepository( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics); + return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics); } @Override @@ -101,11 +101,12 @@ public Map getRepositories( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + final RepositoriesMetrics repositoriesMetrics ) { + final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics); return Collections.singletonMap( S3Repository.TYPE, - metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics) + metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java 
b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index c457b9d51e8b9..f7a99a399f59f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -27,6 +27,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics; @@ -80,7 +81,7 @@ class S3RetryingInputStream extends InputStream { this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogForSuccessAfterRetries(initialAttempt, "opened"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); } private void openStreamWithRetry() throws IOException { @@ -105,6 +106,9 @@ private void openStreamWithRetry() throws IOException { ); } + if (attempt == 1) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open")); + } final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); delayBeforeRetry(delayInMillis); } @@ -142,9 +146,12 @@ public int read() throws IOException { } else { currentOffset += 1; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return result; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -162,9 +169,12 @@ public int read(byte[] b, int off, int len) throws IOException { } else { currentOffset += bytesRead; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return bytesRead; } catch (IOException e) { + if (attempt == 
initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -246,16 +256,20 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogForSuccessAfterRetries(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { if (attempt > initialAttempt) { + final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries", action, blobStore.bucket(), blobKey, purpose.getKey(), - attempt - initialAttempt + numberOfRetries ); + final Map attributes = metricAttributes(action); + blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); + blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); } } @@ -294,6 +308,21 @@ protected long getRetryDelayInMillis() { return 10L << (Math.min(attempt - 1, 10)); } + private Map metricAttributes(String action) { + return Map.of( + "repo_type", + S3Repository.TYPE, + "repo_name", + blobStore.getRepositoryMetadata().name(), + "operation", + Operation.GET_OBJECT.getKey(), + "purpose", + purpose.getKey(), + "action", + action + ); + } + @Override public void close() throws IOException { maybeAbort(currentStream); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 28a48c2968f59..cf3bc21526bf6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; 
import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; @@ -264,9 +263,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0ddd29171b3bd..05268d750637c 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -43,6 +43,9 @@ import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.watcher.ResourceWatcherService; import org.hamcrest.Matcher; import org.junit.After; @@ -59,7 +62,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import 
java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,10 +79,13 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -91,6 +99,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3Service service; private AtomicBoolean shouldErrorOnDns; + private RecordingMeterRegistry recordingMeterRegistry; @Before public void setUp() throws Exception { @@ -109,6 +118,7 @@ protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettin return builder; } }; + recordingMeterRegistry = new RecordingMeterRegistry(); super.setUp(); } @@ -185,7 +195,7 @@ protected BlobContainer createBlobContainer( repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), - RepositoriesMetrics.NOOP + new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)) ); return new S3BlobContainer(randomBoolean() ? 
BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) { @Override @@ -669,8 +679,8 @@ public void handle(HttpExchange exchange) throws IOException { } exchange.getResponseBody().write(bytes, rangeStart, length); } else { - failures.incrementAndGet(); if (randomBoolean()) { + failures.incrementAndGet(); exchange.sendResponseHeaders( randomFrom( HttpStatus.SC_INTERNAL_SERVER_ERROR, @@ -686,6 +696,8 @@ public void handle(HttpExchange exchange) throws IOException { if (bytesSent >= meaningfulProgressBytes) { exchange.getResponseBody().flush(); } + } else { + failures.incrementAndGet(); } } } @@ -700,16 +712,28 @@ public void handle(HttpExchange exchange) throws IOException { final int length = between(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE); logger.info("--> position={}, length={}", position, length); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever", position, length)) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead); + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); // Read the whole blob failures.set(0); + recordingMeterRegistry.getRecorder().resetCalls(); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever")) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(bytes, bytesRead); + + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); } @@ -737,9 +761,13 @@ public void handle(HttpExchange exchange) throws IOException { : 
blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found", between(0, 100), between(1, 100)) ) { Streams.readFully(inputStream); + } }); assertThat(numberOfReads.get(), equalTo(1)); + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); } @Override @@ -761,6 +789,77 @@ protected OperationPurpose randomFiniteRetryingPurpose() { ); } + private void assertMetricsForOpeningStream() { + final long numberOfOperations = getOperationMeasurements(); + // S3 client sdk internally also retries within the configured maxRetries for retryable errors. + // The retries in S3RetryingInputStream are triggered when the client internal retries are unsuccessful + if (numberOfOperations > 1) { + // For opening the stream, there should be exactly one pair of started and completed records. + // There should be one histogram record, the number of retries must be greater than 0 + final Map attributes = metricAttributes("open"); + assertThat(getRetryStartedMeasurements(), contains(new Measurement(1L, attributes, false))); + assertThat(getRetryCompletedMeasurements(), contains(new Measurement(1L, attributes, false))); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + assertThat(retryHistogramMeasurements, hasSize(1)); + assertThat(retryHistogramMeasurements.get(0).getLong(), equalTo(numberOfOperations - 1)); + assertThat(retryHistogramMeasurements.get(0).attributes(), equalTo(attributes)); + } else { + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); + } + } + + private void assertMetricsForReadingStream() { + // For reading the stream, there could be multiple pairs of started and completed records. + // It is important that they always come in pairs and the number of pairs match the number + // of histogram records. 
+ final Map attributes = metricAttributes("read"); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + final int numberOfReads = retryHistogramMeasurements.size(); + retryHistogramMeasurements.forEach(measurement -> { + assertThat(measurement.getLong(), greaterThan(0L)); + assertThat(measurement.attributes(), equalTo(attributes)); + }); + + final List retryStartedMeasurements = getRetryStartedMeasurements(); + assertThat(retryStartedMeasurements, hasSize(1)); + assertThat(retryStartedMeasurements.get(0).getLong(), equalTo((long) numberOfReads)); + assertThat(retryStartedMeasurements.get(0).attributes(), equalTo(attributes)); + assertThat(retryStartedMeasurements, equalTo(getRetryCompletedMeasurements())); + } + + private long getOperationMeasurements() { + final List operationMeasurements = Measurement.combine( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_OPERATIONS_TOTAL) + ); + assertThat(operationMeasurements, hasSize(1)); + return operationMeasurements.get(0).getLong(); + } + + private List getRetryStartedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_EVENT_TOTAL) + ); + } + + private List getRetryCompletedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_SUCCESS_TOTAL) + ); + } + + private List getRetryHistogramMeasurements() { + return recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, S3RepositoriesMetrics.METRIC_RETRY_ATTEMPTS_HISTOGRAM); + } + + private Map metricAttributes(String action) { + return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it 
is closed */ diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 0a92ed0a28973..50470ec499ef6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -130,7 +129,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { BlobStoreTestUtil.mockClusterService(), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java index b4d79d89ec4c6..50aa7881cd2b6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java @@ -13,6 +13,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; public record RepositoriesMetrics( + MeterRegistry meterRegistry, LongCounter requestCounter, LongCounter exceptionCounter, LongCounter throttleCounter, @@ -36,6 +37,7 @@ public record RepositoriesMetrics( public RepositoriesMetrics(MeterRegistry meterRegistry) { this( + meterRegistry, 
meterRegistry.registerLongCounter(METRIC_REQUESTS_TOTAL, "repository request counter", "unit"), meterRegistry.registerLongCounter(METRIC_EXCEPTIONS_TOTAL, "repository request exception counter", "unit"), meterRegistry.registerLongCounter(METRIC_THROTTLES_TOTAL, "repository request throttle counter", "unit"), diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java index 6ecab2f8c77f2..c5ea99b0e5c14 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryInfo; import org.elasticsearch.repositories.RepositoryStatsSnapshot; import org.elasticsearch.threadpool.ThreadPool; @@ -24,7 +23,6 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { private final RepositoryInfo repositoryInfo; - protected final RepositoriesMetrics repositoriesMetrics; public MeteredBlobStoreRepository( RepositoryMetadata metadata, @@ -33,11 +31,9 @@ public MeteredBlobStoreRepository( BigArrays bigArrays, RecoverySettings recoverySettings, BlobPath basePath, - Map location, - RepositoriesMetrics repositoriesMetrics + Map location ) { super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath); - this.repositoriesMetrics = repositoriesMetrics; ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); this.repositoryInfo = new RepositoryInfo( UUIDs.randomBase64UUID(), diff --git 
a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4f7001f00e6a7..45e4bb09c1616 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -482,8 +482,7 @@ private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-a"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-a") ); } @@ -510,8 +509,7 @@ private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-b"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-b") ); } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 86bfd9bf38c26..33693c297f166 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -33,7 +33,7 @@ public class RecordingMeterRegistry implements MeterRegistry { protected final MetricRecorder recorder = new MetricRecorder<>(); - MetricRecorder getRecorder() { + public MetricRecorder getRecorder() { return recorder; }