Skip to content

Commit

Permalink
Publish APM metrics from the Azure BlobStore (elastic#113913)
Browse files Browse the repository at this point in the history
Closes ES-9550
  • Loading branch information
nicktindall authored Oct 8, 2024
1 parent 740cb2e commit 1c40954
Show file tree
Hide file tree
Showing 13 changed files with 912 additions and 97 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
Expand All @@ -30,8 +32,15 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.BackgroundIndexer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -41,22 +50,33 @@
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_OPERATIONS_TOTAL;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

/** Account name used by the emulated Azure endpoint; visible to subclasses. */
protected static final String DEFAULT_ACCOUNT_NAME = "account";
/**
 * Matches a request line of the form {@code GET /<alnum>/<alnum>?<query>} in full,
 * i.e. a GET on a two-segment alphanumeric path followed by a non-empty query string.
 */
protected static final Predicate<String> LIST_PATTERN = Pattern.compile("GET /[a-zA-Z0-9]+/[a-zA-Z0-9]+\\?.+").asMatchPredicate();
/**
 * Matches a request line of the form {@code GET /<alnum>/<alnum>/<rest>} in full,
 * i.e. a GET on a path with at least one further non-empty component after the first two segments.
 */
protected static final Predicate<String> GET_BLOB_PATTERN = Pattern.compile("GET /[a-zA-Z0-9]+/[a-zA-Z0-9]+/.+").asMatchPredicate();

@Override
protected String repositoryType() {
Expand All @@ -78,7 +98,7 @@ protected Settings repositorySettings(String repoName) {

/**
 * Returns the plugins to install on the test nodes: the Azure repository test plugin plus
 * {@link TestTelemetryPlugin}, which the metrics assertions in this class read from.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    return List.of(TestAzureRepositoryPlugin.class, TestTelemetryPlugin.class);
}

@Override
Expand All @@ -91,7 +111,7 @@ protected Map<String, HttpHandler> createHttpHandlers() {

/**
 * Wraps {@code delegate} in an {@link AzureErroneousHttpHandler} configured with
 * {@link AzureStorageSettings#DEFAULT_MAX_RETRIES}, and wraps that in an
 * {@link AzureHTTPStatsCollectorHandler} so the stats collector sits in front of the erroneous handler.
 *
 * @param delegate the handler that ultimately serves the request
 * @return the stats-collecting, error-injecting handler chain
 */
@Override
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
    return new AzureHTTPStatsCollectorHandler(new AzureErroneousHttpHandler(delegate, AzureStorageSettings.DEFAULT_MAX_RETRIES));
}

@Override
Expand Down Expand Up @@ -119,6 +139,13 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.build();
}

/**
 * Looks up the {@link TestTelemetryPlugin} instance installed on the named node.
 *
 * @param dataNodeName name of the node whose {@link PluginsService} is queried
 * @return the first {@link TestTelemetryPlugin} reported by that node's plugin service
 */
protected TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) {
    final PluginsService nodePluginsService = internalCluster().getInstance(PluginsService.class, dataNodeName);
    // orElseThrow() fails the lookup loudly when the node has no telemetry plugin installed
    return nodePluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
}

/**
* AzureRepositoryPlugin that allows setting low values for the Azure client's retry policy
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().
Expand Down Expand Up @@ -195,9 +222,6 @@ protected String requestUniqueId(final HttpExchange exchange) {
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
private static final Predicate<String> LIST_PATTERN = Pattern.compile("GET /[a-zA-Z0-9]+/[a-zA-Z0-9]+\\?.+").asMatchPredicate();
private static final Predicate<String> GET_BLOB_PATTERN = Pattern.compile("GET /[a-zA-Z0-9]+/[a-zA-Z0-9]+/.+").asMatchPredicate();

private final Set<String> seenRequestIds = ConcurrentCollections.newConcurrentSet();

private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
Expand Down Expand Up @@ -303,4 +327,87 @@ public void testReadByteByByte() throws Exception {
container.delete(randomPurpose());
}
}

/**
 * Checks that the operation counts published through the telemetry plugin agree with both the
 * blob store's own per-operation counters and the request counts seen by the mock server.
 * <p>
 * The test resets the telemetry meters, then drives repository traffic by indexing documents,
 * force-merging, snapshotting, deleting the index, restoring the snapshot and finally deleting the snapshot.
 * Afterwards, for every node that has the repository, each {@code METRIC_OPERATIONS_TOTAL} measurement is
 * compared with the matching {@link AzureBlobStore.StatsKey} counter, and the per-operation totals summed
 * over all nodes are compared with {@link #getServerMetrics()}.
 */
