Skip to content

Commit

Permalink
Refactor pausable field plugin to have common codebase (elastic#118909)…
Browse files Browse the repository at this point in the history
… (elastic#119008)

(cherry picked from commit 65faabd)
  • Loading branch information
smalyshev authored Dec 19, 2024
1 parent 0b4937f commit 72f8530
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,8 +29,6 @@
*/
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {

private static final Logger LOGGER = LogManager.getLogger(AbstractPausableIntegTestCase.class);

protected static final Semaphore scriptPermits = new Semaphore(0);

protected int pageSize = -1;
Expand Down Expand Up @@ -108,53 +95,10 @@ public void setupIndex() throws IOException {
}
}

public static class PausableFieldPlugin extends Plugin implements ScriptPlugin {

public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
try {
assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES));
} catch (Exception e) {
throw new AssertionError(e);
}
LOGGER.debug("--> emitting value");
emit(1);
}
};
}
};
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
protected boolean onWait() throws InterruptedException {
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertTrue;

/**
* A plugin that provides a script language "pause" that can be used to simulate slow running queries.
* See also {@link AbstractPausableIntegTestCase}.
*/
public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin {

// Called when the engine enters the execute() method.
protected void onStartExecute() {}

// Called when the engine needs to wait for further execution to be allowed.
protected abstract boolean onWait() throws InterruptedException;

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
if (context == LongFieldScript.CONTEXT) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
onStartExecute();
try {
assertTrue(onWait());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
emit(1);
}
};
}
};
}
throw new IllegalStateException("unsupported type " + context);
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
Expand All @@ -44,7 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -80,7 +73,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
plugins.add(InternalExchangePlugin.class);
plugins.add(PauseFieldPlugin.class);
plugins.add(SimplePauseFieldPlugin.class);
return plugins;
}

Expand All @@ -99,64 +92,7 @@ public List<Setting<?>> getSettings() {

@Before
public void resetPlugin() {
PauseFieldPlugin.allowEmitting = new CountDownLatch(1);
PauseFieldPlugin.startEmitting = new CountDownLatch(1);
}

public static class PauseFieldPlugin extends Plugin implements ScriptPlugin {
public static CountDownLatch startEmitting = new CountDownLatch(1);
public static CountDownLatch allowEmitting = new CountDownLatch(1);

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override

public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
if (context == LongFieldScript.CONTEXT) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
startEmitting.countDown();
try {
assertTrue(allowEmitting.await(30, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
emit(1);
}
};
}
};
}
throw new IllegalStateException("unsupported type " + context);
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
SimplePauseFieldPlugin.resetPlugin();
}

/**
Expand Down Expand Up @@ -184,7 +120,7 @@ public void testSuccessfulPathways() throws Exception {
}

// wait until we know that the query against 'remote-b:blocking' has started
PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
assertBusy(() -> {
Expand Down Expand Up @@ -234,7 +170,7 @@ public void testSuccessfulPathways() throws Exception {
}

// allow remoteB query to proceed
PauseFieldPlugin.allowEmitting.countDown();
SimplePauseFieldPlugin.allowEmitting.countDown();

// wait until both remoteB and local queries have finished
assertBusy(() -> {
Expand Down
Loading

0 comments on commit 72f8530

Please sign in to comment.