Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Stats rework (3/4): Exposing new cache stats API #13237

Merged
merged 23 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener(), builder.getWeigher());
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames);
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames, EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME);
}

@SuppressWarnings({ "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolderTests;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// Use a single data node to simplify logic about cache stats across different shards.
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1)
public class CacheStatsAPIIndicesRequestCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
public CacheStatsAPIIndicesRequestCacheIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() },
new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() }
);
}

public void testCacheStatsAPIWIthOnHeapCache() throws Exception {
String index1Name = "index1";
String index2Name = "index2";
Client client = client();

startIndex(client, index1Name);
startIndex(client, index2Name);

// Search twice for the same doc in index 1
for (int i = 0; i < 2; i++) {
searchIndex(client, index1Name, "");
}

// Search once for a doc in index 2
searchIndex(client, index2Name, "");

// First, aggregate by indices only
Map<String, Object> xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME));

List<String> index1Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.INDEX_DIMENSION_NAME,
index1Name
);
// Since we searched twice, we expect to see 1 hit, 1 miss and 1 entry for index 1
ImmutableCacheStats expectedStats = new ImmutableCacheStats(1, 1, 0, 0, 1);
checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, false, true);
// Get the request size for one request, so we can reuse it for next index
int requestSize = (int) ((Map<String, Object>) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap(
xContentMap,
index1Keys
)).get(ImmutableCacheStats.Fields.MEMORY_SIZE_IN_BYTES);
assertTrue(requestSize > 0);

List<String> index2Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.INDEX_DIMENSION_NAME,
index2Name
);
// We searched once in index 2, we expect 1 miss + 1 entry
expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1);
checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true);

// The total stats for the node should be 1 hit, 2 misses, and 2 entries
expectedStats = new ImmutableCacheStats(1, 2, 0, 2 * requestSize, 2);
List<String> totalStatsKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getApiRepresentation());
checkCacheStatsAPIResponse(xContentMap, totalStatsKeys, expectedStats, true, true);

// Aggregate by shards only
xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.SHARD_ID_DIMENSION_NAME));

List<String> index1Shard0Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.SHARD_ID_DIMENSION_NAME,
"[" + index1Name + "][0]"
);

expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1);
checkCacheStatsAPIResponse(xContentMap, index1Shard0Keys, expectedStats, true, true);

List<String> index2Shard0Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.SHARD_ID_DIMENSION_NAME,
"[" + index2Name + "][0]"
);
expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1);
checkCacheStatsAPIResponse(xContentMap, index2Shard0Keys, expectedStats, true, true);

// Aggregate by indices and shards
xContentMap = getNodeCacheStatsXContentMap(
client,
List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, IndicesRequestCache.SHARD_ID_DIMENSION_NAME)
);

index1Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.INDEX_DIMENSION_NAME,
index1Name,
IndicesRequestCache.SHARD_ID_DIMENSION_NAME,
"[" + index1Name + "][0]"
);

expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1);
checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, true, true);

index2Keys = List.of(
CacheType.INDICES_REQUEST_CACHE.getApiRepresentation(),
IndicesRequestCache.INDEX_DIMENSION_NAME,
index2Name,
IndicesRequestCache.SHARD_ID_DIMENSION_NAME,
"[" + index2Name + "][0]"
);

expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1);
checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true);

}

// TODO: Add testCacheStatsAPIWithTieredCache when TSC stats implementation PR is merged

public void testStatsMatchOldApi() throws Exception {
// The main purpose of this test is to check that the new and old APIs are both correctly estimating memory size,
// using the logic that includes the overhead memory in ICacheKey.
String index = "index";
Client client = client();
startIndex(client, index);

int numKeys = Randomness.get().nextInt(100);
for (int i = 0; i < numKeys; i++) {
searchIndex(client, index, String.valueOf(i));
}
// Get some hits as well
for (int i = 0; i < numKeys / 2; i++) {
searchIndex(client, index, String.valueOf(i));
}

RequestCacheStats oldApiStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertNotEquals(0, oldApiStats.getMemorySizeInBytes());

List<String> xContentMapKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getApiRepresentation());
Map<String, Object> xContentMap = getNodeCacheStatsXContentMap(client, List.of());
ImmutableCacheStats expected = new ImmutableCacheStats(
oldApiStats.getHitCount(),
oldApiStats.getMissCount(),
oldApiStats.getEvictions(),
oldApiStats.getMemorySizeInBytes(),
0
);
// Don't check entries, as the old API doesn't track this
checkCacheStatsAPIResponse(xContentMap, xContentMapKeys, expected, true, false);
}

