Skip to content

Commit

Permalink
Remove pending tasks helpers from client API (elastic#103451)
Browse files Browse the repository at this point in the history
There's no need for special helpers in `AdminClient` for this transport
action, we only use it in a few integ tests.
  • Loading branch information
DaveCTurner authored Dec 14, 2023
1 parent a2dc45b commit b3c1637
Show file tree
Hide file tree
Showing 14 changed files with 53 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testPendingTasksWithIndexBlocks() {
)) {
try {
enableIndexBlock("test", blockSetting);
PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get();
PendingClusterTasksResponse response = getClusterPendingTasks();
assertNotNull(response.pendingTasks());
} finally {
disableIndexBlock("test", blockSetting);
Expand All @@ -53,7 +53,7 @@ public void testPendingTasksWithClusterReadOnlyBlock() {

try {
setClusterReadOnly(true);
PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get();
PendingClusterTasksResponse response = getClusterPendingTasks();
assertNotNull(response.pendingTasks());
} finally {
setClusterReadOnly(false);
Expand All @@ -80,7 +80,7 @@ public boolean validateClusterForming() {
}
});

assertNotNull(clusterAdmin().preparePendingClusterTasks().get().pendingTasks());
assertNotNull(getClusterPendingTasks().pendingTasks());

// starting one more node allows the cluster to recover
internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
assertTrue(controlSources.isEmpty());

controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
PendingClusterTasksResponse response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
PendingClusterTasksResponse response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient());
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
Expand Down Expand Up @@ -419,7 +419,7 @@ public void onFailure(Exception e) {
}
assertTrue(controlSources.isEmpty());

response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient());
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response.pendingTasks()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,7 @@ public void testCorruptFileAndRecover() throws InterruptedException, IOException
.waitForNoRelocatingShards(true)
).actionGet();
if (health.isTimedOut()) {
logger.info(
"cluster state:\n{}\n{}",
clusterAdmin().prepareState().get().getState(),
clusterAdmin().preparePendingClusterTasks().get()
);
logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false));
}
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
Expand Down Expand Up @@ -295,11 +291,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted

if (response.getStatus() != ClusterHealthStatus.RED) {
logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed);
logger.info(
"cluster state:\n{}\n{}",
clusterAdmin().prepareState().get().getState(),
clusterAdmin().preparePendingClusterTasks().get()
);
logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
}
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = clusterAdmin().prepareState().get().getState();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
Expand Down Expand Up @@ -441,18 +438,6 @@ public interface ClusterAdminClient extends ElasticsearchClient {
*/
RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot);

/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener);

/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
PendingClusterTasksRequestBuilder preparePendingClusterTasks();

