Skip to content

Commit

Permalink
added tests and refactored based on review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Feb 14, 2024
1 parent accb5e8 commit 1ede5f5
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,6 @@
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,10 @@
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -46,27 +23,30 @@
*/
public class ClusterTermVersionResponse extends ActionResponse {

protected DiscoveryNode sourceNode;
private final ClusterName clusterName;
private final String stateUUID;
private final long term;
private final long version;

protected long term;
protected long version;

public ClusterTermVersionResponse(DiscoveryNode sourceNode, long term, long version) {
this.sourceNode = sourceNode;
public ClusterTermVersionResponse(ClusterName clusterName, String stateUUID, long term, long version) {
this.clusterName = clusterName;
this.stateUUID = stateUUID;
this.term = term;
this.version = version;
}

public ClusterTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.sourceNode = new DiscoveryNode(in);
this.clusterName = new ClusterName(in);
this.stateUUID = in.readString();
this.term = in.readLong();
this.version = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
clusterName.writeTo(out);
out.writeString(stateUUID);
out.writeLong(term);
out.writeLong(version);
}
Expand All @@ -79,4 +59,19 @@ public long getVersion() {
return version;
}

public ClusterName getClusterName() {
return clusterName;
}

public String getStateUUID() {
return stateUUID;
}

public boolean matches(ClusterState clusterState) {
return clusterName.equals(clusterState.getClusterName())
&& stateUUID.equals(clusterState.stateUUID())
&& term == clusterState.term()
&& version == clusterState.version();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,6 @@
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
Expand Down Expand Up @@ -91,6 +72,7 @@ public ClusterTermVersionResponse read(StreamInput in) throws IOException {

@Override
protected ClusterBlockException checkBlock(ClusterTermVersionRequest request, ClusterState state) {
// cluster state term and version needs to be retrieved even on a fully blocked cluster
return null;
}

Expand All @@ -105,6 +87,6 @@ protected void clusterManagerOperation(

private ClusterTermVersionResponse buildResponse(ClusterTermVersionRequest request, ClusterState state) {
logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version());
return new ClusterTermVersionResponse(state.getNodes().getClusterManagerNode(), state.term(), state.getVersion());
return new ClusterTermVersionResponse(state.getClusterName(), state.stateUUID(), state.term(), state.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected void doStart(ClusterState clusterState) {
threadPool.executor(executor)
.execute(
ActionRunnable.wrap(
GetDelegateForLocalExecute(clusterState),
getDelegateForLocalExecute(clusterState),
l -> clusterManagerOperation(task, request, clusterState, l)
)
);
Expand All @@ -271,49 +271,13 @@ protected void doStart(ClusterState clusterState) {
retryOnMasterChange(clusterState, null);
} else {
DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode();

if (clusterManagerNode.getVersion().onOrAfter(Version.V_2_11_0) && checkTermVersion()) {
transportService.sendRequest(
clusterManagerNode,
ClusterTermVersionAction.NAME,
new ClusterTermVersionRequest(),
new TransportResponseHandler<ClusterTermVersionResponse>() {
@Override
public void handleResponse(ClusterTermVersionResponse response) {
if (response.getTerm() == clusterState.term() && response.getVersion() == clusterState.version()) {
// same as local execute
threadPool.executor(executor)
.execute(
ActionRunnable.wrap(
GetDelegateForLocalExecute(clusterState),
l -> clusterManagerOperation(task, request, clusterState, l)
)
);
} else {
// cluster-manager has updated state
executeOnClusterManager(nodes.getClusterManagerNode(), clusterState);
}
}

@Override
public void handleException(TransportException exp) {
handleTransportException(nodes.getClusterManagerNode(), clusterState, exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClusterTermVersionResponse read(StreamInput in) throws IOException {
return new ClusterTermVersionResponse(in);
}
}
);
boolean shouldExecuteOnClusterManger = clusterManagerNode.getVersion().onOrAfter(Version.V_2_11_0)
&& checkTermVersion();
if (shouldExecuteOnClusterManger) {
execOnClusterManagerOnTermMismatch(clusterManagerNode, clusterState);
} else {
// should be always executed on cluster-manager
executeOnClusterManager(nodes.getClusterManagerNode(), clusterState);
executeOnClusterManager(clusterManagerNode, clusterState);
}
}
}
Expand All @@ -322,10 +286,6 @@ public ClusterTermVersionResponse read(StreamInput in) throws IOException {
}
}

private ClusterTermVersionResponse readClusterTermResponse(StreamInput in) throws IOException {
return new ClusterTermVersionResponse(in);
}

private void retryOnMasterChange(ClusterState state, Throwable failure) {
retry(state, failure, ClusterManagerNodeChangePredicate.build(state));
}
Expand Down Expand Up @@ -369,7 +329,7 @@ public void onTimeout(TimeValue timeout) {
}, statePredicate);
}

private ActionListener<Response> GetDelegateForLocalExecute(ClusterState clusterState) {
private ActionListener<Response> getDelegateForLocalExecute(ClusterState clusterState) {
return ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
logger.debug(
Expand All @@ -387,6 +347,46 @@ private ActionListener<Response> GetDelegateForLocalExecute(ClusterState cluster
});
}

private void execOnClusterManagerOnTermMismatch(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
transportService.sendRequest(
clusterManagerNode,
ClusterTermVersionAction.NAME,
new ClusterTermVersionRequest(),
new TransportResponseHandler<ClusterTermVersionResponse>() {
@Override
public void handleResponse(ClusterTermVersionResponse response) {
boolean shouldExecuteOnClusterManger = !response.matches(clusterState);
if (shouldExecuteOnClusterManger) {
executeOnClusterManager(clusterManagerNode, clusterState);
} else {
Runnable runTask = ActionRunnable.wrap(
getDelegateForLocalExecute(clusterState),
l -> clusterManagerOperation(task, request, clusterState, l)
);
threadPool.executor(executor).execute(runTask);
}
}

@Override
public void handleException(TransportException exp) {
handleTransportException(clusterManagerNode, clusterState, exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClusterTermVersionResponse read(StreamInput in) throws IOException {
return new ClusterTermVersionResponse(in);
}

}

);
}

private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
final String actionName = getClusterManagerActionName(clusterManagerNode);

Expand Down
Loading

0 comments on commit 1ede5f5

Please sign in to comment.