public void testMetrics() throws Exception {
// Reset all the metrics so that none linger from previous tests
internalCluster().getInstances(PluginsService.class)
.forEach(ps -> ps.filterPlugins(TestTelemetryPlugin.class).forEach(TestTelemetryPlugin::resetMeter));

// Create the repository and perform some activities
final String repository = createRepository(randomRepositoryName(), false);
final String index = "index-no-merges";
// presumably 1 shard and 0 replicas; the force-merge assertion below expects exactly one successful shard
createIndex(index, 1, 0);

// Index a random number of documents in the background and wait until all of them are present
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
// Force-merge down to a single segment before snapshotting
BroadcastResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);

// Snapshot the index, delete it, restore it from the snapshot and verify the document count survived
final String snapshot = "snapshot";
assertSuccessfulSnapshot(
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).setWaitForCompletion(true).setIndices(index)
);
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).setWaitForCompletion(true)
);
ensureGreen(index);
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);
// Deleting the snapshot generates further repository requests that are counted as well
assertAcked(clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).get());

// Per-operation totals summed over every node, compared with the mock server's counts at the end
final Map<AzureBlobStore.Operation, Long> aggregatedMetrics = new HashMap<>();
// Compare collected stats and metrics for each node and they should be the same
for (var nodeName : internalCluster().getNodeNames()) {
final BlobStoreRepository blobStoreRepository;
try {
blobStoreRepository = (BlobStoreRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository);
} catch (RepositoryMissingException e) {
// This node does not know the repository, so there is nothing to compare on it
continue;
}

final AzureBlobStore blobStore = (AzureBlobStore) blobStoreRepository.blobStore();
// Counters kept by the blob store itself, keyed by operation and purpose
final Map<AzureBlobStore.StatsKey, LongAdder> statsCollectors = blobStore.getMetricsRecorder().opsCounters;

// Measurements recorded by this node's telemetry plugin for the operations-total counter
final List<Measurement> metrics = Measurement.combine(
getTelemetryPlugin(nodeName).getLongCounterMeasurement(METRIC_OPERATIONS_TOTAL)
);

// Both sources must report exactly the same set of operations
assertThat(
statsCollectors.keySet().stream().map(AzureBlobStore.StatsKey::operation).collect(Collectors.toSet()),
equalTo(
metrics.stream()
.map(m -> AzureBlobStore.Operation.fromKey((String) m.attributes().get("operation")))
.collect(Collectors.toSet())
)
);
metrics.forEach(metric -> {
// Every measurement must carry the repository type, repository name, operation and purpose attributes
assertThat(
metric.attributes(),
allOf(hasEntry("repo_type", AzureRepository.TYPE), hasKey("repo_name"), hasKey("operation"), hasKey("purpose"))
);
// Rebuild the stats key from the measurement attributes to find the matching blob store counter
final AzureBlobStore.Operation operation = AzureBlobStore.Operation.fromKey((String) metric.attributes().get("operation"));
final AzureBlobStore.StatsKey statsKey = new AzureBlobStore.StatsKey(
operation,
OperationPurpose.parse((String) metric.attributes().get("purpose"))
);
assertThat(nodeName + "/" + statsKey + " exists", statsCollectors, hasKey(statsKey));
assertThat(nodeName + "/" + statsKey + " has correct sum", metric.getLong(), equalTo(statsCollectors.get(statsKey).sum()));
// Accumulate by operation only; the purpose dimension is dropped for the server-side comparison
aggregatedMetrics.compute(statsKey.operation(), (k, v) -> v == null ? metric.getLong() : v + metric.getLong());
});
}

// Metrics number should be consistent with server side request count as well.
assertThat(aggregatedMetrics, equalTo(getServerMetrics()));
}

/**
 * Re-keys the request counts returned by {@code getMockRequestCounts()} so that each count is
 * indexed by the {@link AzureBlobStore.Operation} its key resolves to.
 *
 * @return request counts keyed by operation
 */
private Map<AzureBlobStore.Operation, Long> getServerMetrics() {
    final var countsByRequestKey = getMockRequestCounts();
    return countsByRequestKey.entrySet()
        .stream()
        .collect(Collectors.toMap(entry -> AzureBlobStore.Operation.fromKey(entry.getKey()), entry -> entry.getValue()));
}
}
Loading

0 comments on commit 1c40954

Please sign in to comment.