Skip to content

Commit

Permalink
Simplify InternalClusterInfoService (elastic#103370)
Browse files Browse the repository at this point in the history
  • Loading branch information
idegtiarenko authored Dec 18, 2023
1 parent c702105 commit 997fbb3
Show file tree
Hide file tree
Showing 21 changed files with 59 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
ClusterStatsResponse response = clusterAdmin().prepareClusterStats().get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), greaterThan(0L));
assertThat(msg, response.indicesStats.getStore().sizeInBytes(), greaterThan(0L));

assertThat(msg, response.nodesStats.getFs().getTotal().getBytes(), greaterThan(0L));
assertThat(msg, response.nodesStats.getJvm().getVersions().size(), greaterThan(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1745,12 +1745,12 @@ public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
.getNodes()
.get(0)
.getIndices();
assertThat(nodeIndicesStats.getStore().getReservedSize().getBytes(), equalTo(0L));
assertThat(nodeIndicesStats.getStore().reservedSizeInBytes(), equalTo(0L));
assertThat(
nodeIndicesStats.getShardStats(clusterState.metadata().index(indexName).getIndex())
.stream()
.flatMap(s -> Arrays.stream(s.getShards()))
.map(s -> s.getStats().getStore().getReservedSize().getBytes())
.map(s -> s.getStats().getStore().reservedSizeInBytes())
.toList(),
everyItem(equalTo(StoreStats.UNKNOWN_RESERVED_BYTES))
);
Expand All @@ -1766,8 +1766,7 @@ public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
.get(0)
.getIndices()
.getStore()
.getReservedSize()
.getBytes(),
.reservedSizeInBytes(),
greaterThan(0L)
);
}
Expand All @@ -1785,7 +1784,7 @@ public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
.get()
.getNodes()
.stream()
.mapToLong(n -> n.getIndices().getStore().getReservedSize().getBytes())
.mapToLong(n -> n.getIndices().getStore().reservedSizeInBytes())
.sum(),
equalTo(0L)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testRepositoryThrottlingStats() throws Exception {
IndexStats indexStats = indicesStats.getIndex("test-idx");
long totalSizeInBytes = 0;
for (ShardStats shard : indexStats.getShards()) {
totalSizeInBytes += shard.getStats().getStore().getSizeInBytes();
totalSizeInBytes += shard.getStats().getStore().sizeInBytes();
}
logger.info("--> total shards size: {} bytes", totalSizeInBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public int calculate(Integer numberOfShards, ByteSizeValue maxPrimaryShardSize,
}
} else if (maxPrimaryShardSize != null) {
int sourceIndexShardsNum = sourceMetadata.getNumberOfShards();
long sourceIndexStorageBytes = indexStoreStats.getSizeInBytes();
long sourceIndexStorageBytes = indexStoreStats.sizeInBytes();
long maxPrimaryShardSizeBytes = maxPrimaryShardSize.getBytes();
long minShardsNum = sourceIndexStorageBytes / maxPrimaryShardSizeBytes;
if (minShardsNum * maxPrimaryShardSizeBytes < sourceIndexStorageBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -97,7 +96,6 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private final Object mutex = new Object();
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();

private final ClusterService clusterService;
private AsyncRefresh currentRefresh;
private RefreshScheduler refreshScheduler;

Expand All @@ -108,7 +106,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
Expand Down Expand Up @@ -250,7 +247,6 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders =
new HashMap<>();
buildShardLevelInfo(
clusterService.state().routingTable(),
adjustShardStats(stats),
shardSizeByIdentifierBuilder,
shardDataSetSizeBuilder,
Expand Down Expand Up @@ -445,15 +441,14 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
}

static void buildShardLevelInfo(
RoutingTable routingTable,
ShardStats[] stats,
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizeBuilder,
Map<ClusterInfo.NodeAndShard, String> dataPathByShard,
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard
) {
for (ShardStats s : stats) {
final ShardRouting shardRouting = routingTable.deduplicate(s.getShardRouting());
final ShardRouting shardRouting = s.getShardRouting();
dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath());

final StoreStats storeStats = s.getStats().getStore();
Expand All @@ -462,7 +457,7 @@ static void buildShardLevelInfo(
}
final long size = storeStats.sizeInBytes();
final long dataSetSize = storeStats.totalDataSetSizeInBytes();
final long reserved = storeStats.getReservedSize().getBytes();
final long reserved = storeStats.reservedSizeInBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,33 +148,6 @@ public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {
return shard;
}

/**
* Try to deduplicate the given shard routing with an equal instance found in this routing table. This is used by the logic of the
* {@link org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider} and
* {@link org.elasticsearch.cluster.InternalClusterInfoService} to deduplicate instances created by a master node and those read from
* the network to speed up the use of {@link ShardRouting} as a map key in {@link org.elasticsearch.cluster.ClusterInfo#getDataPath}.
*
* @param shardRouting shard routing to deduplicate
* @return deduplicated shard routing from this routing table if an equivalent shard routing was found or the given instance otherwise
*/
public ShardRouting deduplicate(ShardRouting shardRouting) {
final IndexRoutingTable indexShardRoutingTable = indicesRouting.get(shardRouting.index().getName());
if (indexShardRoutingTable == null) {
return shardRouting;
}
final IndexShardRoutingTable shardRoutingTable = indexShardRoutingTable.shard(shardRouting.id());
if (shardRoutingTable == null) {
return shardRouting;
}
for (int i = 0; i < shardRoutingTable.size(); i++) {
ShardRouting found = shardRoutingTable.shard(i);
if (shardRouting.equals(found)) {
return found;
}
}
return shardRouting;
}

@Nullable
public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
final IndexRoutingTable indexRoutingTable = index(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public long getDeleted() {

/**
* Returns the total size in bytes of all documents in this stats.
* This value may be more reliable than {@link StoreStats#getSizeInBytes()} in estimating the index size.
* This value may be more reliable than {@link StoreStats#sizeInBytes()} in estimating the index size.
*/
public long getTotalSizeInBytes() {
return totalSizeInBytes;
Expand Down
34 changes: 13 additions & 21 deletions server/src/main/java/org/elasticsearch/index/store/StoreStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class StoreStats implements Writeable, ToXContentFragment {

private long sizeInBytes;
private long totalDataSetSizeInBytes;
private long reservedSize;
private long reservedSizeInBytes;

public StoreStats() {

Expand All @@ -47,9 +47,9 @@ public StoreStats(StreamInput in) throws IOException {
totalDataSetSizeInBytes = sizeInBytes;
}
if (in.getTransportVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
reservedSize = in.readZLong();
reservedSizeInBytes = in.readZLong();
} else {
reservedSize = UNKNOWN_RESERVED_BYTES;
reservedSizeInBytes = UNKNOWN_RESERVED_BYTES;
}
}

Expand All @@ -63,7 +63,7 @@ public StoreStats(long sizeInBytes, long totalDataSetSizeInBytes, long reservedS
assert reservedSize == UNKNOWN_RESERVED_BYTES || reservedSize >= 0 : reservedSize;
this.sizeInBytes = sizeInBytes;
this.totalDataSetSizeInBytes = totalDataSetSizeInBytes;
this.reservedSize = reservedSize;
this.reservedSizeInBytes = reservedSize;
}

public void add(StoreStats stats) {
Expand All @@ -72,7 +72,7 @@ public void add(StoreStats stats) {
}
sizeInBytes += stats.sizeInBytes;
totalDataSetSizeInBytes += stats.totalDataSetSizeInBytes;
reservedSize = ignoreIfUnknown(reservedSize) + ignoreIfUnknown(stats.reservedSize);
reservedSizeInBytes = ignoreIfUnknown(reservedSizeInBytes) + ignoreIfUnknown(stats.reservedSizeInBytes);
}

private static long ignoreIfUnknown(long reservedSize) {
Expand All @@ -83,28 +83,20 @@ public long sizeInBytes() {
return sizeInBytes;
}

public long getSizeInBytes() {
return sizeInBytes;
}

public ByteSizeValue size() {
return ByteSizeValue.ofBytes(sizeInBytes);
}

public ByteSizeValue getSize() {
return size();
public long totalDataSetSizeInBytes() {
return totalDataSetSizeInBytes;
}

public ByteSizeValue totalDataSetSize() {
return ByteSizeValue.ofBytes(totalDataSetSizeInBytes);
}

public ByteSizeValue getTotalDataSetSize() {
return totalDataSetSize();
}

public long totalDataSetSizeInBytes() {
return totalDataSetSizeInBytes;
public long reservedSizeInBytes() {
return reservedSizeInBytes;
}

/**
Expand All @@ -113,7 +105,7 @@ public long totalDataSetSizeInBytes() {
* the reserved size is unknown.
*/
public ByteSizeValue getReservedSize() {
return ByteSizeValue.ofBytes(reservedSize);
return ByteSizeValue.ofBytes(reservedSizeInBytes);
}

@Override
Expand All @@ -123,7 +115,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalDataSetSizeInBytes);
}
if (out.getTransportVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
out.writeZLong(reservedSize);
out.writeZLong(reservedSizeInBytes);
}
}

Expand All @@ -144,12 +136,12 @@ public boolean equals(Object o) {
StoreStats that = (StoreStats) o;
return sizeInBytes == that.sizeInBytes
&& totalDataSetSizeInBytes == that.totalDataSetSizeInBytes
&& reservedSize == that.reservedSize;
&& reservedSizeInBytes == that.reservedSizeInBytes;
}

@Override
public int hashCode() {
return Objects.hash(sizeInBytes, totalDataSetSizeInBytes, reservedSize);
return Objects.hash(sizeInBytes, totalDataSetSizeInBytes, reservedSizeInBytes);
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private Table buildTable(RestRequest request, final ClusterStateResponse state,

table.startRow();
table.addCell(shardCount);
table.addCell(nodeStats.getIndices().getStore().getSize());
table.addCell(nodeStats.getIndices().getStore().size());
table.addCell(used < 0 ? null : ByteSizeValue.ofBytes(used));
table.addCell(avail.getBytes() < 0 ? null : avail);
table.addCell(total.getBytes() < 0 ? null : total);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe
}
table.addCell(shard.state());
table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount));
table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::getSize));
table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::getTotalDataSetSize));
table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::size));
table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::totalDataSetSize));
if (shard.assignedToNode()) {
String ip = state.getState().nodes().get(shard.currentNodeId()).getHostAddress();
String nodeId = shard.currentNodeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
Expand Down Expand Up @@ -137,14 +136,7 @@ public void testFillShardLevelInfo() {
Map<String, Long> shardSizes = new HashMap<>();
Map<ShardId, Long> shardDataSetSizes = new HashMap<>();
Map<ClusterInfo.NodeAndShard, String> routingToPath = new HashMap<>();
InternalClusterInfoService.buildShardLevelInfo(
RoutingTable.EMPTY_ROUTING_TABLE,
stats,
shardSizes,
shardDataSetSizes,
routingToPath,
new HashMap<>()
);
InternalClusterInfoService.buildShardLevelInfo(stats, shardSizes, shardDataSetSizes, routingToPath, new HashMap<>());

assertThat(
shardSizes,
Expand Down
22 changes: 11 additions & 11 deletions server/src/test/java/org/elasticsearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,22 +775,22 @@ public void testStoreStats() throws IOException {
final long localStoreSizeDelta = randomLongBetween(-initialStoreSize, initialStoreSize);
final long reservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES : randomLongBetween(0L, Integer.MAX_VALUE);
StoreStats stats = store.stats(reservedBytes, size -> size + localStoreSizeDelta);
assertEquals(initialStoreSize, stats.totalDataSetSize().getBytes());
assertEquals(initialStoreSize + localStoreSizeDelta, stats.getSize().getBytes());
assertEquals(reservedBytes, stats.getReservedSize().getBytes());
assertEquals(initialStoreSize, stats.totalDataSetSizeInBytes());
assertEquals(initialStoreSize + localStoreSizeDelta, stats.sizeInBytes());
assertEquals(reservedBytes, stats.reservedSizeInBytes());

stats.add(null);
assertEquals(initialStoreSize, stats.totalDataSetSize().getBytes());
assertEquals(initialStoreSize + localStoreSizeDelta, stats.getSize().getBytes());
assertEquals(reservedBytes, stats.getReservedSize().getBytes());
assertEquals(initialStoreSize, stats.totalDataSetSizeInBytes());
assertEquals(initialStoreSize + localStoreSizeDelta, stats.sizeInBytes());
assertEquals(reservedBytes, stats.reservedSizeInBytes());

final long otherStatsDataSetBytes = randomLongBetween(0L, Integer.MAX_VALUE);
final long otherStatsLocalBytes = randomLongBetween(0L, Integer.MAX_VALUE);
final long otherStatsReservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES : randomLongBetween(0L, Integer.MAX_VALUE);
stats.add(new StoreStats(otherStatsLocalBytes, otherStatsDataSetBytes, otherStatsReservedBytes));
assertEquals(initialStoreSize + otherStatsDataSetBytes, stats.totalDataSetSize().getBytes());
assertEquals(initialStoreSize + otherStatsLocalBytes + localStoreSizeDelta, stats.getSize().getBytes());
assertEquals(Math.max(reservedBytes, 0L) + Math.max(otherStatsReservedBytes, 0L), stats.getReservedSize().getBytes());
assertEquals(initialStoreSize + otherStatsDataSetBytes, stats.totalDataSetSizeInBytes());
assertEquals(initialStoreSize + otherStatsLocalBytes + localStoreSizeDelta, stats.sizeInBytes());
assertEquals(Math.max(reservedBytes, 0L) + Math.max(otherStatsReservedBytes, 0L), stats.reservedSizeInBytes());

Directory dir = store.directory();
final long length;
Expand All @@ -805,8 +805,8 @@ public void testStoreStats() throws IOException {

assertTrue(numNonExtraFiles(store) > 0);
stats = store.stats(0L, size -> size + localStoreSizeDelta);
assertEquals(initialStoreSize + length, stats.totalDataSetSize().getBytes());
assertEquals(initialStoreSize + localStoreSizeDelta + length, stats.getSizeInBytes());
assertEquals(initialStoreSize + length, stats.totalDataSetSizeInBytes());
assertEquals(initialStoreSize + localStoreSizeDelta + length, stats.sizeInBytes());

deleteContent(store.directory());
IOUtils.close(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ ShardStats[] adjustShardStats(ShardStats[] shardsStats) {
var storeStats = new StoreStats(
shardSizeFunctionCopy.apply(shardRouting),
shardSizeFunctionCopy.apply(shardRouting),
shardStats.getStats().store.getReservedSize().getBytes()
shardStats.getStats().store.reservedSizeInBytes()
);
var commonStats = new CommonStats(new CommonStatsFlags(CommonStatsFlags.Flag.Store));
commonStats.store = storeStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void testScale() throws Exception {
capacity().results().get("frozen").requiredCapacity().total().storage(),
equalTo(
ByteSizeValue.ofBytes(
(long) (statsResponse.getPrimaries().store.totalDataSetSize().getBytes()
* FrozenStorageDeciderService.DEFAULT_PERCENTAGE) / 100
(long) (statsResponse.getPrimaries().store.totalDataSetSizeInBytes() * FrozenStorageDeciderService.DEFAULT_PERCENTAGE)
/ 100
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testScaleUp() throws IOException, InterruptedException {
capacity();

IndicesStatsResponse stats = indicesAdmin().prepareStats(dsName).clear().setStore(true).get();
long used = stats.getTotal().getStore().getSizeInBytes();
long used = stats.getTotal().getStore().sizeInBytes();
long maxShardSize = Arrays.stream(stats.getShards()).mapToLong(s -> s.getStats().getStore().sizeInBytes()).max().orElseThrow();
// As long as usage is above low watermark, we will trigger a proactive scale up, since the simulated shards have an in-sync
// set and therefore allocating these do not skip the low watermark check in the disk threshold decider.
Expand Down
Loading

0 comments on commit 997fbb3

Please sign in to comment.