Skip to content

Commit

Permalink
Abstract out PersistedStateStats and use PersistedState interface for…
Browse files Browse the repository at this point in the history
… stats

Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Oct 20, 2023
1 parent 0953f54 commit 9862244
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ public interface PersistedState extends Closeable {
*/
void setLastAcceptedState(ClusterState clusterState);

/**
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getPersistedStateStats();

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -186,7 +185,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -209,8 +207,7 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
RemoteStoreNodeService remoteStoreNodeService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -299,7 +296,6 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -870,9 +866,13 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = remoteClusterStateService != null
? clusterManagerService.getStateStats(this.remoteClusterStateService.getRemoteClusterStateStats())
: clusterManagerService.getStateStats();
CoordinationState.PersistedState remotePersistedState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
ClusterStateStats clusterStateStats = clusterManagerService.getStateStats();
if (remotePersistedState != null) {
clusterStateStats.setRemoteStateStats(remotePersistedState.getPersistedStateStats());
}
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
this.acceptedState = clusterState;
}

@Override
public PersistedStateStats getPersistedStateStats() {
return null;
}

@Override
public long getCurrentTerm() {
return currentTerm;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Persisted cluster state related stats.
*
* @opensearch.internal
*/
public class PersistedStateStats implements Writeable, ToXContentObject {
private String statsName;
private AtomicLong totalTimeInMillis = new AtomicLong(0);
private AtomicLong failedCount = new AtomicLong(0);
private AtomicLong successCount = new AtomicLong(0);
private Map<String, AtomicLong> extendedFields = new HashMap<>(); // keeping minimal extensibility

public PersistedStateStats(String statsName) {
this.statsName = statsName;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(successCount.get());
out.writeVLong(failedCount.get());
out.writeVLong(totalTimeInMillis.get());
if (extendedFields.size() > 0) {
out.writeBoolean(true);
out.writeVInt(extendedFields.size());
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
out.writeString(extendedField.getKey());
out.writeVLong(extendedField.getValue().get());
}
} else {
out.writeBoolean(false);
}
}

public PersistedStateStats(StreamInput in) throws IOException {
this.successCount = new AtomicLong(in.readVLong());
this.failedCount = new AtomicLong(in.readVLong());
this.totalTimeInMillis = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
int extendedFieldsSize = in.readVInt();
this.extendedFields = new HashMap<>();
for (int fieldNumber = 0; fieldNumber < extendedFieldsSize; fieldNumber++) {
extendedFields.put(in.readString(), new AtomicLong(in.readVLong()));
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(statsName);
builder.field(Fields.UPDATE_COUNT, getSuccessCount());
builder.field(Fields.FAILED_COUNT, getFailedCount());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalTimeInMillis());
if (extendedFields.size() > 0) {
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
builder.field(extendedField.getKey(), extendedField.getValue().get());
}
}
builder.endObject();
return builder;
}

public void stateUploadFailed() {
failedCount.incrementAndGet();
}

public void stateUploaded() {
successCount.incrementAndGet();
}

/**
* Expects user to send time taken in milliseconds.
*
* @param timeTakenInUpload time taken in uploading the cluster state to remote
*/
public void stateUploadTook(long timeTakenInUpload) {
totalTimeInMillis.addAndGet(timeTakenInUpload);
}

public long getTotalTimeInMillis() {
return totalTimeInMillis.get();
}

public long getFailedCount() {
return failedCount.get();
}

public long getSuccessCount() {
return successCount.get();
}

protected void addToExtendedFields(String extendedField, AtomicLong extendedFieldValue) {
this.extendedFields.put(extendedField, extendedFieldValue);
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

package org.opensearch.cluster.service;

import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.gateway.remote.RemoteClusterStateStats;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -25,68 +25,69 @@
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong stateUpdated = new AtomicLong(0);
private AtomicLong stateUpdateTimeTotalInMS = new AtomicLong(0);
private AtomicLong stateUpdateSuccess = new AtomicLong(0);
private AtomicLong stateUpdateTotalTimeInMillis = new AtomicLong(0);
private AtomicLong stateUpdateFailed = new AtomicLong(0);
private RemoteClusterStateStats remoteStateStats = null;
private PersistedStateStats remoteStateStats = null;

public ClusterStateStats() {}

public long getStateUpdated() {
return stateUpdated.get();
public long getStateUpdateSuccess() {
return stateUpdateSuccess.get();
}

public long getStateUpdateTimeTotalInMS() {
return stateUpdateTimeTotalInMS.get();
public long getStateUpdateTotalTimeInMillis() {
return stateUpdateTotalTimeInMillis.get();
}

public long getStateUpdateFailed() {
return stateUpdateFailed.get();
}

public void stateUpdated() {
stateUpdated.incrementAndGet();
stateUpdateSuccess.incrementAndGet();
}

public void stateUpdateFailed() {
stateUpdateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
stateUpdateTimeTotalInMS.addAndGet(stateUpdateTime);
stateUpdateTotalTimeInMillis.addAndGet(stateUpdateTime);
}

public void setRemoteStateStats(RemoteClusterStateStats remoteStateStats) {
public void setRemoteStateStats(PersistedStateStats remoteStateStats) {
this.remoteStateStats = remoteStateStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(stateUpdated.get());
out.writeVLong(stateUpdateTimeTotalInMS.get());
out.writeVLong(stateUpdateSuccess.get());
out.writeVLong(stateUpdateTotalTimeInMillis.get());
out.writeVLong(stateUpdateFailed.get());
if (remoteStateStats != null) {
out.writeBoolean(true);
remoteStateStats.writeTo(out);
} else {
out.writeBoolean(false);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.stateUpdated = new AtomicLong(in.readVLong());
this.stateUpdateTimeTotalInMS = new AtomicLong(in.readVLong());
this.stateUpdateSuccess = new AtomicLong(in.readVLong());
this.stateUpdateTotalTimeInMillis = new AtomicLong(in.readVLong());
this.stateUpdateFailed = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
this.remoteStateStats = new RemoteClusterStateStats(in);
this.remoteStateStats = new PersistedStateStats(in);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE);
builder.startObject(Fields.OVERALL_STATS);
builder.field(Fields.UPDATE_COUNT, getStateUpdated());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTimeTotalInMS());
builder.startObject(Fields.CLUSTER_STATE_STATS);
builder.startObject(Fields.OVERALL);
builder.field(Fields.UPDATE_COUNT, getStateUpdateSuccess());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTotalTimeInMillis());
builder.field(Fields.FAILED_COUNT, getStateUpdateFailed());
builder.endObject();
if (remoteStateStats != null) {
Expand All @@ -102,8 +103,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @opensearch.internal
*/
static final class Fields {
static final String CLUSTER_STATE = "cluster_state";
static final String OVERALL_STATS = "overall_stats";
static final String CLUSTER_STATE_STATS = "cluster_state_stats";
static final String OVERALL = "overall";
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.opensearch.core.common.text.Text;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateStats;
import org.opensearch.node.Node;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -1002,11 +1001,6 @@ public <T> void submitStateUpdateTasks(
}
}

public ClusterStateStats getStateStats(RemoteClusterStateStats remoteStateStats) {
stateStats.setRemoteStateStats(remoteStateStats);
return stateStats;
}

public ClusterStateStats getStateStats() {
return stateStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
Expand Down Expand Up @@ -134,8 +133,7 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
RemoteStoreNodeService remoteStoreNodeService
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -213,8 +211,7 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService,
remoteClusterStateService
remoteStoreNodeService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Loading

0 comments on commit 9862244

Please sign in to comment.