Skip to content

Commit

Permalink
Add local or remote stats using array to extend upcoming stats automa…
Browse files Browse the repository at this point in the history
…tically

Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Oct 21, 2023
1 parent f66d81e commit 6386f7a
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getPersistedStateStats();
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,13 +866,16 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
CoordinationState.PersistedState remotePersistedState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
ClusterStateStats clusterStateStats = clusterManagerService.getStateStats();
if (remotePersistedState != null) {
clusterStateStats.setRemoteStateStats(remotePersistedState.getPersistedStateStats());
}
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
}
});
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getPersistedStateStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public void stateUploadFailed() {
public void stateFailed() {
failedCount.incrementAndGet();
}

public void stateUploaded() {
public void stateSucceeded() {
successCount.incrementAndGet();
}

Expand All @@ -93,7 +93,7 @@ public void stateUploaded() {
*
* @param timeTakenInUpload time taken in uploading the cluster state to remote
*/
public void stateUploadTook(long timeTakenInUpload) {
public void stateTook(long timeTakenInUpload) {
totalTimeInMillis.addAndGet(timeTakenInUpload);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -25,73 +27,79 @@
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong stateUpdateSuccess = new AtomicLong(0);
private AtomicLong stateUpdateTotalTimeInMillis = new AtomicLong(0);
private AtomicLong stateUpdateFailed = new AtomicLong(0);
private PersistedStateStats remoteStateStats = null;
private AtomicLong updateSuccess = new AtomicLong(0);
private AtomicLong updateTotalTimeInMillis = new AtomicLong(0);
private AtomicLong updateFailed = new AtomicLong(0);
private List<PersistedStateStats> persistenceStats = new ArrayList<>();

public ClusterStateStats() {}

public long getStateUpdateSuccess() {
return stateUpdateSuccess.get();
public long getUpdateSuccess() {
return updateSuccess.get();
}

public long getStateUpdateTotalTimeInMillis() {
return stateUpdateTotalTimeInMillis.get();
public long getUpdateTotalTimeInMillis() {
return updateTotalTimeInMillis.get();
}

public long getStateUpdateFailed() {
return stateUpdateFailed.get();
public long getUpdateFailed() {
return updateFailed.get();
}

public List<PersistedStateStats> getPersistenceStats() {
return persistenceStats;
}

public void stateUpdated() {
stateUpdateSuccess.incrementAndGet();
updateSuccess.incrementAndGet();
}

public void stateUpdateFailed() {
stateUpdateFailed.incrementAndGet();
updateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
stateUpdateTotalTimeInMillis.addAndGet(stateUpdateTime);
updateTotalTimeInMillis.addAndGet(stateUpdateTime);
}

public void setRemoteStateStats(PersistedStateStats remoteStateStats) {
this.remoteStateStats = remoteStateStats;
public ClusterStateStats setPersistenceStats(List<PersistedStateStats> persistenceStats) {
this.persistenceStats = persistenceStats;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(stateUpdateSuccess.get());
out.writeVLong(stateUpdateTotalTimeInMillis.get());
out.writeVLong(stateUpdateFailed.get());
if (remoteStateStats != null) {
out.writeBoolean(true);
remoteStateStats.writeTo(out);
} else {
out.writeBoolean(false);
out.writeVLong(updateSuccess.get());
out.writeVLong(updateTotalTimeInMillis.get());
out.writeVLong(updateFailed.get());
out.writeVInt(persistenceStats.size());
for (PersistedStateStats stats : persistenceStats) {
stats.writeTo(out);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.stateUpdateSuccess = new AtomicLong(in.readVLong());
this.stateUpdateTotalTimeInMillis = new AtomicLong(in.readVLong());
this.stateUpdateFailed = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
this.remoteStateStats = new PersistedStateStats(in);
this.updateSuccess = new AtomicLong(in.readVLong());
this.updateTotalTimeInMillis = new AtomicLong(in.readVLong());
this.updateFailed = new AtomicLong(in.readVLong());
int persistedStatsSize = in.readVInt();
this.persistenceStats = new ArrayList<>();
for (int statsNumber = 0; statsNumber < persistedStatsSize; statsNumber++) {
PersistedStateStats stats = new PersistedStateStats(in);
this.persistenceStats.add(stats);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE_STATS);
builder.startObject(Fields.OVERALL);
builder.field(Fields.UPDATE_COUNT, getStateUpdateSuccess());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTotalTimeInMillis());
builder.field(Fields.FAILED_COUNT, getStateUpdateFailed());
builder.field(Fields.UPDATE_COUNT, getUpdateSuccess());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getUpdateTotalTimeInMillis());
builder.field(Fields.FAILED_COUNT, getUpdateFailed());
builder.endObject();
if (remoteStateStats != null) {
remoteStateStats.toXContent(builder, params);
for (PersistedStateStats stats : persistenceStats) {
stats.toXContent(builder, params);
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getPersistedStateStats() {
public PersistedStateStats getStats() {
// Note: These stats are not published yet, will come in future
return null;
}

Expand Down Expand Up @@ -731,7 +732,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) ==
}

@Override
public PersistedStateStats getPersistedStateStats() {
public PersistedStateStats getStats() {
return remoteClusterStateService.getRemoteClusterStateStats();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public class RemoteClusterStateService implements Closeable {
private volatile TimeValue slowWriteLoggingThreshold;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemoteStateStats remoteStateStats;
private final RemotePersistenceStats remoteStateStats;
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
Expand Down Expand Up @@ -172,7 +172,7 @@ public RemoteClusterStateService(
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateStats = new RemoteStateStats();
this.remoteStateStats = new RemotePersistenceStats();
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -213,8 +213,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
false
);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateUploaded();
remoteStateStats.stateUploadTook(durationMillis);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
"writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices",
Expand Down Expand Up @@ -316,8 +316,8 @@ public ClusterMetadataManifest writeIncrementalMetadata(
deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateUploaded();
remoteStateStats.stateUploadTook(durationMillis);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
"writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
Expand Down Expand Up @@ -1011,7 +1011,7 @@ public static String encodeString(String content) {
}

public void writeMetadataFailed() {
remoteStateStats.stateUploadFailed();
remoteStateStats.stateFailed();
}

/**
Expand Down Expand Up @@ -1217,7 +1217,7 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa
});
}

public RemoteStateStats getRemoteClusterStateStats() {
public RemotePersistenceStats getRemoteClusterStateStats() {
return remoteStateStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*
* @opensearch.internal
*/
public class RemoteStateStats extends PersistedStateStats {
public class RemotePersistenceStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String REMOTE = "remote";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);

public RemoteStateStats() {
public RemotePersistenceStats() {
super(REMOTE);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRoutingStats;
Expand All @@ -48,6 +49,7 @@
import org.opensearch.core.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.core.indices.breaker.CircuitBreakerStats;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.RemotePersistenceStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.remote.RemoteSegmentStats;
Expand All @@ -72,6 +74,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -349,6 +352,25 @@ public void testSerialization() throws IOException {
assertEquals(queueStats.getTotal(), deserializedDiscoveryStats.getQueueStats().getTotal());
assertEquals(queueStats.getPending(), deserializedDiscoveryStats.getQueueStats().getPending());
}
ClusterStateStats stateStats = discoveryStats.getClusterStateStats();
if (stateStats == null) {
assertNull(deserializedDiscoveryStats.getClusterStateStats());
} else {
assertEquals(stateStats.getUpdateFailed(), deserializedDiscoveryStats.getClusterStateStats().getUpdateFailed());
assertEquals(stateStats.getUpdateSuccess(), deserializedDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertEquals(
stateStats.getUpdateTotalTimeInMillis(),
deserializedDiscoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis()
);
assertEquals(1, deserializedDiscoveryStats.getClusterStateStats().getPersistenceStats().size());
PersistedStateStats deserializedRemoteStateStats = deserializedDiscoveryStats.getClusterStateStats()
.getPersistenceStats()
.get(0);
PersistedStateStats remoteStateStats = stateStats.getPersistenceStats().get(0);
assertEquals(remoteStateStats.getFailedCount(), deserializedRemoteStateStats.getFailedCount());
assertEquals(remoteStateStats.getSuccessCount(), deserializedRemoteStateStats.getSuccessCount());
assertEquals(remoteStateStats.getTotalTimeInMillis(), deserializedRemoteStateStats.getTotalTimeInMillis());
}
}
IngestStats ingestStats = nodeStats.getIngestStats();
IngestStats deserializedIngestStats = deserializedNodeStats.getIngestStats();
Expand Down Expand Up @@ -714,13 +736,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
ScriptStats scriptStats = frequently()
? new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
: null;
ClusterStateStats stateStats = new ClusterStateStats();
RemotePersistenceStats remoteStateStats = new RemotePersistenceStats();
stateStats.setPersistenceStats(Arrays.asList(remoteStateStats));
DiscoveryStats discoveryStats = frequently()
? new DiscoveryStats(
randomBoolean() ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) : null,
randomBoolean()
? new PublishClusterStateStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
: null,
randomBoolean() ? new ClusterStateStats() : null
randomBoolean() ? stateStats : null
)
: null;
IngestStats ingestStats = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
submittedTasksPerThread.get(entry.getKey()).get()
);
}
// verify stats values after state is published
assertEquals(1, clusterManagerService.getStateStats().getUpdateSuccess());
assertEquals(0, clusterManagerService.getStateStats().getUpdateFailed());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getPersistedStateStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down

0 comments on commit 6386f7a

Please sign in to comment.