private void startIndex(Client client, String indexName) throws InterruptedException {
assertAcked(
client.admin()
.indices()
.prepareCreate(indexName)
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
ensureSearchable(indexName);
}

private SearchResponse searchIndex(Client client, String index, String searchSuffix) {
SearchResponse resp = client.prepareSearch(index)
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k", "hello" + searchSuffix))
.get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
// assertEquals(1, resp.getHits().getTotalHits().value);
return resp;
}

private static Map<String, Object> getNodeCacheStatsXContentMap(Client client, List<String> aggregationLevels) throws IOException {

CommonStatsFlags statsFlags = new CommonStatsFlags();
statsFlags.includeAllCacheTypes();

NodesStatsResponse nodeStatsResponse = client.admin()
.cluster()
.prepareNodesStats("data:true")
.addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName())
.setIndices(statsFlags)
.get();
// Can always get the first data node as there's only one in this test suite
assertEquals(1, nodeStatsResponse.getNodes().size());
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();

XContentBuilder builder = XContentFactory.jsonBuilder();
Map<String, String> paramMap = new HashMap<>();
if (!aggregationLevels.isEmpty()) {
paramMap.put("level", String.join(",", aggregationLevels));
}
ToXContent.Params params = new ToXContent.MapParams(paramMap);

builder.startObject();
ncs.toXContent(builder, params);
builder.endObject();

String resultString = builder.toString();
return XContentHelper.convertToMap(MediaTypeRegistry.JSON.xContent(), resultString, true);
}

private static void checkCacheStatsAPIResponse(
Map<String, Object> xContentMap,
List<String> xContentMapKeys,
ImmutableCacheStats expectedStats,
boolean checkMemorySize,
boolean checkEntries
) {
// Assumes the keys point to a level whose keys are the field values ("size_in_bytes", "evictions", etc) and whose values store
// those stats
Map<String, Object> aggregatedStatsResponse = (Map<String, Object>) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap(
xContentMap,
xContentMapKeys
);
assertNotNull(aggregatedStatsResponse);
assertEquals(expectedStats.getHits(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.HIT_COUNT));
assertEquals(expectedStats.getMisses(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.MISS_COUNT));
assertEquals(expectedStats.getEvictions(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.EVICTIONS));
if (checkMemorySize) {
assertEquals(
expectedStats.getSizeInBytes(),
(int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.MEMORY_SIZE_IN_BYTES)
);
}
if (checkEntries) {
assertEquals(expectedStats.getEntries(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.ENTRIES));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.indices.breaker.AllCircuitBreakerStats;
Expand Down Expand Up @@ -158,6 +159,9 @@
@Nullable
private AdmissionControlStats admissionControlStats;

@Nullable
private NodeCacheStats nodeCacheStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -234,6 +238,11 @@
} else {
admissionControlStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
nodeCacheStats = in.readOptionalWriteable(NodeCacheStats::new);
} else {
nodeCacheStats = null;

Check warning on line 244 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L244

Added line #L244 was not covered by tests
}
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
}

public NodeStats(
Expand Down Expand Up @@ -264,7 +273,8 @@
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats
@Nullable AdmissionControlStats admissionControlStats,
@Nullable NodeCacheStats nodeCacheStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -294,6 +304,7 @@
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -451,6 +462,11 @@
return admissionControlStats;
}

@Nullable
public NodeCacheStats getNodeCacheStats() {
return nodeCacheStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -506,6 +522,9 @@
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(admissionControlStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(nodeCacheStats);
}
}

@Override
Expand Down Expand Up @@ -609,6 +628,9 @@
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
if (getNodeCacheStats() != null) {
getNodeCacheStats().toXContent(builder, params);

Check warning on line 632 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L632

Added line #L632 was not covered by tests
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public enum Metric {
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control");
ADMISSION_CONTROL("admission_control"),
CACHE_STATS("caches");
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved

private String metricName;

Expand Down
Loading
Loading