diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index f58611cb0567a..388474acc75ea 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -108,8 +107,7 @@ public AzureRepository( bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 94d0abe17909f..e2338371cf837 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.storageService = storageService; diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 248ccc119794e..4080a47c7dabe 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -460,9 +459,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override public BlobStore blobStore() { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f182b54b0c696..b8fea485c6276 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.fixtures.minio.MinioTestContainer; @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() { ClusterServiceUtils.createClusterService(threadpool), BigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) ) { repository.start(); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 78b1e2dba98b3..6b9937b01a433 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore { private final ThreadPool threadPool; private final Executor snapshotExecutor; - private final RepositoriesMetrics repositoriesMetrics; + private final S3RepositoriesMetrics s3RepositoriesMetrics; private final StatsCollectors statsCollectors = new StatsCollectors(); @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore { RepositoryMetadata repositoryMetadata, BigArrays bigArrays, ThreadPool threadPool, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { this.service = service; this.bigArrays = bigArrays; @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore { this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - this.repositoriesMetrics = repositoriesMetrics; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -174,19 +173,19 @@ public final void collectMetrics(Request request, Response response) { .map(List::size) .orElse(0); - repositoriesMetrics.operationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes); if (numberOfAwsErrors == requestCount) { - repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes); } - repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes); + s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes); if (exceptionCount > 0) { - repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes); - repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes); } if (throttleCount > 0) { - repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes); - repositoriesMetrics.throttleHistogram().record(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes); } maybeRecordHttpRequestTime(request); } @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request request) { if (totalTimeInMicros == 0) { logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request); } else { - repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); + s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); } } @@ -293,6 +292,14 @@ public long bufferSizeInBytes() { return bufferSize.getBytes(); } + public RepositoryMetadata getRepositoryMetadata() { + return repositoryMetadata; + } + + public S3RepositoriesMetrics getS3RepositoriesMetrics() { + return s3RepositoriesMetrics; + } + @Override public BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java new file mode 100644 index 0000000000000..e025214998d5b --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.s3; + +import org.elasticsearch.repositories.RepositoriesMetrics; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; + +public record S3RepositoriesMetrics( + RepositoriesMetrics common, + LongCounter retryStartedCounter, + LongCounter retryCompletedCounter, + LongHistogram retryHistogram +) { + + public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP); + + public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total"; + public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total"; + public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram"; + + public S3RepositoriesMetrics(RepositoriesMetrics common) { + this( + common, + common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"), + common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"), + common.meterRegistry() + .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit") + ); + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 624867a2f0c41..26b1b1158dea0 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -31,7 +31,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.FinalizeSnapshotContext; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final Executor snapshotExecutor; + private final S3RepositoriesMetrics s3RepositoriesMetrics; + /** * Constructs an s3 backed repository */ @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { super( metadata, @@ -214,10 +215,10 @@ class S3Repository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - repositoriesMetrics + buildLocation(metadata) ); this.service = service; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT); // Parse and validate the user's S3 Storage Class setting @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() { metadata, bigArrays, threadPool, - repositoriesMetrics + s3RepositoriesMetrics ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 83668cc271922..26047c3b416a7 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -78,9 +78,9 @@ protected S3Repository createRepository( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics); + return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics); } @Override @@ -101,11 +101,12 @@ public Map getRepositories( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + final RepositoriesMetrics repositoriesMetrics ) { + final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics); return Collections.singletonMap( S3Repository.TYPE, - metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics) + metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index c457b9d51e8b9..f7a99a399f59f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -27,6 +27,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics; @@ -80,7 +81,7 @@ class S3RetryingInputStream extends InputStream { this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogForSuccessAfterRetries(initialAttempt, "opened"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); } private void openStreamWithRetry() throws IOException { @@ -105,6 +106,9 @@ private void openStreamWithRetry() throws IOException { ); } + if (attempt == 1) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open")); + } final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); delayBeforeRetry(delayInMillis); } @@ -142,9 +146,12 @@ public int read() throws IOException { } else { currentOffset += 1; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return result; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -162,9 +169,12 @@ public int read(byte[] b, int off, int len) throws IOException { } else { currentOffset += bytesRead; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return bytesRead; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -246,16 +256,20 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogForSuccessAfterRetries(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { if (attempt > initialAttempt) { + final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries", action, blobStore.bucket(), blobKey, purpose.getKey(), - attempt - initialAttempt + numberOfRetries ); + final Map attributes = metricAttributes(action); + blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); + blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); } } @@ -294,6 +308,21 @@ protected long getRetryDelayInMillis() { return 10L << (Math.min(attempt - 1, 10)); } + private Map metricAttributes(String action) { + return Map.of( + "repo_type", + S3Repository.TYPE, + "repo_name", + blobStore.getRepositoryMetadata().name(), + "operation", + Operation.GET_OBJECT.getKey(), + "purpose", + purpose.getKey(), + "action", + action + ); + } + @Override public void close() throws IOException { maybeAbort(currentStream); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 28a48c2968f59..cf3bc21526bf6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; @@ -264,9 +263,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0ddd29171b3bd..05268d750637c 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -43,6 +43,9 @@ import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.watcher.ResourceWatcherService; import org.hamcrest.Matcher; import org.junit.After; @@ -59,7 +62,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,10 +79,13 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -91,6 +99,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3Service service; private AtomicBoolean shouldErrorOnDns; + private RecordingMeterRegistry recordingMeterRegistry; @Before public void setUp() throws Exception { @@ -109,6 +118,7 @@ protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettin return builder; } }; + recordingMeterRegistry = new RecordingMeterRegistry(); super.setUp(); } @@ -185,7 +195,7 @@ protected BlobContainer createBlobContainer( repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), - RepositoriesMetrics.NOOP + new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)) ); return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) { @Override @@ -669,8 +679,8 @@ public void handle(HttpExchange exchange) throws IOException { } exchange.getResponseBody().write(bytes, rangeStart, length); } else { - failures.incrementAndGet(); if (randomBoolean()) { + failures.incrementAndGet(); exchange.sendResponseHeaders( randomFrom( HttpStatus.SC_INTERNAL_SERVER_ERROR, @@ -686,6 +696,8 @@ public void handle(HttpExchange exchange) throws IOException { if (bytesSent >= meaningfulProgressBytes) { exchange.getResponseBody().flush(); } + } else { + failures.incrementAndGet(); } } } @@ -700,16 +712,28 @@ public void handle(HttpExchange exchange) throws IOException { final int length = between(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE); logger.info("--> position={}, length={}", position, length); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever", position, length)) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead); + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); // Read the whole blob failures.set(0); + recordingMeterRegistry.getRecorder().resetCalls(); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever")) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(bytes, bytesRead); + + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); } @@ -737,9 +761,13 @@ public void handle(HttpExchange exchange) throws IOException { : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found", between(0, 100), between(1, 100)) ) { Streams.readFully(inputStream); + } }); assertThat(numberOfReads.get(), equalTo(1)); + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); } @Override @@ -761,6 +789,77 @@ protected OperationPurpose randomFiniteRetryingPurpose() { ); } + private void assertMetricsForOpeningStream() { + final long numberOfOperations = getOperationMeasurements(); + // S3 client sdk internally also retries within the configured maxRetries for retryable errors. + // The retries in S3RetryingInputStream are triggered when the client internal retries are unsuccessful + if (numberOfOperations > 1) { + // For opening the stream, there should be exactly one pair of started and completed records. + // There should be one histogram record, the number of retries must be greater than 0 + final Map attributes = metricAttributes("open"); + assertThat(getRetryStartedMeasurements(), contains(new Measurement(1L, attributes, false))); + assertThat(getRetryCompletedMeasurements(), contains(new Measurement(1L, attributes, false))); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + assertThat(retryHistogramMeasurements, hasSize(1)); + assertThat(retryHistogramMeasurements.get(0).getLong(), equalTo(numberOfOperations - 1)); + assertThat(retryHistogramMeasurements.get(0).attributes(), equalTo(attributes)); + } else { + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); + } + } + + private void assertMetricsForReadingStream() { + // For reading the stream, there could be multiple pairs of started and completed records. + // It is important that they always come in pairs and the number of pairs match the number + // of histogram records. + final Map attributes = metricAttributes("read"); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + final int numberOfReads = retryHistogramMeasurements.size(); + retryHistogramMeasurements.forEach(measurement -> { + assertThat(measurement.getLong(), greaterThan(0L)); + assertThat(measurement.attributes(), equalTo(attributes)); + }); + + final List retryStartedMeasurements = getRetryStartedMeasurements(); + assertThat(retryStartedMeasurements, hasSize(1)); + assertThat(retryStartedMeasurements.get(0).getLong(), equalTo((long) numberOfReads)); + assertThat(retryStartedMeasurements.get(0).attributes(), equalTo(attributes)); + assertThat(retryStartedMeasurements, equalTo(getRetryCompletedMeasurements())); + } + + private long getOperationMeasurements() { + final List operationMeasurements = Measurement.combine( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_OPERATIONS_TOTAL) + ); + assertThat(operationMeasurements, hasSize(1)); + return operationMeasurements.get(0).getLong(); + } + + private List getRetryStartedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_EVENT_TOTAL) + ); + } + + private List getRetryCompletedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_SUCCESS_TOTAL) + ); + } + + private List getRetryHistogramMeasurements() { + return recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, S3RepositoriesMetrics.METRIC_RETRY_ATTEMPTS_HISTOGRAM); + } + + private Map metricAttributes(String action) { + return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 0a92ed0a28973..50470ec499ef6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -130,7 +129,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { BlobStoreTestUtil.mockClusterService(), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java index b4d79d89ec4c6..50aa7881cd2b6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java @@ -13,6 +13,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; public record RepositoriesMetrics( + MeterRegistry meterRegistry, LongCounter requestCounter, LongCounter exceptionCounter, LongCounter throttleCounter, @@ -36,6 +37,7 @@ public record RepositoriesMetrics( public RepositoriesMetrics(MeterRegistry meterRegistry) { this( + meterRegistry, meterRegistry.registerLongCounter(METRIC_REQUESTS_TOTAL, "repository request counter", "unit"), meterRegistry.registerLongCounter(METRIC_EXCEPTIONS_TOTAL, "repository request exception counter", "unit"), meterRegistry.registerLongCounter(METRIC_THROTTLES_TOTAL, "repository request throttle counter", "unit"), diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java index 6ecab2f8c77f2..c5ea99b0e5c14 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryInfo; import org.elasticsearch.repositories.RepositoryStatsSnapshot; import org.elasticsearch.threadpool.ThreadPool; @@ -24,7 +23,6 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { private final RepositoryInfo repositoryInfo; - protected final RepositoriesMetrics repositoriesMetrics; public MeteredBlobStoreRepository( RepositoryMetadata metadata, @@ -33,11 +31,9 @@ public MeteredBlobStoreRepository( BigArrays bigArrays, RecoverySettings recoverySettings, BlobPath basePath, - Map location, - RepositoriesMetrics repositoriesMetrics + Map location ) { super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath); - this.repositoriesMetrics = repositoriesMetrics; ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); this.repositoryInfo = new RepositoryInfo( UUIDs.randomBase64UUID(), diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4f7001f00e6a7..45e4bb09c1616 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -482,8 +482,7 @@ private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-a"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-a") ); } @@ -510,8 +509,7 @@ private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-b"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-b") ); } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 86bfd9bf38c26..33693c297f166 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -33,7 +33,7 @@ public class RecordingMeterRegistry implements MeterRegistry { protected final MetricRecorder recorder = new MetricRecorder<>(); - MetricRecorder getRecorder() { + public MetricRecorder getRecorder() { return recorder; }