diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index f371d6f354763..3ff0497b42719 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -82,6 +82,10 @@ class S3BlobStore implements BlobStore { private final StatsCollectors statsCollectors = new StatsCollectors(); + private static final TimeValue RETRY_STATS_WINDOW = TimeValue.timeValueMinutes(5); + + private volatile S3RequestRetryStats s3RequestRetryStats; + S3BlobStore( S3Service service, String bucket, @@ -105,10 +109,23 @@ class S3BlobStore implements BlobStore { this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); this.meter = meter; + s3RequestRetryStats = new S3RequestRetryStats(getMaxRetries()); + threadPool.scheduleWithFixedDelay(() -> { + var priorRetryStats = s3RequestRetryStats; + s3RequestRetryStats = new S3RequestRetryStats(getMaxRetries()); + priorRetryStats.emitMetrics(); + }, RETRY_STATS_WINDOW, threadPool.generic()); } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { - return statsCollectors.getMetricCollector(operation, purpose); + var collector = statsCollectors.getMetricCollector(operation, purpose); + return new RequestMetricCollector() { + @Override + public void collectMetrics(Request request, Response response) { + s3RequestRetryStats.addRequest(request); + collector.collectMetrics(request, response); + } + }; } public Executor getSnapshotExecutor() { @@ -178,7 +195,7 @@ public AmazonS3Reference clientReference() { return service.client(repositoryMetadata); } - int getMaxRetries() { + final int getMaxRetries() { return service.settings(repositoryMetadata).maxRetries; } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java new file mode 100644 index 0000000000000..952668f370161 --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.s3; + +import com.amazonaws.Request; +import com.amazonaws.util.AWSRequestMetrics; +import com.amazonaws.util.TimingInfo; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.ESLogMessage; +import org.elasticsearch.common.util.Maps; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; + +/** + * This class emit aws s3 metrics as logs until we have a proper apm integration + */ +public class S3RequestRetryStats { + + private static final Logger logger = LogManager.getLogger(S3RequestRetryStats.class); + + private final AtomicLong requests = new AtomicLong(); + private final AtomicLong exceptions = new AtomicLong(); + private final AtomicLong throttles = new AtomicLong(); + private final AtomicLongArray exceptionsHistogram; + private final AtomicLongArray throttlesHistogram; + + public S3RequestRetryStats(int maxRetries) { + this.exceptionsHistogram = new AtomicLongArray(maxRetries + 1); + this.throttlesHistogram = new AtomicLongArray(maxRetries + 1); + } + + public void addRequest(Request request) { + if (request == null) { + return; + } + var info = request.getAWSRequestMetrics().getTimingInfo(); + long requests = getCounter(info, AWSRequestMetrics.Field.RequestCount); + long exceptions = getCounter(info, AWSRequestMetrics.Field.Exception); + long throttles = getCounter(info, AWSRequestMetrics.Field.ThrottleException); + + this.requests.addAndGet(requests); + this.exceptions.addAndGet(exceptions); + this.throttles.addAndGet(throttles); + if (exceptions >= 0 && exceptions < this.exceptionsHistogram.length()) { + this.exceptionsHistogram.incrementAndGet((int) exceptions); + } + if (throttles >= 0 && throttles < this.throttlesHistogram.length()) { + this.throttlesHistogram.incrementAndGet((int) throttles); + } + } + + private static long getCounter(TimingInfo info, AWSRequestMetrics.Field field) { + var counter = info.getCounter(field.name()); + return counter != null ? counter.longValue() : 0L; + } + + public void emitMetrics() { + if (logger.isDebugEnabled()) { + var metrics = Maps.newMapWithExpectedSize(3); + metrics.put("elasticsearch.metrics.s3.requests", requests.get()); + metrics.put("elasticsearch.metrics.s3.exceptions", exceptions.get()); + metrics.put("elasticsearch.metrics.s3.throttles", throttles.get()); + for (int i = 0; i < exceptionsHistogram.length(); i++) { + long exceptions = exceptionsHistogram.get(i); + if (exceptions != 0) { + metrics.put("elasticsearch.metrics.s3.exceptions.h" + i, exceptions); + } + } + for (int i = 0; i < throttlesHistogram.length(); i++) { + long throttles = throttlesHistogram.get(i); + if (throttles != 0) { + metrics.put("elasticsearch.metrics.s3.throttles.h" + i, throttles); + } + } + logger.debug(new ESLogMessage().withFields(metrics)); + } + } +}