From e1e54e2aeb55dbeb0640993ac140d11dc3d91bd5 Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Tue, 5 Nov 2024 18:02:59 +0100 Subject: [PATCH] GH-2821: Cancel Signal unavailable without timeouts - breaks manual abort() --- .../java/org/apache/jena/http/HttpLib.java | 14 ++ .../apache/jena/http/sys/ExecHTTPBuilder.java | 11 -- .../jena/http/sys/ExecUpdateHTTPBuilder.java | 34 ++-- .../main/java/org/apache/jena/query/ARQ.java | 11 ++ .../jena/sparql/engine/ExecutionContext.java | 15 +- .../apache/jena/sparql/engine/Timeouts.java | 181 ++++++++++++++++++ .../iterator/QueryIterProcessBinding.java | 2 +- .../jena/sparql/exec/QueryExecDataset.java | 29 +-- .../sparql/exec/QueryExecDatasetBuilder.java | 62 ++---- .../jena/sparql/exec/UpdateExecAdapter.java | 11 ++ .../jena/sparql/exec/UpdateExecBuilder.java | 4 + .../sparql/exec/UpdateExecBuilderAdapter.java | 7 + .../jena/sparql/exec/UpdateExecDataset.java | 5 +- .../sparql/exec/UpdateExecDatasetBuilder.java | 51 +++-- .../sparql/exec/UpdateExecutionAdapter.java | 10 + .../exec/UpdateExecutionBuilderAdapter.java | 8 + .../jena/sparql/exec/http/QueryExecHTTP.java | 2 +- .../jena/sparql/exec/http/UpdateExecHTTP.java | 57 ++++-- .../exec/http/UpdateExecHTTPBuilder.java | 2 +- .../exec/http/UpdateExecutionHTTPBuilder.java | 3 +- .../jena/sparql/modify/UpdateEngine.java | 6 +- .../jena/sparql/modify/UpdateEngineBase.java | 21 +- .../jena/sparql/modify/UpdateEngineMain.java | 20 +- .../sparql/modify/UpdateEngineWorker.java | 66 ++++++- .../sparql/modify/UpdateProcessorBase.java | 27 ++- .../org/apache/jena/sparql/util/Context.java | 23 +++ .../jena/update/UpdateExecutionBuilder.java | 6 + .../update/UpdateExecutionDatasetBuilder.java | 8 + .../apache/jena/update/UpdateProcessor.java | 7 + .../sparql/api/TestQueryExecutionCancel.java | 96 ++++++++-- .../apache/jena/sparql/api/TestTimeouts.java | 43 +++++ .../sparql/api/TestUpdateExecutionCancel.java | 158 +++++++++++++++ 32 files changed, 809 insertions(+), 191 deletions(-) create mode 100644 jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java create mode 100644 jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java diff --git a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java index d141eada0c2..530533b8452 100644 --- a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java +++ b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java @@ -244,7 +244,21 @@ public static void handleResponseNoBody(HttpResponse response) { * @return String */ public static String handleResponseRtnString(HttpResponse response) { + return handleResponseRtnString(response, null); + } + + /** + * Handle the HTTP response and read the body to produce a string if a 200. + * Otherwise, throw an {@link HttpException}. + * @param response + * @param callback A callback that receives the opened input stream. + * @return String + */ + public static String handleResponseRtnString(HttpResponse response, Consumer callback) { InputStream input = handleResponseInputStream(response); + if (callback != null) { + callback.accept(input); + } try { return IO.readWholeFileAsUTF8(input); } catch (RuntimeIOException e) { throw new HttpException(e); } diff --git a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java index c5b6718d27a..9c3b032d44b 100644 --- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java @@ -280,31 +280,20 @@ public Y httpHeaders(Map headers) { public Y context(Context context) { if ( context == null ) return thisBuilder(); - ensureContext(); contextAcc.context(context); - //this.context.putAll(context); return thisBuilder(); } public Y set(Symbol symbol, Object value) { - ensureContext(); contextAcc.set(symbol, value); - //context.set(symbol, value); return thisBuilder(); } public Y set(Symbol symbol, boolean value) { - ensureContext(); contextAcc.set(symbol, value); - //context.set(symbol, value); return thisBuilder(); } - private void ensureContext() { -// if ( context == null ) -// context = new Context(); - } - /** * Set a timeout of the overall operation. * Time-to-connect can be set with a custom {@link HttpClient} - see {@link java.net.http.HttpClient.Builder#connectTimeout(java.time.Duration)}. diff --git a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java index 19bbe0f8011..22e551c4cee 100644 --- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java @@ -20,6 +20,7 @@ import java.net.http.HttpClient; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.jena.graph.Node; @@ -31,6 +32,7 @@ import org.apache.jena.sparql.exec.http.UpdateSendMode; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.ContextAccumulator; import org.apache.jena.sparql.util.Symbol; import org.apache.jena.sys.JenaSystem; import org.apache.jena.update.Update; @@ -56,8 +58,9 @@ public String toString() { /** Accumulator for update elements. Can build an overall string or UpdateRequest from the elements. */ private class UpdateEltAcc implements Iterable { - /** Delimiter for joining multiple SPARQL update strings into a single one. */ - public static final String DELIMITER = ";\n"; + /** Delimiter for joining multiple SPARQL update strings into a single one. + * The delimiter takes into account that the last line of a statement may be a single-line-comment. */ + public static final String DELIMITER = "\n;\n"; private List updateOperations = new ArrayList<>(); private List updateOperationsView = Collections.unmodifiableList(updateOperations); @@ -76,6 +79,7 @@ public void add(Update update) { add(new UpdateElt(update)); } + /** Add a string by parsing it. */ public void add(String updateRequestString) { UpdateRequest updateRequest = UpdateFactory.create(updateRequestString); add(updateRequest); @@ -146,9 +150,11 @@ public String buildString() { protected UpdateSendMode sendMode = UpdateSendMode.systemDefault; protected List usingGraphURIs = null; protected List usingNamedGraphURIs = null; - protected Context context = null; + private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext()); // Uses query rewrite to replace variables by values. protected Map substitutionMap = new HashMap<>(); + protected long timeout = -1; + protected TimeUnit timeoutUnit = null; protected ExecUpdateHTTPBuilder() {} @@ -213,6 +219,12 @@ public Y substitution(Var var, Node value) { return thisBuilder(); } + public Y timeout(long timeout, TimeUnit timeoutUnit) { + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + return thisBuilder(); + } + public Y httpClient(HttpClient httpClient) { this.httpClient = Objects.requireNonNull(httpClient); return thisBuilder(); @@ -274,28 +286,20 @@ public Y httpHeaders(Map headers) { public Y context(Context context) { if ( context == null ) return thisBuilder(); - ensureContext(); - this.context.setAll(context); + this.contextAcc.context(context); return thisBuilder(); } public Y set(Symbol symbol, Object value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return thisBuilder(); } public Y set(Symbol symbol, boolean value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return thisBuilder(); } - private void ensureContext() { - if ( context == null ) - context = Context.create(); - } - public X build() { Objects.requireNonNull(serviceURL, "No service URL"); if ( updateEltAcc.isEmpty() ) @@ -322,7 +326,7 @@ public X build() { // If the UpdateRequest object wasn't built until now then build the string instead. String updateStringActual = updateActual == null ? updateEltAcc.buildString() : null; - Context cxt = (context!=null) ? context : ARQ.getContext().copy(); + Context cxt = contextAcc.context(); return buildX(hClient, updateActual, updateStringActual, cxt); } diff --git a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java index 941f90f6c89..196af200421 100644 --- a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java +++ b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java @@ -198,6 +198,17 @@ public static void enableBlankNodeResultLabels(boolean val) { */ public static final Symbol queryTimeout = SystemARQ.allocSymbol("queryTimeout"); + /** + * Set timeout. The value of this symbol gives the value of the timeout in milliseconds + *
    + *
  • A Number; the long value is used
  • + *
  • A string, e.g. "1000", parsed as a number
  • + *
  • A string, as two numbers separated by a comma, e.g. "500,10000" parsed as two numbers
  • + *
+ * @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long, TimeUnit) + */ + public static final Symbol updateTimeout = SystemARQ.allocSymbol("updateTimeout"); + // This can't be a context constant because NodeValues don't look in the context. // /** // * Context symbol controlling Roman Numerals in Filters. diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java index 8d67401a3c1..a79eea860b6 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java @@ -24,10 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jena.atlas.iterator.Iter; -import org.apache.jena.atlas.logging.Log; import org.apache.jena.graph.Graph; import org.apache.jena.query.ARQ; -import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.engine.main.OpExecutor; import org.apache.jena.sparql.engine.main.OpExecutorFactory; @@ -78,18 +76,7 @@ public ExecutionContext(DatasetGraph dataset, OpExecutorFactory factory) { } public ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory) { - this(params, activeGraph, dataset, factory, cancellationSignal(params)); - } - - private static AtomicBoolean cancellationSignal(Context cxt) { - if ( cxt == null ) - return null; - try { - return cxt.get(ARQConstants.symCancelQuery); - } catch (ClassCastException ex) { - Log.error(ExecutionContext.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage()); - return null; - } + this(params, activeGraph, dataset, factory, Context.getCancelSignal(params)); } private ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) { diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java index c0f0586cbba..81be81477f3 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java @@ -22,6 +22,9 @@ import org.apache.jena.atlas.lib.Pair; import org.apache.jena.atlas.logging.Log; +import org.apache.jena.query.ARQ; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; /** Processing timeout strings. */ public class Timeouts { @@ -49,4 +52,182 @@ public static Pair parseTimeoutStr(String str, TimeUnit unit) { return null; } } + + public static record DurationWithUnit(long amount, TimeUnit unit) { + public DurationWithUnit() { + this(-1, TimeUnit.MILLISECONDS); + } + + public boolean isSet() { + return amount >= 0; + } + + public long asMillis() { + return (amount < 0) ? amount : unit.toMillis(amount); + } + } + + public static record Timeout(DurationWithUnit initialTimeout, DurationWithUnit overallTimeout) { + public static Timeout UNSET = new Timeout(-1, -1); + + public Timeout(long initialTimeout, TimeUnit initialTimeoutUnit, long overallTimeout, TimeUnit overallTimeoutUnit) { + this(new DurationWithUnit(initialTimeout, initialTimeoutUnit), new DurationWithUnit(overallTimeout, overallTimeoutUnit)); + } + + public Timeout(long initialTimeout, long overallTimeout) { + this(initialTimeout, TimeUnit.MILLISECONDS, overallTimeout, TimeUnit.MILLISECONDS); + } + + public boolean hasInitialTimeout() { + return initialTimeout().isSet(); + } + + public long initialTimeoutMillis() { + return initialTimeout().asMillis(); + } + + public boolean hasOverallTimeout() { + return overallTimeout().isSet(); + } + + public long overallTimeoutMillis() { + return overallTimeout().asMillis(); + } + + public boolean hasTimeout() { + return hasInitialTimeout() || hasOverallTimeout(); + } + } + + // TimeoutBuilder reserved as a possible super-interface for {Query, Update}Exec(ution)Builder. + public static class TimeoutBuilderImpl { + private static final long UNSET = -1; + + protected long initialTimeout = UNSET; + protected TimeUnit initialTimeoutUnit = null; + protected long overallTimeout = UNSET; + protected TimeUnit overallTimeoutUnit = null; + + public TimeoutBuilderImpl timeout(long value, TimeUnit timeUnit) { + this.initialTimeout = UNSET; + this.initialTimeoutUnit = null; + this.overallTimeout = value; + this.overallTimeoutUnit = timeUnit; + return this; + } + + public TimeoutBuilderImpl initialTimeout(long value, TimeUnit timeUnit) { + this.initialTimeout = value < 0 ? -1L : value ; + this.initialTimeoutUnit = timeUnit; + return this; + } + + public boolean hasInitialTimeout() { + return initialTimeout >= 0; + } + + public TimeoutBuilderImpl overallTimeout(long value, TimeUnit timeUnit) { + this.overallTimeout = value; + this.overallTimeoutUnit = timeUnit; + return this; + } + + public boolean hasOverallTimeout() { + return overallTimeout >= 0; + } + + public Timeout build() { + return new Timeout(initialTimeout, initialTimeoutUnit, overallTimeout, overallTimeoutUnit); + } + } + + /** Update any unset timeout in the builder from the specification object. */ + public static void applyDefaultTimeout(TimeoutBuilderImpl builder, Timeout timeout) { + if (timeout != null) { + if ( !builder.hasInitialTimeout() ) + builder.initialTimeout(timeout.initialTimeout().amount(), timeout.initialTimeout().unit()); + if ( !builder.hasOverallTimeout() ) + builder.overallTimeout(timeout.overallTimeout().amount(), timeout.overallTimeout().unit()); + } + } + + public static Timeout extractQueryTimeout(Context cxt) { + return extractTimeout(cxt, ARQ.queryTimeout); + } + + public static Timeout extractUpdateTimeout(Context cxt) { + return extractTimeout(cxt, ARQ.updateTimeout); + } + + public static Timeout extractTimeout(Context cxt, Symbol symbol) { + Object obj = cxt.get(symbol); + return parseTimeout(obj); + } + + public static Timeout parseTimeout(Object obj) { + Timeout result = Timeout.UNSET; + if ( obj != null ) { + try { + if ( obj instanceof Timeout to ) { + result = to; + } else if ( obj instanceof Number n ) { + long x = n.longValue(); + result = new Timeout(new DurationWithUnit(), new DurationWithUnit(x, TimeUnit.MILLISECONDS)); + } else if ( obj instanceof String str ) { + Pair pair = Timeouts.parseTimeoutStr(str, TimeUnit.MILLISECONDS); + if ( pair == null ) { + Log.warn(Timeouts.class, "Bad timeout string: "+str); + return result; + } + result = new Timeout(pair.getLeft(), pair.getRight()); + } else + Log.warn(Timeouts.class, "Can't interpret timeout: " + obj); + } catch (Exception ex) { + Log.warn(Timeouts.class, "Exception setting timeouts (context) from: "+obj, ex); + } + } + return result; + } + + public static void setQueryTimeout(Context cxt, Timeout timeout) { + setTimeout(cxt, ARQ.queryTimeout, timeout); + } + + public static void setUpdateTimeout(Context cxt, Timeout timeout) { + setTimeout(cxt, ARQ.updateTimeout, timeout); + } + + public static void setTimeout(Context cxt, Symbol symbol, Timeout timeout) { + Object obj = toContextValue(timeout); + cxt.set(symbol, obj); + } + + /** Inverse function of {@link #parseTimeout(Object)}. */ + public static Object toContextValue(Timeout timeout) { + Object result = timeout == null + ? null + : timeout.hasInitialTimeout() + ? toString(timeout) + : timeout.hasOverallTimeout() + ? timeout.overallTimeoutMillis() + : null; + return result; + } + + /** Inverse function of {@link #parseTimeout(Object)}. */ + public static String toString(Timeout timeout) { + String result = timeout.hasInitialTimeout() + ? timeout.initialTimeoutMillis() + "," + timeout.overallTimeoutMillis() + : timeout.hasOverallTimeout() + ? Long.toString(timeout.overallTimeoutMillis()) + : null; + return result; + } + + // Set times from context if not set directly. e..g Context provides default values. + // Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. + public static void applyDefaultQueryTimeoutFromContext(TimeoutBuilderImpl builder, Context cxt) { + Timeout queryTimeout = extractQueryTimeout(cxt); + applyDefaultTimeout(builder, queryTimeout); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java index 72b1e045e11..204b577e02c 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java @@ -49,7 +49,7 @@ public QueryIterProcessBinding(QueryIterator qIter, ExecutionContext context) { nextBinding = null ; AtomicBoolean signal; try { - signal = context.getContext().get(ARQConstants.symCancelQuery); + signal = context.getCancelSignal(); } catch(Exception ex) { signal = null; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index e2078a22476..c8e925e7b68 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -19,7 +19,6 @@ package org.apache.jena.sparql.exec; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -53,6 +52,7 @@ import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingFactory; import org.apache.jena.sparql.engine.iterator.QueryIteratorWrapper; +import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.graph.GraphOps; import org.apache.jena.sparql.modify.TemplateLib; import org.apache.jena.sparql.syntax.ElementGroup; @@ -89,11 +89,10 @@ public class QueryExecDataset implements QueryExec private long timeout2 = TIMEOUT_UNSET; private final AlarmClock alarmClock = AlarmClock.get(); private long queryStartTime = -1; // Unset - private AtomicBoolean cancelSignal = new AtomicBoolean(false); + private AtomicBoolean cancelSignal; protected QueryExecDataset(Query query, String queryString, DatasetGraph datasetGraph, Context cxt, - QueryEngineFactory qeFactory, - long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2, + QueryEngineFactory qeFactory, Timeout timeout, Binding initialToEngine) { // Context cxt is already a safe copy. this.query = query; @@ -101,10 +100,14 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset this.dataset = datasetGraph; this.qeFactory = qeFactory; this.context = (cxt == null) ? Context.setupContextForDataset(cxt, datasetGraph) : cxt; - this.timeout1 = asMillis(timeout1, timeUnit1); - this.timeout2 = asMillis(timeout2, timeUnit2); + this.timeout1 = timeout.initialTimeoutMillis(); + this.timeout2 = timeout.overallTimeoutMillis(); // See also query substitution handled in QueryExecBuilder this.initialBinding = initialToEngine; + + // Cancel signal may originate from an e.c. an update execution. + this.cancelSignal = Context.getOrSetCancelSignal(context); + init(); } @@ -114,10 +117,6 @@ private void init() { context.put(ARQConstants.sysCurrentQuery, query); } - private static long asMillis(long duration, TimeUnit timeUnit) { - return (duration < 0) ? duration : timeUnit.toMillis(duration); - } - @Override public void close() { closed = true; @@ -470,16 +469,6 @@ private void startQueryIteratorActual() { return; } - // JENA-2141 - the timeout can go off while building the query iterator structure. - // In this case, use a signal passed through the context. - // We don't know if getPlan().iterator() does a lot of work or not - // (ideally it shouldn't start executing the query but in some sub-systems - // it might be necessary) - // - // This applies to the time to first result because to get the first result, the - // queryIterator must have been built. So it does not apply for the second - // stage of N,-1 or N,M. - context.set(ARQConstants.symCancelQuery, cancelSignal); TimeoutCallback callback = new TimeoutCallback() ; expectedCallback.set(callback) ; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java index eb0f705366e..23e003009af 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import org.apache.jena.atlas.lib.Pair; import org.apache.jena.atlas.logging.Log; import org.apache.jena.graph.Graph; import org.apache.jena.graph.Node; @@ -34,8 +33,10 @@ import org.apache.jena.sparql.core.Var; import org.apache.jena.sparql.engine.QueryEngineFactory; import org.apache.jena.sparql.engine.QueryEngineRegistry; -import org.apache.jena.sparql.engine.Timeouts; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; +import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl; import org.apache.jena.sparql.syntax.syntaxtransform.QueryTransformOps; import org.apache.jena.sparql.util.Context; import org.apache.jena.sparql.util.ContextAccumulator; @@ -69,10 +70,7 @@ public static QueryExecDatasetBuilder create() { // Uses initial binding to execution (old, original) feature private Binding initialBinding = null; - private long initialTimeout = UNSET; - private TimeUnit initialTimeoutUnit = null; - private long overallTimeout = UNSET; - private TimeUnit overallTimeoutUnit = null; + private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl(); private QueryExecDatasetBuilder() { } @@ -166,60 +164,22 @@ public QueryExecDatasetBuilder initialBinding(Binding binding) { @Override public QueryExecDatasetBuilder timeout(long value, TimeUnit timeUnit) { - this.initialTimeout = UNSET; - this.initialTimeoutUnit = null; - this.overallTimeout = value; - this.overallTimeoutUnit = timeUnit; + timeoutBuilder.timeout(value, timeUnit); return this; } @Override public QueryExecDatasetBuilder initialTimeout(long value, TimeUnit timeUnit) { - this.initialTimeout = value < 0 ? -1L : value ; - this.initialTimeoutUnit = timeUnit; + timeoutBuilder.initialTimeout(value, timeUnit); return this; } @Override public QueryExecDatasetBuilder overallTimeout(long value, TimeUnit timeUnit) { - this.overallTimeout = value; - this.overallTimeoutUnit = timeUnit; + timeoutBuilder.overallTimeout(value, timeUnit); return this; } - // Set times from context if not set directly. e..g Context provides default values. - // Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. - private static void defaultTimeoutsFromContext(QueryExecDatasetBuilder builder, Context cxt) { - applyTimeouts(builder, cxt.get(ARQ.queryTimeout)); - } - - /** Take obj, find the timeout(s) and apply to the builder */ - private static void applyTimeouts(QueryExecDatasetBuilder builder, Object obj) { - if ( obj == null ) - return ; - try { - if ( obj instanceof Number ) { - long x = ((Number)obj).longValue(); - if ( builder.overallTimeout < 0 ) - builder.overallTimeout(x, TimeUnit.MILLISECONDS); - } else if ( obj instanceof String ) { - String str = obj.toString(); - Pair pair = Timeouts.parseTimeoutStr(str, TimeUnit.MILLISECONDS); - if ( pair == null ) { - Log.warn(builder, "Bad timeout string: "+str); - return ; - } - if ( builder.initialTimeout < 0 ) - builder.initialTimeout(pair.getLeft(), TimeUnit.MILLISECONDS); - if ( builder.overallTimeout < 0 ) - builder.overallTimeout(pair.getRight(), TimeUnit.MILLISECONDS); - } else - Log.warn(builder, "Can't interpret timeout: " + obj); - } catch (Exception ex) { - Log.warn(builder, "Exception setting timeouts (context) from: "+obj); - } - } - @Override public QueryExec build() { Objects.requireNonNull(query, "No query for QueryExec"); @@ -243,17 +203,17 @@ public QueryExec build() { queryStringActual = null; } - defaultTimeoutsFromContext(this, cxt); + Timeouts.applyDefaultQueryTimeoutFromContext(this.timeoutBuilder, cxt); if ( dataset != null ) cxt.set(ARQConstants.sysCurrentDataset, DatasetFactory.wrap(dataset)); if ( queryActual != null ) cxt.set(ARQConstants.sysCurrentQuery, queryActual); + Timeout timeout = timeoutBuilder.build(); + QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual, dataset, cxt, qeFactory, - initialTimeout, initialTimeoutUnit, - overallTimeout, overallTimeoutUnit, - initialBinding); + timeout, initialBinding); return qExec; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java index fd23e63af91..b97c2af0420 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java @@ -18,6 +18,7 @@ package org.apache.jena.sparql.exec; +import org.apache.jena.sparql.util.Context; import org.apache.jena.update.UpdateExecution; public class UpdateExecAdapter implements UpdateExec { @@ -39,4 +40,14 @@ protected UpdateExecAdapter(UpdateExecution updateProc) { @Override public void execute() { updateProc.execute(); } + + @Override + public Context getContext() { + return updateProc.getContext(); + } + + @Override + public void abort() { + updateProc.abort(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java index 46434ed84f4..02f56413c90 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.sparql.exec; +import java.util.concurrent.TimeUnit; + import org.apache.jena.graph.Node; import org.apache.jena.query.ARQ; import org.apache.jena.sparql.core.Var; @@ -65,6 +67,8 @@ public default UpdateExecBuilder substitution(String var, Node value) { return substitution(Var.alloc(var), value); } + public UpdateExecBuilder timeout(long value, TimeUnit timeUnit); + public UpdateExec build(); /** Build and execute. */ diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java index 1355939aa6a..461373bb0e7 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java @@ -19,6 +19,7 @@ package org.apache.jena.sparql.exec; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.jena.graph.Node; import org.apache.jena.sparql.core.ResultBinding; @@ -118,6 +119,12 @@ public UpdateExecBuilder substitution(Var var, Node value) { return this; } + @Override + public UpdateExecBuilder timeout(long timeout, TimeUnit timeoutUnit) { + builder = builder.timeout(timeout, timeoutUnit); + return this; + } + @Override public UpdateExec build() { UpdateExecution updateExec = builder.build(); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java index b6f0eed8276..1443aebdf86 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java @@ -20,6 +20,7 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.modify.UpdateEngineFactory; import org.apache.jena.sparql.modify.UpdateProcessorBase; import org.apache.jena.sparql.util.Context; @@ -28,8 +29,8 @@ public class UpdateExecDataset extends UpdateProcessorBase implements UpdateExec { protected UpdateExecDataset(UpdateRequest request, DatasetGraph datasetGraph, - Binding inputBinding, Context context, UpdateEngineFactory factory) { - super(request, datasetGraph, inputBinding, context, factory); + Binding inputBinding, Context context, UpdateEngineFactory factory, Timeout timeout) { + super(request, datasetGraph, inputBinding, context, factory, timeout); } @Override diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java index 52979eedf7c..f58c9b3bda6 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java @@ -21,16 +21,20 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.jena.graph.Node; -import org.apache.jena.query.Query; +import org.apache.jena.query.ARQ; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.Var; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.Timeouts.Timeout; +import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl; import org.apache.jena.sparql.modify.UpdateEngineFactory; import org.apache.jena.sparql.modify.UpdateEngineRegistry; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.ContextAccumulator; import org.apache.jena.sparql.util.Symbol; import org.apache.jena.update.Update; import org.apache.jena.update.UpdateException; @@ -41,15 +45,18 @@ public class UpdateExecDatasetBuilder implements UpdateExecBuilder { public static UpdateExecDatasetBuilder create() { return new UpdateExecDatasetBuilder(); } - private DatasetGraph dataset = null; - private Query query = null; - private Context context = null; + private DatasetGraph dataset = null; + private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext(), ()->Context.fromDataset(dataset)); + // Uses query rewrite to replace variables by values. - private Map substitutionMap = null; + private Map substitutionMap = null; + + private Binding initialBinding = null; + + private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl(); - private Binding initialBinding = null; - private UpdateRequest update = null; - private UpdateRequest updateRequest = new UpdateRequest(); + private UpdateRequest update = null; + private UpdateRequest updateRequest = new UpdateRequest(); private UpdateExecDatasetBuilder() {} @@ -96,29 +103,24 @@ public UpdateExecDatasetBuilder dataset(DatasetGraph dsg) { public UpdateExecDatasetBuilder context(Context context) { if ( context == null ) return this; - ensureContext(); - this.context.putAll(context); + this.contextAcc.context(context); return this; } @Override public UpdateExecDatasetBuilder set(Symbol symbol, Object value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return this; } @Override public UpdateExecDatasetBuilder set(Symbol symbol, boolean value) { - ensureContext(); - this.context.set(symbol, value); + this.contextAcc.set(symbol, value); return this; } - - private void ensureContext() { - if ( context == null ) - context = new Context(); + public Context getContext() { + return contextAcc.context(); } @Override @@ -140,6 +142,12 @@ private void ensureSubstitutionMap() { substitutionMap = new HashMap<>(); } + @Override + public UpdateExecDatasetBuilder timeout(long timeout, TimeUnit timeoutUnit) { + this.timeoutBuilder.timeout(timeout, timeoutUnit); + return this; + } + /** Use {@link #substitution(Binding)} */ @Deprecated public UpdateExecDatasetBuilder initialBinding(Binding initialBinding) { @@ -157,11 +165,14 @@ public UpdateExec build() { if ( substitutionMap != null && ! substitutionMap.isEmpty() ) actualUpdate = UpdateTransformOps.transform(actualUpdate, substitutionMap); - Context cxt = Context.setupContextForDataset(context, dataset); + Context cxt = getContext(); UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt); if ( f == null ) throw new UpdateException("Failed to find an UpdateEngine"); - UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f); + + Timeout timeout = timeoutBuilder.build(); + + UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f, timeout); return uExec; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java index 967a4731669..7123b9c32fc 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java @@ -18,6 +18,7 @@ package org.apache.jena.sparql.exec; +import org.apache.jena.sparql.util.Context; import org.apache.jena.update.UpdateExecution; public class UpdateExecutionAdapter implements UpdateExecution { @@ -40,4 +41,13 @@ protected UpdateExecutionAdapter(UpdateExec updateExec) { @Override public void execute() { updateExec.execute(); } + @Override + public Context getContext() { + return updateExec.getContext(); + } + + @Override + public void abort() { + updateExec.abort(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java index 48240e68229..88e91e98c46 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java @@ -18,6 +18,8 @@ package org.apache.jena.sparql.exec; +import java.util.concurrent.TimeUnit; + import org.apache.jena.query.QuerySolution; import org.apache.jena.rdf.model.RDFNode; import org.apache.jena.sparql.engine.binding.Binding; @@ -103,6 +105,12 @@ public UpdateExecutionBuilder substitution(String varName, RDFNode value) { return this; } + @Override + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) { + builder.timeout(value, timeUnit); + return this; + } + @Override public UpdateExecution build() { return UpdateExecutionAdapter.adapt(builder.build()); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java index beb947f4f0c..1d5e08f59a0 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java @@ -114,7 +114,7 @@ public static QueryExecHTTPBuilder service(String serviceURL) { private String httpResponseContentType = null; // Releasing HTTP input streams is important. We remember this for SELECT result // set streaming, and will close it when the execution is closed - private InputStream retainedConnection = null; + private volatile InputStream retainedConnection = null; private HttpClient httpClient = HttpEnv.getDftHttpClient(); private Map httpHeaders; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java index e39fac4d933..12690831077 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java @@ -29,7 +29,10 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.jena.atlas.logging.Log; import org.apache.jena.http.HttpEnv; import org.apache.jena.http.HttpLib; import org.apache.jena.riot.WebContent; @@ -59,13 +62,19 @@ public static UpdateExecHTTPBuilder service(String endpointURL) { private final Params params; private final List usingGraphURIs; private final List usingNamedGraphURIs; + private final long timeout; + private final TimeUnit timeoutUnit; + + private AtomicBoolean cancelSignal = new AtomicBoolean(false); + private volatile InputStream retainedConnection = null; /*package*/ UpdateExecHTTP(String serviceURL, UpdateRequest update, String updateString, HttpClient httpClient, Params params, List usingGraphURIs, List usingNamedGraphURIs, Map httpHeaders, UpdateSendMode sendMode, - Context context) { + Context context, + long timeout, TimeUnit timeoutUnit) { this.context = context; this.service = serviceURL; //this.update = update; @@ -77,17 +86,14 @@ public static UpdateExecHTTPBuilder service(String endpointURL) { this.usingNamedGraphURIs = usingNamedGraphURIs; this.httpHeaders = httpHeaders; this.sendMode = sendMode; + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; } -// @Override -// public Context getContext() { -// return null; -// } -// -// @Override -// public DatasetGraph getDatasetGraph() { -// return null; -// } + @Override + public Context getContext() { + return context; + } @Override public void execute() { @@ -130,13 +136,40 @@ private void executePostForm(Params thisParams) { } private String executeUpdate(String requestURL, BodyPublisher body, String contentType) { - HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL, httpHeaders, -1L, null); + HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL, httpHeaders, timeout, timeoutUnit); builder = contentTypeHeader(builder, contentType); HttpRequest request = builder.POST(body).build(); logUpdate(updateString, request); HttpResponse response = HttpLib.execute(httpClient, request); - return handleResponseRtnString(response); + return HttpLib.handleResponseRtnString(response, this::setRetainedConnection); + } + + private void setRetainedConnection(InputStream in) { + synchronized (cancelSignal) { + retainedConnection = in; + if (cancelSignal.get()) { + abort(); + } + } } private static void logUpdate(String updateString, HttpRequest request) {} + + /** Best effort that tries to close an underlying HTTP connection. + * May still hang waiting for the HTTP request to complete. */ + @Override + public void abort() { + cancelSignal.set(true); + synchronized (cancelSignal) { + try { + InputStream in = retainedConnection; + if (in != null) { + in.close(); + retainedConnection = null; + } + } catch (Exception ex) { + Log.warn(this, "Error during abort", ex); + } + } + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java index fb11b209854..c38ff2c72c4 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java @@ -45,6 +45,6 @@ protected UpdateExecHTTP buildX(HttpClient hClient, UpdateRequest updateActual, copyArray(usingGraphURIs), copyArray(usingNamedGraphURIs), new HashMap<>(httpHeaders), - sendMode, cxt); + sendMode, cxt, timeout, timeoutUnit); } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java index 5d6350c0407..f4aecfb0b7c 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java @@ -52,7 +52,8 @@ protected UpdateExecutionHTTP buildX(HttpClient hClient, UpdateRequest updateAct copyArray(usingGraphURIs), copyArray(usingNamedGraphURIs), new HashMap<>(httpHeaders), - sendMode, cxt); + sendMode, cxt, + timeout, timeoutUnit); return new UpdateExecutionHTTP(uExec); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java index d8ef8284dd2..74171307c5e 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java @@ -30,12 +30,12 @@ public interface UpdateEngine * Signal start of a request being executed */ public void startRequest(); - + /** - * Signal end of a request being executed + * Signal end of a request being executed */ public void finishRequest(); - + /** * Returns an {@link UpdateSink} that accepts Update operations */ diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java index 18498224c3b..c521ea63c83 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java @@ -18,7 +18,6 @@ package org.apache.jena.sparql.modify; -import org.apache.jena.query.ARQ ; import org.apache.jena.sparql.ARQConstants ; import org.apache.jena.sparql.core.DatasetGraph ; import org.apache.jena.sparql.engine.binding.Binding ; @@ -39,18 +38,16 @@ public UpdateEngineBase(DatasetGraph datasetGraph, this.inputBinding = inputBinding ; this.context = setupContext(context, datasetGraph) ; } - - private static Context setupContext(Context context, DatasetGraph dataset) + + private Context setupContext(Context cxt, DatasetGraph dataset) { - // To many copies? - if ( context == null ) // Copy of global context to protect against change. - context = ARQ.getContext() ; - context = context.copy() ; + // The following setup is effectively the same as in QueryEngineBase + Context result = cxt; + + if ( result == null ) + result = Context.setupContextForDataset(cxt, dataset); - if ( dataset.getContext() != null ) - context.putAll(dataset.getContext()) ; - - context.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ; - return context ; + result.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ; + return result ; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java index 597e1c0b9cf..45b23e387fc 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java @@ -38,7 +38,7 @@ * See {@link UpdateEngineNonStreaming} for a subclass that accumulates updates, including during * parsing then executes the operation. */ -public class UpdateEngineMain extends UpdateEngineBase +public class UpdateEngineMain extends UpdateEngineBase { /** * Creates a new Update Engine @@ -53,12 +53,12 @@ public UpdateEngineMain(DatasetGraph datasetGraph, Binding inputBinding, Context @Override public void startRequest() {} - + @Override public void finishRequest() {} - + private UpdateSink updateSink = null ; - + /* * Returns the {@link UpdateSink}. In this implementation, this is done by with * an {@link UpdateVisitor} which will visit each update operation and send the @@ -71,11 +71,11 @@ public UpdateSink getUpdateSink() { if ( updateSink == null ) updateSink = new UpdateVisitorSink(this.prepareWorker(), - sink(q->datasetGraph.add(q)), + sink(q->datasetGraph.add(q)), sink(q->datasetGraph.delete(q))); return updateSink ; } - + /** * Creates the {@link UpdateVisitor} which will do the work of applying the updates * @return The update visitor to be used to apply the updates @@ -84,18 +84,18 @@ protected UpdateVisitor prepareWorker() { return new UpdateEngineWorker(datasetGraph, inputBinding, context) ; } - /** Direct a sink to a Consumer. */ + /** Direct a sink to a Consumer. */ private Sink sink(Consumer action) { return new Sink() { @Override public void send(X item) { action.accept(item); } - @Override public void close() {} + @Override public void close() {} @Override public void flush() {} - }; + }; } - + private static UpdateEngineFactory factory = new UpdateEngineFactory() { @Override diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java index 5c4eeab04bf..40a443bba75 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jena.atlas.data.BagFactory; import org.apache.jena.atlas.data.DataBag; @@ -45,6 +46,8 @@ import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingRoot; import org.apache.jena.sparql.exec.*; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.graph.GraphFactory; import org.apache.jena.sparql.graph.GraphOps; import org.apache.jena.sparql.modify.request.*; @@ -64,10 +67,42 @@ public class UpdateEngineWorker implements UpdateVisitor protected final Binding inputBinding; // Used for UpdateModify only: substitution is better. protected final Context context; + protected final Timeout timeout; + + /** Used to compute the remaining overall time that may be spent in query execution. */ + protected long startTimeMillis = -1; + + /** The currently executing query exec. */ + protected final AtomicBoolean cancelSignal; + protected volatile QueryExec activeQExec = null; + public UpdateEngineWorker(DatasetGraph datasetGraph, Binding inputBinding, Context context) { this.datasetGraph = datasetGraph; this.inputBinding = inputBinding; this.context = context; + this.timeout = Timeouts.extractUpdateTimeout(context); + this.cancelSignal = Context.getOrSetCancelSignal(context); + } + + public void abort() { + if (cancelSignal.compareAndSet(false, true)) { + synchronized (this) { + // If the change of the cancel signal happened here then abort the activeQExec. + if (activeQExec != null) { + activeQExec.abort(); + } + } + } + } + + private synchronized void setQExec(QueryExec qExec) { + synchronized (this) { + this.activeQExec = qExec; + // Cancel the qExec immediately if the cancel signal is true. + if (cancelSignal.get()) { + activeQExec.abort(); + } + } } @Override @@ -528,7 +563,7 @@ protected Iterator evalBindings(Element pattern) { } @SuppressWarnings("all") - protected static Iterator evalBindings(Query query, DatasetGraph dsg, Binding inputBinding, Context context) { + protected Iterator evalBindings(Query query, DatasetGraph dsg, Binding inputBinding, Context context) { // The UpdateProcessorBase already copied the context and made it safe // ... but that's going to happen again :-( if ( query == null ) { @@ -536,8 +571,10 @@ protected static Iterator evalBindings(Query query, DatasetGraph dsg, B return Iter.singletonIterator(binding); } + updateRemainingQueryTimeout(context); + // Not QueryExecDataset.dataset(...) because of initialBinding. - QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query); + QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query).context(context); if ( inputBinding != null ) { // Must use initialBinding - it puts the input in the results, unlike substitution. builder.initialBinding(inputBinding); @@ -545,9 +582,34 @@ protected static Iterator evalBindings(Query query, DatasetGraph dsg, B // builder.substitution(inputBinding); } QueryExec qExec = builder.build(); + setQExec(qExec); return qExec.select(); } + private void updateRemainingQueryTimeout(Context context) { + Timeout finalTimeout = null; + if (timeout.hasOverallTimeout()) { + long remainingOverallTimeoutMillis = -1; + if (startTimeMillis < 0) { + startTimeMillis = System.currentTimeMillis(); + remainingOverallTimeoutMillis = timeout.overallTimeoutMillis(); + } else { + long currentTimeMillis = System.currentTimeMillis(); + long elapsedMillis = currentTimeMillis - startTimeMillis; + remainingOverallTimeoutMillis -= elapsedMillis; + if (remainingOverallTimeoutMillis < 0) { + remainingOverallTimeoutMillis = 0; + } + } + finalTimeout = new Timeout(timeout.initialTimeoutMillis(), remainingOverallTimeoutMillis); + } else if(timeout.hasInitialTimeout()) { + finalTimeout = new Timeout(timeout.initialTimeoutMillis(), -1); + } + + // Override any prior queryTimeout symbol with a fresh value computed from the configured updateTimeout. + Timeouts.setQueryTimeout(context, finalTimeout); + } + /** * Execute. *
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java index e4b1b53c5eb..2218db8697d 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java @@ -18,9 +18,13 @@ package org.apache.jena.sparql.modify; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.sparql.core.DatasetGraph ; import org.apache.jena.sparql.engine.binding.Binding ; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.util.Context ; import org.apache.jena.update.UpdateProcessor ; import org.apache.jena.update.UpdateRequest ; @@ -35,12 +39,14 @@ public class UpdateProcessorBase implements UpdateProcessor protected final Binding inputBinding; protected final UpdateEngineFactory factory ; protected final Context context ; + protected final Timeout timeout ; public UpdateProcessorBase(UpdateRequest request, DatasetGraph datasetGraph, Binding inputBinding, Context context, - UpdateEngineFactory factory) + UpdateEngineFactory factory, + Timeout timeout) { this.request = request ; this.datasetGraph = datasetGraph ; @@ -48,6 +54,9 @@ public UpdateProcessorBase(UpdateRequest request, this.context = context; Context.setCurrentDateTime(this.context) ; this.factory = factory ; + this.timeout = timeout; + Context.getOrSetCancelSignal(this.context) ; + Timeouts.setUpdateTimeout(context, timeout); } @Override @@ -55,6 +64,7 @@ public void execute() { UpdateEngine uProc = factory.create(datasetGraph, inputBinding, context); uProc.startRequest(); + // context.get(ARQ.updateTimeout); try { UpdateSink sink = uProc.getUpdateSink(); Iter.sendToSink(request.iterator(), sink); // Will call close on sink if there are no exceptions @@ -62,4 +72,19 @@ public void execute() { uProc.finishRequest() ; } } + + @Override + public Context getContext() { + return context; + } + + @Override + public void abort() { + // Right now abort is only signaled via the context's cancel signal. + // An improvement might be introducing UpdateEngine.abort(). + AtomicBoolean cancelSignal = Context.getCancelSignal(context); + if (cancelSignal != null) { + cancelSignal.set(true); + } + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java index 9521ea3f7d2..08d6c41583b 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java @@ -20,13 +20,16 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.apache.jena.atlas.lib.Lib; +import org.apache.jena.atlas.logging.Log; import org.apache.jena.query.ARQ; import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.ARQException; import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.ExecutionContext; /** * A class for setting and keeping named values. Used to pass @@ -418,6 +421,26 @@ public static void setCurrentDateTime(Context context) { context.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()); } + public static AtomicBoolean getCancelSignal(Context context) { + if ( context == null ) + return null; + try { + return context.get(ARQConstants.symCancelQuery); + } catch (ClassCastException ex) { + Log.error(Context.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage()); + return null; + } + } + + public static AtomicBoolean getOrSetCancelSignal(Context context) { + AtomicBoolean cancelSignal = getCancelSignal(context); + if (cancelSignal == null) { + cancelSignal = new AtomicBoolean(false); + context.set(ARQConstants.symCancelQuery, cancelSignal); + } + return cancelSignal; + } + /** Merge an outer (defaults to the system global context) * and local context to produce a new context * The new context is always a separate copy. diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java index 0a294e3c225..3672c15b8b4 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import java.util.concurrent.TimeUnit; + import org.apache.jena.query.QuerySolution; import org.apache.jena.rdf.model.RDFNode; import org.apache.jena.sparql.util.Context; @@ -47,6 +49,10 @@ public interface UpdateExecutionBuilder { public UpdateExecutionBuilder substitution(String varName, RDFNode value); + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit); + + public default UpdateExecutionBuilder timeout(long value) { return timeout(value, TimeUnit.MILLISECONDS); } + public UpdateExecution build(); /** Build and execute */ diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java index 63efc743f4e..b0ce8acac21 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import java.util.concurrent.TimeUnit; + import org.apache.jena.graph.Node; import org.apache.jena.query.Dataset; import org.apache.jena.query.QueryExecution; @@ -131,6 +133,12 @@ public UpdateExecutionDatasetBuilder substitution(String varName, RDFNode value) return this; } + @Override + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) { + builder.timeout(value, timeUnit); + return this; + } + @Override public UpdateExecution build() { UpdateExec exec = builder.build(); diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java index 71ffb5a63ce..df2964ea11a 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import org.apache.jena.sparql.util.Context; + /** * An instance of a execution of an UpdateRequest. * Applies to UpdateExec (GPI) and UpdateExecution (API). @@ -26,4 +28,9 @@ public interface UpdateProcessor { /** Execute */ public void execute() ; + + /** Attempt to asynchronously abort an update execution. */ + public void abort() ; + + public Context getContext(); } diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java index 1815e1a3060..dbfacc14f3a 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -43,13 +44,21 @@ import org.apache.jena.rdf.model.ModelFactory; import org.apache.jena.rdf.model.Property ; import org.apache.jena.rdf.model.Resource ; +import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.DatasetGraphFactory; +import org.apache.jena.sparql.engine.ExecutionContext; import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.QueryExecBuilder; +import org.apache.jena.sparql.expr.NodeValue; +import org.apache.jena.sparql.function.FunctionBase0; +import org.apache.jena.sparql.function.FunctionEnv; import org.apache.jena.sparql.function.FunctionRegistry ; import org.apache.jena.sparql.function.library.wait ; import org.apache.jena.sparql.graph.GraphFactory ; import org.apache.jena.sparql.sse.SSE; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; import org.junit.AfterClass ; import org.junit.Assert; import org.junit.BeforeClass ; @@ -229,6 +238,73 @@ public void test_cancel_json() { cancellationTest("JSON {\":a\": \"b\"} WHERE {}", exec->exec.execJson().get(0)); } + /** Set cancel signal function via {@link QueryExecBuilder#set(Symbol, Object)}. */ + @Test + public void test_cancel_signal_1() { + DatasetGraph dsg = DatasetGraphFactory.create(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }") + .set(ARQConstants.registryFunctions, fnReg) + .build()) { + Assert.assertEquals(1, ResultSetFormatter.consume(ResultSet.adapt(qe.select()))); + } + } + + /** Set cancel signal function via {@link QueryExecBuilder#context(Context)}. */ + @Test + public void test_cancel_signal_2() { + DatasetGraph dsg = DatasetGraphFactory.create(); + Context cxt = ARQ.getContext().copy(); + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }").context(cxt).build()) { + Assert.assertEquals(1, ResultSetFormatter.consume(ResultSet.adapt(qe.select()))); + } + } + + /** Set cancel signal function via {@link QueryExec#getContext()}. */ + @Test + public void test_cancel_signal_3() { + DatasetGraph dsg = DatasetGraphFactory.create(); + try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (() AS ?foobar) { }").build()) { + FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(qe.getContext(), fnReg); + ResultSetFormatter.consume(ResultSet.adapt(qe.select())); + } + } + + /** Registers the function which returns its value if present. + * A RuntimeException is raised if there is no cancel signal in the execution context. */ + static FunctionRegistry registerCancelSignalFunction(FunctionRegistry fnReg) { + fnReg.put("urn:cancelSignal", iri -> new FunctionBase0() { + @Override + protected NodeValue exec(List args, FunctionEnv env) { + ExecutionContext execCxt = (ExecutionContext)env; + AtomicBoolean cancelSignal = execCxt.getCancelSignal(); + if (cancelSignal == null) { + throw new RuntimeException("No cancel signal in execution context."); + } + return NodeValue.makeBoolean(cancelSignal.get()); + } + + @Override + public NodeValue exec() { + throw new IllegalStateException("Should never be called"); + } + }); + + return fnReg; + } + + /** Create a model with 1000 triples. */ + static Graph createTestGraph() { + Graph graph = GraphFactory.createDefaultGraph(); + IntStream.range(0, 1000) + .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i)) + .forEach(node -> graph.add(node, node, node)); + return graph; + } + static void cancellationTest(String queryString, Function> itFactory, Consumer> itConsumer) { cancellationTest(queryString, itFactory::apply); cancellationTestForIterator(queryString, itFactory, itConsumer); @@ -245,7 +321,7 @@ static void cancellationTest(String queryString, Consumer execAction) } /** Obtain an iterator and only afterwards abort the query exec. - * Operations on the iterator are now expected to fail. */ + * Operations on the iterator are now expected to fail. */ static void cancellationTestForIterator(String queryString, Function> itFactory, Consumer> itConsumer) { DatasetGraph dsg = DatasetGraphFactory.createTxnMem(); dsg.add(SSE.parseQuad("(_ :s :p :o)")); @@ -256,10 +332,8 @@ static void cancellationTestForIterator(String queryString, Function NodeFactory.createURI("http://www.example.org/r" + i)) - .forEach(node -> graph.add(node, node, node)); - Model model = ModelFactory.createModelForGraph(graph); + Model model = ModelFactory.createModelForGraph(createTestGraph()); // Create a query that creates 3 cross joins - resulting in one billion result rows Query query = QueryFactory.create("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }"); @@ -290,10 +360,8 @@ private static final int doCount(QueryExecution qe) { } } - /** - * Reusable method that creates a parallel stream that starts query executions - * and schedules cancel tasks on a separate thread pool. - */ + /** Reusable method that creates a parallel stream that starts query executions + * and schedules cancel tasks on a separate thread pool. */ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callable qeFactory, Function processor) { Random cancelDelayRandom = new Random(); ExecutorService executorService = Executors.newCachedThreadPool(); diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java new file mode 100644 index 00000000000..8b19370c543 --- /dev/null +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java @@ -0,0 +1,43 @@ +package org.apache.jena.sparql.api; + +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; +import org.junit.Assert; +import org.junit.Test; + +public class TestTimeouts { + @Test + public void testUnset() { + Timeout timeout = roundtrip(Timeout.UNSET); + String str = Timeouts.toString(timeout); + Assert.assertNull(str); + } + + @Test + public void testInitialTimeout() { + Timeout timeout = roundtrip(new Timeout(6, -1)); + String str = Timeouts.toString(timeout); + Assert.assertEquals("6,-1", str); + } + + @Test + public void testOverallTimeout() { + Timeout timeout = roundtrip(new Timeout(-1, 6)); + String str = Timeouts.toString(timeout); + Assert.assertEquals("6", str); + } + + @Test + public void testInitialAndOverallTimeout() { + Timeout timeout = roundtrip(new Timeout(6, 6)); + String str = Timeouts.toString(timeout); + Assert.assertEquals("6,6", str); + } + + public static Timeout roundtrip(Timeout timeout) { + Object obj = Timeouts.toContextValue(timeout); + Timeout result = Timeouts.parseTimeout(obj); + Assert.assertEquals(timeout, result); + return result; + } +} diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java new file mode 100644 index 00000000000..2f5947160a3 --- /dev/null +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java @@ -0,0 +1,158 @@ +package org.apache.jena.sparql.api; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.jena.atlas.iterator.Iter; +import org.apache.jena.graph.Graph; +import org.apache.jena.query.ARQ; +import org.apache.jena.query.Dataset; +import org.apache.jena.query.DatasetFactory; +import org.apache.jena.query.QueryCancelledException; +import org.apache.jena.sparql.ARQConstants; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.DatasetGraphFactory; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.exec.UpdateExecBuilder; +import org.apache.jena.sparql.function.FunctionRegistry; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; +import org.apache.jena.update.UpdateExecutionFactory; +import org.apache.jena.update.UpdateFactory; +import org.apache.jena.update.UpdateProcessor; +import org.apache.jena.update.UpdateRequest; +import org.junit.Assert; +import org.junit.Test; + +public class TestUpdateExecutionCancel { + + /** Set cancel signal function via {@link UpdateExecBuilder#set(Symbol, Object)}. */ + @Test + public void test_cancel_signal_1() { + DatasetGraph dsg = DatasetGraphFactory.create(); + FunctionRegistry fnReg = TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry()); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .set(ARQConstants.registryFunctions, fnReg) + .build(); + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + + /** Set cancel signal function via {@link UpdateExecBuilder#context(Context)}. */ + @Test + public void test_cancel_signal_2() { + DatasetGraph dsg = DatasetGraphFactory.create(); + Context cxt = ARQ.getContext().copy(); + FunctionRegistry fnReg = TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .context(cxt) + .build(); + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + + /** Set cancel signal function via {@link UpdateExec#getContext()}. */ + @Test + public void test_cancel_signal_3() { + DatasetGraph dsg = DatasetGraphFactory.create(); + UpdateExec ue = UpdateExec.dataset(dsg) + .update("INSERT {

?o } WHERE { BIND(() AS ?o) }") + .build(); + Context cxt = ue.getContext(); + FunctionRegistry fnReg = TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry()); + FunctionRegistry.set(cxt, fnReg); + ue.execute(); + Assert.assertEquals(1, Iter.count(dsg.find())); + } + + @Test(expected = QueryCancelledException.class, timeout = 5000) + public void test_update_cancel_1() { + Graph graph = TestQueryExecutionCancel.createTestGraph(); + // Create an insert whose WHERE clause creates 3 cross joins a 1000 triples/bindings. + // This would result in one billion result rows. + UpdateExec + .dataset(graph) + // No-op delete followed by insert indirectly tests that timeout is applied to overall update request. + .update("DELETE {

} WHERE { ?a ?b ?c } ; INSERT {

} WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }") + .timeout(50, TimeUnit.MILLISECONDS) + .build() + .execute(); + } + + /** Test that creates iterators over a billion result rows and attempts to cancel them. + * If this test hangs then it is likely that something went wrong in the cancellation machinery. */ + @Test(timeout = 10000) + public void test_cancel_concurrent_1() { + int maxCancelDelayInMillis = 100; + + int cpuCount = Runtime.getRuntime().availableProcessors(); + // Spend at most roughly 1 second per cpu (10 tasks a max 100ms) + int taskCount = cpuCount * 10; + + // Create a model with 1000 triples + Dataset dataset = DatasetFactory.wrap(DatasetGraphFactory.wrap(TestQueryExecutionCancel.createTestGraph())); + + // Create a query that creates 3 cross joins - resulting in one billion result rows + UpdateRequest updateRequest = UpdateFactory.create("INSERT {

} WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }"); + Callable qeFactory = () -> UpdateExecutionFactory.create(updateRequest, dataset); + + runConcurrentAbort(taskCount, maxCancelDelayInMillis, qeFactory); + } + + /** Reusable method that creates a parallel stream that starts query executions + * and schedules cancel tasks on a separate thread pool. */ + public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callable upFactory) { + Random cancelDelayRandom = new Random(); + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + List list = IntStream.range(0, taskCount).boxed().collect(Collectors.toList()); + list + .parallelStream() + .forEach(i -> { + UpdateProcessor up; + try { + up = upFactory.call(); + } catch (Exception e) { + throw new RuntimeException("Failed to build a query execution", e); + } + Future future = executorService.submit(() -> up.execute()); + int delayToAbort = cancelDelayRandom.nextInt(maxCancelDelay); + try { + Thread.sleep(delayToAbort); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // System.out.println("Abort: " + qe); + up.abort(); + try { + // System.out.println("Waiting for: " + qe); + future.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof QueryCancelledException)) { + // Unexpected exception - print out the stack trace + e.printStackTrace(); + } + Assert.assertEquals(QueryCancelledException.class, cause.getClass()); + } catch (InterruptedException e) { + // Ignored + } finally { + // System.out.println("Completed: " + qe); + } + }); + } finally { + executorService.shutdownNow(); + } + } +}