diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java index 595788b1eb9f5..eaf8948348684 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java @@ -37,7 +37,7 @@ public void testPendingTasksWithIndexBlocks() { )) { try { enableIndexBlock("test", blockSetting); - PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get(); + PendingClusterTasksResponse response = getClusterPendingTasks(); assertNotNull(response.pendingTasks()); } finally { disableIndexBlock("test", blockSetting); @@ -53,7 +53,7 @@ public void testPendingTasksWithClusterReadOnlyBlock() { try { setClusterReadOnly(true); - PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get(); + PendingClusterTasksResponse response = getClusterPendingTasks(); assertNotNull(response.pendingTasks()); } finally { setClusterReadOnly(false); @@ -80,7 +80,7 @@ public boolean validateClusterForming() { } }); - assertNotNull(clusterAdmin().preparePendingClusterTasks().get().pendingTasks()); + assertNotNull(getClusterPendingTasks().pendingTasks()); // starting one more node allows the cluster to recover internalCluster().startNode(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 873f8083f4e0c..fde465346d4be 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -357,7 +357,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) assertTrue(controlSources.isEmpty()); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")); - PendingClusterTasksResponse response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get(); + PendingClusterTasksResponse response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient()); assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10)); assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1")); assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true)); @@ -419,7 +419,7 @@ public void onFailure(Exception e) { } assertTrue(controlSources.isEmpty()); - response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get(); + response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient()); assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5)); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); for (PendingClusterTask task : response.pendingTasks()) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java index ec79b53ccd174..c1da93140a0b0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -182,11 +182,7 @@ public void testCorruptFileAndRecover() throws InterruptedException, IOException .waitForNoRelocatingShards(true) ).actionGet(); if (health.isTimedOut()) { - logger.info( - "cluster state:\n{}\n{}", - clusterAdmin().prepareState().get().getState(), - clusterAdmin().preparePendingClusterTasks().get() - ); + logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks()); assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false)); } assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -295,11 +291,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted if (response.getStatus() != ClusterHealthStatus.RED) { logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed); - logger.info( - "cluster state:\n{}\n{}", - clusterAdmin().prepareState().get().getState(), - clusterAdmin().preparePendingClusterTasks().get() - ); + logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks()); } assertThat(response.getStatus(), is(ClusterHealthStatus.RED)); ClusterState state = clusterAdmin().prepareState().get().getState(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java deleted file mode 100644 index aa3f226d23c9d..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.admin.cluster.tasks; - -import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; -import org.elasticsearch.client.internal.ElasticsearchClient; - -public class PendingClusterTasksRequestBuilder extends MasterNodeReadOperationRequestBuilder< - PendingClusterTasksRequest, - PendingClusterTasksResponse, - PendingClusterTasksRequestBuilder> { - - public PendingClusterTasksRequestBuilder(ElasticsearchClient client) { - super(client, TransportPendingClusterTasksAction.TYPE, new PendingClusterTasksRequest()); - } -} diff --git a/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java index 4d27431dcd45d..9e3bed8cef09a 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java @@ -85,9 +85,6 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; @@ -441,18 +438,6 @@ public interface ClusterAdminClient extends ElasticsearchClient { */ RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot); - /** - * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations - * that update the cluster state (for example, a create index operation) - */ - void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener listener); - - /** - * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations - * that update the cluster state (for example, a create index operation) - */ - PendingClusterTasksRequestBuilder preparePendingClusterTasks(); - /** * Get snapshot status. */ diff --git a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java index dd230f4d8e12d..075d1a4bb1e66 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java @@ -118,10 +118,6 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder; import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction; import org.elasticsearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; -import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; @@ -851,16 +847,6 @@ public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices) return new ClusterSearchShardsRequestBuilder(this).setIndices(indices); } - @Override - public PendingClusterTasksRequestBuilder preparePendingClusterTasks() { - return new PendingClusterTasksRequestBuilder(this); - } - - @Override - public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener listener) { - execute(TransportPendingClusterTasksAction.TYPE, request, listener); - } - @Override public void putRepository(PutRepositoryRequest request, ActionListener listener) { execute(TransportPutRepositoryAction.TYPE, request, listener); diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java index 8442507c36b1c..e9f9b9bf4327d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest.action.admin.cluster; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; +import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -39,8 +40,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(); pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout())); pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local())); - return channel -> client.admin() - .cluster() - .pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel)); + return channel -> client.execute( + TransportPendingClusterTasksAction.TYPE, + pendingClusterTasksRequest, + new RestChunkedToXContentListener<>(channel) + ); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java index 7408bf3ab229e..19ebbd2f19df4 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Table; @@ -46,15 +47,17 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(); pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout())); pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local())); - return channel -> client.admin() - .cluster() - .pendingClusterTasks(pendingClusterTasksRequest, new RestResponseListener(channel) { + return channel -> client.execute( + TransportPendingClusterTasksAction.TYPE, + pendingClusterTasksRequest, + new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(PendingClusterTasksResponse pendingClusterTasks) throws Exception { Table tab = buildTable(request, pendingClusterTasks); return RestTable.buildResponse(tab, channel); } - }); + } + ); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index cf8d846c9c9e7..72fa522686632 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; @@ -860,7 +862,10 @@ public void waitNoPendingTasksOnAll() throws Exception { for (Client client : clients()) { ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get(); assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); - PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get(); + PendingClusterTasksResponse pendingTasks = client.execute( + TransportPendingClusterTasksAction.TYPE, + new PendingClusterTasksRequest().local(true) + ).get(); assertThat( "client " + client + " still has pending tasks " + pendingTasks, pendingTasks.pendingTasks(), @@ -977,8 +982,11 @@ private ClusterHealthStatus ensureColor( try (var listeners = new RefCountingListener(detailsFuture)) { clusterAdmin().prepareAllocationExplain().execute(listeners.acquire(allocationExplainRef::set)); clusterAdmin().prepareState().execute(listeners.acquire(clusterStateRef::set)); - clusterAdmin().preparePendingClusterTasks().execute(listeners.acquire(pendingTasksRef::set)); - + client().execute( + TransportPendingClusterTasksAction.TYPE, + new PendingClusterTasksRequest(), + listeners.acquire(pendingTasksRef::set) + ); try (var writer = new StringWriter()) { new HotThreads().busiestThreads(9999).ignoreIdleThreads(false).detect(writer); hotThreadsRef.set(writer.toString()); @@ -1040,7 +1048,7 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) { "waitForRelocation timed out (status={}), cluster state:\n{}\n{}", status, clusterAdmin().prepareState().get().getState(), - clusterAdmin().preparePendingClusterTasks().get() + getClusterPendingTasks() ); assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); } @@ -1050,6 +1058,18 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) { return actionGet.getStatus(); } + public static PendingClusterTasksResponse getClusterPendingTasks() { + return getClusterPendingTasks(client()); + } + + public static PendingClusterTasksResponse getClusterPendingTasks(Client client) { + try { + return client.execute(TransportPendingClusterTasksAction.TYPE, new PendingClusterTasksRequest()).get(10, TimeUnit.SECONDS); + } catch (Exception e) { + return fail(e); + } + } + /** * Waits until at least a give number of document is visible for searchers * @@ -1146,11 +1166,7 @@ public static DiscoveryNode waitAndGetHealthNode(InternalTestCluster internalClu * Prints the current cluster state as debug logging. */ public void logClusterState() { - logger.debug( - "cluster state:\n{}\n{}", - clusterAdmin().prepareState().get().getState(), - clusterAdmin().preparePendingClusterTasks().get() - ); + logger.debug("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks()); } protected void ensureClusterSizeConsistency() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 1517571878fa2..b201d58ac0e23 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -421,7 +421,7 @@ public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) { logger.info( "ensureGreen timed out, cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), - clusterAdmin().preparePendingClusterTasks().get() + ESIntegTestCase.getClusterPendingTasks(client()) ); assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 7d9880e4b755c..ea4bc8c92047a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -431,9 +431,9 @@ private ClusterHealthStatus ensureColor( {}""", method, leaderClient().admin().cluster().prepareState().get().getState(), - leaderClient().admin().cluster().preparePendingClusterTasks().get(), + ESIntegTestCase.getClusterPendingTasks(leaderClient()), followerClient().admin().cluster().prepareState().get().getState(), - followerClient().admin().cluster().preparePendingClusterTasks().get() + ESIntegTestCase.getClusterPendingTasks(followerClient()) ); HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at timeout", ReferenceDocs.LOGGING); fail("timed out waiting for " + color + " state"); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index 826c958de4c18..f248da8a7842a 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -126,11 +126,7 @@ public boolean validateClusterForming() { } })).start(); - waitUntil( - () -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty(), - 60, - TimeUnit.SECONDS - ); + waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS); ensureStableCluster(cluster.numDataAndMasterNodes()); final String targetIndex = "downsample-5m-" + sourceIndex; diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java index d6549a9618d36..874b68a4bec55 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java @@ -203,7 +203,7 @@ public boolean validateClusterForming() { } })).start(); startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd); - waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty()); + waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty()); ensureStableCluster(cluster.numDataAndMasterNodes()); assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs); } @@ -265,7 +265,7 @@ public boolean validateClusterForming() { })).start(); startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd); - waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty()); + waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty()); ensureStableCluster(cluster.numDataAndMasterNodes()); assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs); } @@ -354,7 +354,7 @@ public boolean validateClusterForming() { })).start(); startDownsampleTaskDuringDisruption(sourceIndex, downsampleIndex, config, disruptionStart, disruptionEnd); - waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty()); + waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty()); ensureStableCluster(cluster.numDataAndMasterNodes()); assertTargetIndex(cluster, sourceIndex, downsampleIndex, indexedDocs); } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index a023f171ad209..0e84ed460cf5d 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -194,7 +194,7 @@ public boolean validateClusterForming() { final String targetIndex = "downsample-1h-" + sourceIndex; startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd); - waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty()); + waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS); ensureStableCluster(cluster.numDataAndMasterNodes()); assertTargetIndex(cluster, targetIndex, indexedDocs); }