From dd2ef5604b514d6c145cdc344db07da78a452a6f Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Tue, 5 Nov 2024 18:02:59 +0100 Subject: [PATCH] GH-2821: Cancel Signal unavailable without timeouts - breaks manual abort() --- .../jena/http/sys/ExecUpdateHTTPBuilder.java | 15 ++- .../jena/sparql/exec/QueryExecDataset.java | 27 ++-- .../jena/sparql/exec/UpdateExecAdapter.java | 6 + .../sparql/exec/UpdateExecDatasetBuilder.java | 25 ++-- .../sparql/exec/UpdateExecutionAdapter.java | 5 + .../jena/sparql/exec/http/UpdateExecHTTP.java | 8 +- .../jena/sparql/modify/UpdateEngineBase.java | 21 ++- .../sparql/modify/UpdateEngineWorker.java | 2 +- .../sparql/modify/UpdateProcessorBase.java | 5 + .../apache/jena/update/UpdateProcessor.java | 4 + .../sparql/api/TestQueryExecutionCancel.java | 127 +++++++++++++++++- 11 files changed, 198 insertions(+), 47 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java index 19bbe0f8011..856b3813985 100644 --- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java @@ -31,6 +31,7 @@ import org.apache.jena.sparql.exec.http.UpdateSendMode; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.ContextAccumulator; import org.apache.jena.sparql.util.Symbol; import org.apache.jena.sys.JenaSystem; import org.apache.jena.update.Update; @@ -146,7 +147,7 @@ public String buildString() { protected UpdateSendMode sendMode = UpdateSendMode.systemDefault; protected List usingGraphURIs = null; protected List usingNamedGraphURIs = null; - protected Context context = null; + private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext()); // Uses query rewrite to replace variables by values. 
protected Map substitutionMap = new HashMap<>(); @@ -275,25 +276,25 @@ public Y context(Context context) { if ( context == null ) return thisBuilder(); ensureContext(); - this.context.setAll(context); + this.contextAcc.context(context); return thisBuilder(); } public Y set(Symbol symbol, Object value) { ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return thisBuilder(); } public Y set(Symbol symbol, boolean value) { ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return thisBuilder(); } private void ensureContext() { - if ( context == null ) - context = Context.create(); +// if ( context == null ) +// context = Context.create(); } public X build() { @@ -322,7 +323,7 @@ public X build() { // If the UpdateRequest object wasn't built until now then build the string instead. String updateStringActual = updateActual == null ? updateEltAcc.buildString() : null; - Context cxt = (context!=null) ? context : ARQ.getContext().copy(); + Context cxt = contextAcc.context(); return buildX(hClient, updateActual, updateStringActual, cxt); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index e2078a22476..df797c8e774 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -457,6 +457,23 @@ private void startQueryIteratorActual() { execInit(); + // GH-2821 - Unconditionally provide a cancel signal because manual abort via QueryExec.abort() + // may be triggered any time, even if no timeouts were configured. + // Prior to this issue, the cancel signal was only provided when timeouts were configured. + + // The following note is older: + // JENA-2141 - the timeout can go off while building the query iterator structure. 
+ // In this case, use a signal passed through the context. + // We don't know if getPlan().iterator() does a lot of work or not + // (ideally it shouldn't start executing the query but in some sub-systems + // it might be necessary) + // + // This applies to the time to first result because to get the first result, the + // queryIterator must have been built. So it does not apply for the second + // stage of N,-1 or N,M. + context.set(ARQConstants.symCancelQuery, cancelSignal); + + /* Timeouts: * -1,-1 No timeouts * N, same as -1,N Overall timeout only. No wrapper needed. @@ -470,16 +487,6 @@ private void startQueryIteratorActual() { return; } - // JENA-2141 - the timeout can go off while building the query iterator structure. - // In this case, use a signal passed through the context. - // We don't know if getPlan().iterator() does a lot of work or not - // (ideally it shouldn't start executing the query but in some sub-systems - // it might be necessary) - // - // This applies to the time to first result because to get the first result, the - // queryIterator must have been built. So it does not apply for the second - // stage of N,-1 or N,M. 
- context.set(ARQConstants.symCancelQuery, cancelSignal); TimeoutCallback callback = new TimeoutCallback() ; expectedCallback.set(callback) ; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java index fd23e63af91..3e51b36507b 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java @@ -18,6 +18,7 @@ package org.apache.jena.sparql.exec; +import org.apache.jena.sparql.util.Context; import org.apache.jena.update.UpdateExecution; public class UpdateExecAdapter implements UpdateExec { @@ -39,4 +40,9 @@ protected UpdateExecAdapter(UpdateExecution updateProc) { @Override public void execute() { updateProc.execute(); } + + @Override + public Context getContext() { + return updateProc.getContext(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java index 52979eedf7c..15c61bd1aff 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.apache.jena.graph.Node; +import org.apache.jena.query.ARQ; import org.apache.jena.query.Query; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.Var; @@ -31,6 +32,7 @@ import org.apache.jena.sparql.modify.UpdateEngineRegistry; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.ContextAccumulator; import org.apache.jena.sparql.util.Symbol; import org.apache.jena.update.Update; import org.apache.jena.update.UpdateException; @@ -43,7 +45,11 @@ public class UpdateExecDatasetBuilder implements 
UpdateExecBuilder { private DatasetGraph dataset = null; private Query query = null; - private Context context = null; + // private Context context = null; + private ContextAccumulator contextAcc = + ContextAccumulator.newBuilder(()->ARQ.getContext(), ()->Context.fromDataset(dataset)); + + // Uses query rewrite to replace variables by values. private Map substitutionMap = null; @@ -96,29 +102,24 @@ public UpdateExecDatasetBuilder dataset(DatasetGraph dsg) { public UpdateExecDatasetBuilder context(Context context) { if ( context == null ) return this; - ensureContext(); - this.context.putAll(context); + this.contextAcc.context(context); return this; } @Override public UpdateExecDatasetBuilder set(Symbol symbol, Object value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return this; } @Override public UpdateExecDatasetBuilder set(Symbol symbol, boolean value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return this; } - - private void ensureContext() { - if ( context == null ) - context = new Context(); + public Context getContext() { + return contextAcc.context(); } @Override @@ -157,7 +158,7 @@ public UpdateExec build() { if ( substitutionMap != null && ! 
substitutionMap.isEmpty() ) actualUpdate = UpdateTransformOps.transform(actualUpdate, substitutionMap); - Context cxt = Context.setupContextForDataset(context, dataset); + Context cxt = getContext(); UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt); if ( f == null ) throw new UpdateException("Failed to find an UpdateEngine"); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java index 967a4731669..8a7d7af62b1 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java @@ -18,6 +18,7 @@ package org.apache.jena.sparql.exec; +import org.apache.jena.sparql.util.Context; import org.apache.jena.update.UpdateExecution; public class UpdateExecutionAdapter implements UpdateExecution { @@ -40,4 +41,8 @@ protected UpdateExecutionAdapter(UpdateExec updateExec) { @Override public void execute() { updateExec.execute(); } + @Override + public Context getContext() { + return updateExec.getContext(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java index e39fac4d933..80644d0667f 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java @@ -79,10 +79,10 @@ public static UpdateExecHTTPBuilder service(String endpointURL) { this.sendMode = sendMode; } -// @Override -// public Context getContext() { -// return null; -// } + @Override + public Context getContext() { + return context; + } // // @Override // public DatasetGraph getDatasetGraph() { diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java index 18498224c3b..c521ea63c83 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java @@ -18,7 +18,6 @@ package org.apache.jena.sparql.modify; -import org.apache.jena.query.ARQ ; import org.apache.jena.sparql.ARQConstants ; import org.apache.jena.sparql.core.DatasetGraph ; import org.apache.jena.sparql.engine.binding.Binding ; @@ -39,18 +38,16 @@ public UpdateEngineBase(DatasetGraph datasetGraph, this.inputBinding = inputBinding ; this.context = setupContext(context, datasetGraph) ; } - - private static Context setupContext(Context context, DatasetGraph dataset) + + private Context setupContext(Context cxt, DatasetGraph dataset) { - // To many copies? - if ( context == null ) // Copy of global context to protect against change. - context = ARQ.getContext() ; - context = context.copy() ; + // The following setup is effectively the same as in QueryEngineBase + Context result = cxt; + + if ( result == null ) + result = Context.setupContextForDataset(cxt, dataset); - if ( dataset.getContext() != null ) - context.putAll(dataset.getContext()) ; - - context.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ; - return context ; + result.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ; + return result ; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java index 5c4eeab04bf..53cc9685716 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java @@ -537,7 +537,7 @@ protected static Iterator evalBindings(Query query, DatasetGraph dsg, B } // Not QueryExecDataset.dataset(...) because of initialBinding. 
- QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query); + QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query).context(context); if ( inputBinding != null ) { // Must use initialBinding - it puts the input in the results, unlike substitution. builder.initialBinding(inputBinding); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java index e4b1b53c5eb..9abb33dccfd 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java @@ -62,4 +62,9 @@ public void execute() { uProc.finishRequest() ; } } + + @Override + public Context getContext() { + return context; + } } diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java index 71ffb5a63ce..b9e6f42909c 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import org.apache.jena.sparql.util.Context; + /** * An instance of a execution of an UpdateRequest. * Applies to UpdateExec (GPI) and UpdateExecution (API). 
@@ -26,4 +28,6 @@ public interface UpdateProcessor { /** Execute */ public void execute() ; + + public Context getContext(); } diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java index 1815e1a3060..46141db7e2e 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java @@ -31,25 +31,43 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.graph.Graph; import org.apache.jena.graph.NodeFactory; -import org.apache.jena.query.* ; +import org.apache.jena.query.ARQ; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryCancelledException; +import org.apache.jena.query.QueryExecution; +import org.apache.jena.query.QueryExecutionFactory; +import org.apache.jena.query.QueryFactory; +import org.apache.jena.query.ResultSet; +import org.apache.jena.query.ResultSetFormatter; import org.apache.jena.rdf.model.Model ; import org.apache.jena.rdf.model.ModelFactory; import org.apache.jena.rdf.model.Property ; import org.apache.jena.rdf.model.Resource ; +import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.DatasetGraphFactory; +import org.apache.jena.sparql.engine.ExecutionContext; import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.exec.UpdateExecBuilder; +import org.apache.jena.sparql.expr.NodeValue; +import org.apache.jena.sparql.function.FunctionBase0; 
+import org.apache.jena.sparql.function.FunctionEnv; import org.apache.jena.sparql.function.FunctionRegistry ; import org.apache.jena.sparql.function.library.wait ; import org.apache.jena.sparql.graph.GraphFactory ; import org.apache.jena.sparql.sse.SSE; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; import org.junit.AfterClass ; import org.junit.Assert; import org.junit.BeforeClass ; @@ -229,6 +247,113 @@ public void test_cancel_json() { cancellationTest("JSON {\":a\": \"b\"} WHERE {}", exec->exec.execJson().get(0)); } + /** + * Registers the function {@code urn:cancelSignal}, which returns the value of the cancel signal if present. + * A RuntimeException is raised if there is no cancel signal in the execution context. + * + * Note: Prior to jena-5.3.0 the cancel signal was only set when configuring a timeout. + * However, implementations may also want to react to manual abort via {@link QueryExec#abort()}. + */ + public static FunctionRegistry registerCancelSignalFunction(FunctionRegistry fnReg) { + fnReg.put("urn:cancelSignal", iri -> new FunctionBase0() { + @Override + protected NodeValue exec(List args, FunctionEnv env) { + ExecutionContext execCxt = (ExecutionContext)env; + AtomicBoolean cancelSignal = execCxt.getCancelSignal(); + if (cancelSignal == null) { + throw new RuntimeException("No cancel signal in execution context."); + } + return NodeValue.makeBoolean(cancelSignal.get()); + } + + @Override + public NodeValue exec() { + throw new IllegalStateException("Should never be called"); + } + }); + + return fnReg; + } + + /** Set cancel signal function via {@link QueryExecBuilder#set(Symbol, Object)}. 
*/ + @Test + public void test_cancel_signal_query_1() { + DatasetGraph dsg = DatasetGraphFactory.create(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }") + .set(ARQConstants.registryFunctions, fnReg) + .build()) { + Assert.assertEquals(1, ResultSetFormatter.consume(ResultSet.adapt(qe.select()))); + } + } + + /** Set cancel signal function via {@link QueryExecBuilder#context(Context)}. */ + @Test + public void test_cancel_signal_query_2() { + DatasetGraph dsg = DatasetGraphFactory.create(); + Context cxt = ARQ.getContext().copy(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }").context(cxt).build()) { + Assert.assertEquals(1, ResultSetFormatter.consume(ResultSet.adapt(qe.select()))); + } + } + + /** Set cancel signal function via {@link QueryExec#getContext()}. */ + @Test + public void test_cancel_signal_query_3() { + DatasetGraph dsg = DatasetGraphFactory.create(); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }").build()) { + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(qe.getContext(), fnReg); + ResultSetFormatter.consume(ResultSet.adapt(qe.select())); + } + } + + /** Set cancel signal function via {@link UpdateExecBuilder#set(Symbol, Object)}. */ + @Test + public void test_cancel_signal_update_1() { + DatasetGraph dsg = DatasetGraphFactory.create(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .set(ARQConstants.registryFunctions, fnReg) + .set(Symbol.create("foo:bar:baz"), true) + .build(); + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + + /** Set cancel signal function via {@link UpdateExecBuilder#context(Context)}. */ + @Test + public void test_cancel_signal_update_2() { + DatasetGraph dsg = DatasetGraphFactory.create(); + Context cxt = ARQ.getContext().copy(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .context(cxt) + .build(); + + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + + /** Set cancel signal function via {@link UpdateExec#getContext()}. */ + @Test + public void test_cancel_signal_update_3() { + DatasetGraph dsg = DatasetGraphFactory.create(); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .build(); + Context cxt = ue.getContext(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + static void cancellationTest(String queryString, Function> itFactory, Consumer> itConsumer) { cancellationTest(queryString, itFactory::apply); cancellationTestForIterator(queryString, itFactory, itConsumer);