From 72f8530d45c88ce0ae3a07e17dd3dcf3e8007c10 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 18 Dec 2024 18:28:15 -0700 Subject: [PATCH] Refactor pausable field plugin to have common codebase (#118909) (#119008) (cherry picked from commit 65faabd08d79043d10a0351e57f1b0c6239862da) --- .../action/AbstractPausableIntegTestCase.java | 62 +------------ .../esql/action/AbstractPauseFieldPlugin.java | 86 +++++++++++++++++++ .../esql/action/CrossClusterAsyncQueryIT.java | 72 +--------------- .../action/CrossClustersCancellationIT.java | 80 ++--------------- .../esql/action/SimplePauseFieldPlugin.java | 36 ++++++++ 5 files changed, 136 insertions(+), 200 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java index 8de65847c3f85..8054b260f0060 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java @@ -10,26 +10,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.index.engine.SegmentsStats; -import org.elasticsearch.index.mapper.OnScriptError; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.ScriptPlugin; -import org.elasticsearch.script.LongFieldScript; -import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.ScriptEngine; -import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.Before; import java.io.IOException; import java.util.Collection; -import java.util.Map; -import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -40,8 +29,6 @@ */ public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase { - private static final Logger LOGGER = LogManager.getLogger(AbstractPausableIntegTestCase.class); - protected static final Semaphore scriptPermits = new Semaphore(0); protected int pageSize = -1; @@ -108,53 +95,10 @@ public void setupIndex() throws IOException { } } - public static class PausableFieldPlugin extends Plugin implements ScriptPlugin { - + public static class PausableFieldPlugin extends AbstractPauseFieldPlugin { @Override - public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { - return new ScriptEngine() { - @Override - public String getType() { - return "pause"; - } - - @Override - @SuppressWarnings("unchecked") - public FactoryType compile( - String name, - String code, - ScriptContext context, - Map params - ) { - return (FactoryType) new LongFieldScript.Factory() { - @Override - public LongFieldScript.LeafFactory newFactory( - String fieldName, - Map params, - SearchLookup searchLookup, - OnScriptError onScriptError - ) { - return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) { - @Override - public void execute() { - try { - assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES)); - } catch (Exception e) { - throw new AssertionError(e); - } - LOGGER.debug("--> emitting value"); - emit(1); - } - }; - } - }; - } - - @Override - public Set> getSupportedContexts() { - return Set.of(LongFieldScript.CONTEXT); - } - }; + protected boolean onWait() throws InterruptedException { + return scriptPermits.tryAcquire(1, TimeUnit.MINUTES); } } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java new file mode 100644 index 0000000000000..5554f7e571dfb --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.mapper.OnScriptError; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.script.LongFieldScript; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.search.lookup.SearchLookup; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +/** + * A plugin that provides a script language "pause" that can be used to simulate slow running queries. + * See also {@link AbstractPausableIntegTestCase}. + */ +public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin { + + // Called when the engine enters the execute() method. + protected void onStartExecute() {} + + // Called when the engine needs to wait for further execution to be allowed. + protected abstract boolean onWait() throws InterruptedException; + + @Override + public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { + return new ScriptEngine() { + @Override + public String getType() { + return "pause"; + } + + @Override + @SuppressWarnings("unchecked") + public FactoryType compile( + String name, + String code, + ScriptContext context, + Map params + ) { + if (context == LongFieldScript.CONTEXT) { + return (FactoryType) new LongFieldScript.Factory() { + @Override + public LongFieldScript.LeafFactory newFactory( + String fieldName, + Map params, + SearchLookup searchLookup, + OnScriptError onScriptError + ) { + return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) { + @Override + public void execute() { + onStartExecute(); + try { + assertTrue(onWait()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + emit(1); + } + }; + } + }; + } + throw new IllegalStateException("unsupported type " + context); + } + + @Override + public Set> getSupportedContexts() { + return Set.of(LongFieldScript.CONTEXT); + } + }; + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java index ea78ee2e3cfbd..22cdb3f064dd6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -19,14 +19,8 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; -import org.elasticsearch.index.mapper.OnScriptError; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.ScriptPlugin; -import org.elasticsearch.script.LongFieldScript; -import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.ScriptEngine; -import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; @@ -44,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -80,7 +73,7 @@ protected Collection> nodePlugins(String clusterAlias) { plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action plugins.add(InternalExchangePlugin.class); - plugins.add(PauseFieldPlugin.class); + plugins.add(SimplePauseFieldPlugin.class); return plugins; } @@ -99,64 +92,7 @@ public List> getSettings() { @Before public void resetPlugin() { - PauseFieldPlugin.allowEmitting = new CountDownLatch(1); - PauseFieldPlugin.startEmitting = new CountDownLatch(1); - } - - public static class PauseFieldPlugin extends Plugin implements ScriptPlugin { - public static CountDownLatch startEmitting = new CountDownLatch(1); - public static CountDownLatch allowEmitting = new CountDownLatch(1); - - @Override - public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { - return new ScriptEngine() { - @Override - - public String getType() { - return "pause"; - } - - @Override - @SuppressWarnings("unchecked") - public FactoryType compile( - String name, - String code, - ScriptContext context, - Map params - ) { - if (context == LongFieldScript.CONTEXT) { - return (FactoryType) new LongFieldScript.Factory() { - @Override - public LongFieldScript.LeafFactory newFactory( - String fieldName, - Map params, - SearchLookup searchLookup, - OnScriptError onScriptError - ) { - return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) { - @Override - public void execute() { - startEmitting.countDown(); - try { - assertTrue(allowEmitting.await(30, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - emit(1); - } - }; - } - }; - } - throw new IllegalStateException("unsupported type " + context); - } - - @Override - public Set> getSupportedContexts() { - return Set.of(LongFieldScript.CONTEXT); - } - }; - } + SimplePauseFieldPlugin.resetPlugin(); } /** @@ -184,7 +120,7 @@ public void testSuccessfulPathways() throws Exception { } // wait until we know that the query against 'remote-b:blocking' has started - PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it) assertBusy(() -> { @@ -234,7 +170,7 @@ public void testSuccessfulPathways() throws Exception { } // allow remoteB query to proceed - PauseFieldPlugin.allowEmitting.countDown(); + SimplePauseFieldPlugin.allowEmitting.countDown(); // wait until both remoteB and local queries have finished assertBusy(() -> { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 68bfc60202365..8dcc1ec57e319 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -15,18 +15,11 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.mapper.OnScriptError; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.ScriptPlugin; -import org.elasticsearch.script.LongFieldScript; -import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.ScriptEngine; -import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.transport.TransportService; @@ -38,9 +31,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; @@ -63,7 +53,7 @@ protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); - plugins.add(PauseFieldPlugin.class); + plugins.add(SimplePauseFieldPlugin.class); return plugins; } @@ -82,63 +72,7 @@ public List> getSettings() { @Before public void resetPlugin() { - PauseFieldPlugin.allowEmitting = new CountDownLatch(1); - PauseFieldPlugin.startEmitting = new CountDownLatch(1); - } - - public static class PauseFieldPlugin extends Plugin implements ScriptPlugin { - public static CountDownLatch startEmitting = new CountDownLatch(1); - public static CountDownLatch allowEmitting = new CountDownLatch(1); - - @Override - public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { - return new ScriptEngine() { - @Override - public String getType() { - return "pause"; - } - - @Override - @SuppressWarnings("unchecked") - public FactoryType compile( - String name, - String code, - ScriptContext context, - Map params - ) { - if (context == LongFieldScript.CONTEXT) { - return (FactoryType) new LongFieldScript.Factory() { - @Override - public LongFieldScript.LeafFactory newFactory( - String fieldName, - Map params, - SearchLookup searchLookup, - OnScriptError onScriptError - ) { - return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) { - @Override - public void execute() { - startEmitting.countDown(); - try { - assertTrue(allowEmitting.await(30, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - emit(1); - } - }; - } - }; - } - throw new IllegalStateException("unsupported type " + context); - } - - @Override - public Set> getSupportedContexts() { - return Set.of(LongFieldScript.CONTEXT); - } - }; - } + SimplePauseFieldPlugin.resetPlugin(); } private void createRemoteIndex(int numDocs) throws Exception { @@ -169,7 +103,7 @@ public void testCancel() throws Exception { request.pragmas(randomPragmas()); PlainActionFuture requestFuture = new PlainActionFuture<>(); client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); - assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); List rootTasks = new ArrayList<>(); assertBusy(() -> { List tasks = client().admin().cluster().prepareListTasks().setActions(EsqlQueryAction.NAME).get().getTasks(); @@ -192,7 +126,7 @@ public void testCancel() throws Exception { } }); } finally { - PauseFieldPlugin.allowEmitting.countDown(); + SimplePauseFieldPlugin.allowEmitting.countDown(); } Exception error = expectThrows(Exception.class, requestFuture::actionGet); assertThat(error.getMessage(), containsString("proxy timeout")); @@ -223,7 +157,7 @@ public void testSameRemoteClusters() throws Exception { assertThat(tasks, hasSize(moreClusters + 1)); }); } finally { - PauseFieldPlugin.allowEmitting.countDown(); + SimplePauseFieldPlugin.allowEmitting.countDown(); } try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) { // TODO: This produces incorrect results because data on the remote cluster is processed multiple times. @@ -244,7 +178,7 @@ public void testTasks() throws Exception { request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); request.pragmas(randomPragmas()); ActionFuture requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); - assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); try { assertBusy(() -> { List clusterTasks = client(REMOTE_CLUSTER).admin() @@ -270,7 +204,7 @@ public void testTasks() throws Exception { \\_ExchangeSinkOperator""")); }); } finally { - PauseFieldPlugin.allowEmitting.countDown(); + SimplePauseFieldPlugin.allowEmitting.countDown(); } requestFuture.actionGet(30, TimeUnit.SECONDS).close(); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java new file mode 100644 index 0000000000000..3ba73dd9a402e --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * A plugin that provides a script language "pause" that can be used to simulate slow running queries. + * This implementation allows to know when it arrives at execute() via startEmitting and to allow the execution to proceed + * via allowEmitting. + */ +public class SimplePauseFieldPlugin extends AbstractPauseFieldPlugin { + public static CountDownLatch startEmitting = new CountDownLatch(1); + public static CountDownLatch allowEmitting = new CountDownLatch(1); + + public static void resetPlugin() { + allowEmitting = new CountDownLatch(1); + startEmitting = new CountDownLatch(1); + } + + @Override + public void onStartExecute() { + startEmitting.countDown(); + } + + @Override + public boolean onWait() throws InterruptedException { + return allowEmitting.await(30, TimeUnit.SECONDS); + } +}