/**
* Get snapshot status.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
Expand Down Expand Up @@ -851,16 +847,6 @@ public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices)
return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
}

@Override
public PendingClusterTasksRequestBuilder preparePendingClusterTasks() {
return new PendingClusterTasksRequestBuilder(this);
}

@Override
public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener) {
execute(TransportPendingClusterTasksAction.TYPE, request, listener);
}

@Override
public void putRepository(PutRepositoryRequest request, ActionListener<AcknowledgedResponse> listener) {
execute(TransportPutRepositoryAction.TYPE, request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.rest.action.admin.cluster;

import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -39,8 +40,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin()
.cluster()
.pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel));
return channel -> client.execute(
TransportPendingClusterTasksAction.TYPE,
pendingClusterTasksRequest,
new RestChunkedToXContentListener<>(channel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Table;
Expand Down Expand Up @@ -46,15 +47,17 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin()
.cluster()
.pendingClusterTasks(pendingClusterTasksRequest, new RestResponseListener<PendingClusterTasksResponse>(channel) {
return channel -> client.execute(
TransportPendingClusterTasksAction.TYPE,
pendingClusterTasksRequest,
new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(PendingClusterTasksResponse pendingClusterTasks) throws Exception {
Table tab = buildTable(request, pendingClusterTasks);
return RestTable.buildResponse(tab, channel);
}
});
}
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
Expand Down Expand Up @@ -860,7 +862,10 @@ public void waitNoPendingTasksOnAll() throws Exception {
for (Client client : clients()) {
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
PendingClusterTasksResponse pendingTasks = client.execute(
TransportPendingClusterTasksAction.TYPE,
new PendingClusterTasksRequest().local(true)
).get();
assertThat(
"client " + client + " still has pending tasks " + pendingTasks,
pendingTasks.pendingTasks(),
Expand Down Expand Up @@ -977,8 +982,11 @@ private ClusterHealthStatus ensureColor(
try (var listeners = new RefCountingListener(detailsFuture)) {
clusterAdmin().prepareAllocationExplain().execute(listeners.acquire(allocationExplainRef::set));
clusterAdmin().prepareState().execute(listeners.acquire(clusterStateRef::set));
clusterAdmin().preparePendingClusterTasks().execute(listeners.acquire(pendingTasksRef::set));

client().execute(
TransportPendingClusterTasksAction.TYPE,
new PendingClusterTasksRequest(),
listeners.acquire(pendingTasksRef::set)
);
try (var writer = new StringWriter()) {
new HotThreads().busiestThreads(9999).ignoreIdleThreads(false).detect(writer);
hotThreadsRef.set(writer.toString());
Expand Down Expand Up @@ -1040,7 +1048,7 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) {
"waitForRelocation timed out (status={}), cluster state:\n{}\n{}",
status,
clusterAdmin().prepareState().get().getState(),
clusterAdmin().preparePendingClusterTasks().get()
getClusterPendingTasks()
);
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
}
Expand All @@ -1050,6 +1058,18 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) {
return actionGet.getStatus();
}

public static PendingClusterTasksResponse getClusterPendingTasks() {
return getClusterPendingTasks(client());
}

public static PendingClusterTasksResponse getClusterPendingTasks(Client client) {
try {
return client.execute(TransportPendingClusterTasksAction.TYPE, new PendingClusterTasksRequest()).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
return fail(e);
}
}

/**
* Waits until at least a give number of document is visible for searchers
*
Expand Down Expand Up @@ -1146,11 +1166,7 @@ public static DiscoveryNode waitAndGetHealthNode(InternalTestCluster internalClu
* Prints the current cluster state as debug logging.
*/
public void logClusterState() {
logger.debug(
"cluster state:\n{}\n{}",
clusterAdmin().prepareState().get().getState(),
clusterAdmin().preparePendingClusterTasks().get()
);
logger.debug("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
}

protected void ensureClusterSizeConsistency() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) {
logger.info(
"ensureGreen timed out, cluster state:\n{}\n{}",
clusterAdmin().prepareState().get().getState(),
clusterAdmin().preparePendingClusterTasks().get()
ESIntegTestCase.getClusterPendingTasks(client())
);
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ private ClusterHealthStatus ensureColor(
{}""",
method,
leaderClient().admin().cluster().prepareState().get().getState(),
leaderClient().admin().cluster().preparePendingClusterTasks().get(),
ESIntegTestCase.getClusterPendingTasks(leaderClient()),
followerClient().admin().cluster().prepareState().get().getState(),
followerClient().admin().cluster().preparePendingClusterTasks().get()
ESIntegTestCase.getClusterPendingTasks(followerClient())
);
HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at timeout", ReferenceDocs.LOGGING);
fail("timed out waiting for " + color + " state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ public boolean validateClusterForming() {
}
})).start();

waitUntil(
() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty(),
60,
TimeUnit.SECONDS
);
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
ensureStableCluster(cluster.numDataAndMasterNodes());

final String targetIndex = "downsample-5m-" + sourceIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public boolean validateClusterForming() {
}
})).start();
startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd);
waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs);
}
Expand Down Expand Up @@ -265,7 +265,7 @@ public boolean validateClusterForming() {
})).start();

startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd);
waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs);
}
Expand Down Expand Up @@ -354,7 +354,7 @@ public boolean validateClusterForming() {
})).start();

startDownsampleTaskDuringDisruption(sourceIndex, downsampleIndex, config, disruptionStart, disruptionEnd);
waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, sourceIndex, downsampleIndex, indexedDocs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public boolean validateClusterForming() {

final String targetIndex = "downsample-1h-" + sourceIndex;
startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd);
waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, targetIndex, indexedDocs);
}
Expand Down

0 comments on commit b3c1637

Please sign in to comment.