Skip to content

Commit

Permalink
GH-2821: Cancel Signal unavailable without timeouts - breaks manual a…
Browse files Browse the repository at this point in the history
…bort()
  • Loading branch information
Aklakan committed Nov 6, 2024
1 parent 510680c commit dd2ef56
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.jena.sparql.exec.http.UpdateSendMode;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.sys.JenaSystem;
import org.apache.jena.update.Update;
Expand Down Expand Up @@ -146,7 +147,7 @@ public String buildString() {
protected UpdateSendMode sendMode = UpdateSendMode.systemDefault;
protected List<String> usingGraphURIs = null;
protected List<String> usingNamedGraphURIs = null;
protected Context context = null;
private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext());
// Uses query rewrite to replace variables by values.
protected Map<Var, Node> substitutionMap = new HashMap<>();

Expand Down Expand Up @@ -275,25 +276,25 @@ public Y context(Context context) {
if ( context == null )
return thisBuilder();
ensureContext();
this.context.setAll(context);
this.contextAcc.context(context);
return thisBuilder();
}

public Y set(Symbol symbol, Object value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

public Y set(Symbol symbol, boolean value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

private void ensureContext() {
if ( context == null )
context = Context.create();
// if ( context == null )
// context = Context.create();
}

public X build() {
Expand Down Expand Up @@ -322,7 +323,7 @@ public X build() {
// If the UpdateRequest object wasn't built until now then build the string instead.
String updateStringActual = updateActual == null ? updateEltAcc.buildString() : null;

Context cxt = (context!=null) ? context : ARQ.getContext().copy();
Context cxt = contextAcc.context();
return buildX(hClient, updateActual, updateStringActual, cxt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,23 @@ private void startQueryIteratorActual() {

execInit();

// JENA-2821 - Unconditionally provide a cancel signal because manual abort via QueryExec.abort()
// may be triggered any time, even if no timeouts were configured.
// Prior to this issue, the cancel signal was only provided when timeouts were configured.

// The following note is older:
// JENA-2141 - the timeout can go off while building the query iterator structure.
// In this case, use a signal passed through the context.
// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
//
// This applies to the time to first result because to get the first result, the
// queryIterator must have been built. So it does not apply for the second
// stage of N,-1 or N,M.
context.set(ARQConstants.symCancelQuery, cancelSignal);


/* Timeouts:
* -1,-1 No timeouts
* N, same as -1,N Overall timeout only. No wrapper needed.
Expand All @@ -470,16 +487,6 @@ private void startQueryIteratorActual() {
return;
}

// JENA-2141 - the timeout can go off while building the query iterator structure.
// In this case, use a signal passed through the context.
// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
//
// This applies to the time to first result because to get the first result, the
// queryIterator must have been built. So it does not apply for the second
// stage of N,-1 or N,M.
context.set(ARQConstants.symCancelQuery, cancelSignal);
TimeoutCallback callback = new TimeoutCallback() ;
expectedCallback.set(callback) ;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.jena.sparql.exec;

import org.apache.jena.sparql.util.Context;
import org.apache.jena.update.UpdateExecution;

public class UpdateExecAdapter implements UpdateExec {
Expand All @@ -39,4 +40,9 @@ protected UpdateExecAdapter(UpdateExecution updateProc) {

@Override
public void execute() { updateProc.execute(); }

@Override
public Context getContext() {
return updateProc.getContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;

import org.apache.jena.graph.Node;
import org.apache.jena.query.ARQ;
import org.apache.jena.query.Query;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.Var;
Expand All @@ -31,6 +32,7 @@
import org.apache.jena.sparql.modify.UpdateEngineRegistry;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.update.Update;
import org.apache.jena.update.UpdateException;
Expand All @@ -43,7 +45,11 @@ public class UpdateExecDatasetBuilder implements UpdateExecBuilder {

private DatasetGraph dataset = null;
private Query query = null;
private Context context = null;
// private Context context = null;
private ContextAccumulator contextAcc =
ContextAccumulator.newBuilder(()->ARQ.getContext(), ()->Context.fromDataset(dataset));


// Uses query rewrite to replace variables by values.
private Map<Var, Node> substitutionMap = null;

Expand Down Expand Up @@ -96,29 +102,24 @@ public UpdateExecDatasetBuilder dataset(DatasetGraph dsg) {
public UpdateExecDatasetBuilder context(Context context) {
if ( context == null )
return this;
ensureContext();
this.context.putAll(context);
this.contextAcc.context(context);
return this;
}

@Override
public UpdateExecDatasetBuilder set(Symbol symbol, Object value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return this;
}

@Override
public UpdateExecDatasetBuilder set(Symbol symbol, boolean value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return this;
}


private void ensureContext() {
if ( context == null )
context = new Context();
public Context getContext() {
return contextAcc.context();
}

@Override
Expand Down Expand Up @@ -157,7 +158,7 @@ public UpdateExec build() {
if ( substitutionMap != null && ! substitutionMap.isEmpty() )
actualUpdate = UpdateTransformOps.transform(actualUpdate, substitutionMap);

Context cxt = Context.setupContextForDataset(context, dataset);
Context cxt = getContext();
UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt);
if ( f == null )
throw new UpdateException("Failed to find an UpdateEngine");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.jena.sparql.exec;

import org.apache.jena.sparql.util.Context;
import org.apache.jena.update.UpdateExecution;

public class UpdateExecutionAdapter implements UpdateExecution {
Expand All @@ -40,4 +41,8 @@ protected UpdateExecutionAdapter(UpdateExec updateExec) {
@Override
public void execute() { updateExec.execute(); }

@Override
public Context getContext() {
return updateExec.getContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ public static UpdateExecHTTPBuilder service(String endpointURL) {
this.sendMode = sendMode;
}

// @Override
// public Context getContext() {
// return null;
// }
@Override
public Context getContext() {
return context;
}
//
// @Override
// public DatasetGraph getDatasetGraph() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.jena.sparql.modify;

import org.apache.jena.query.ARQ ;
import org.apache.jena.sparql.ARQConstants ;
import org.apache.jena.sparql.core.DatasetGraph ;
import org.apache.jena.sparql.engine.binding.Binding ;
Expand All @@ -39,18 +38,16 @@ public UpdateEngineBase(DatasetGraph datasetGraph,
this.inputBinding = inputBinding ;
this.context = setupContext(context, datasetGraph) ;
}
private static Context setupContext(Context context, DatasetGraph dataset)

private Context setupContext(Context cxt, DatasetGraph dataset)
{
// To many copies?
if ( context == null ) // Copy of global context to protect against change.
context = ARQ.getContext() ;
context = context.copy() ;
// The following setup is effectively the same as in QueryEngineBase
Context result = cxt;

if ( result == null )
result = Context.setupContextForDataset(cxt, dataset);

if ( dataset.getContext() != null )
context.putAll(dataset.getContext()) ;

context.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ;
return context ;
result.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()) ;
return result ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ protected static Iterator<Binding> evalBindings(Query query, DatasetGraph dsg, B
}

// Not QueryExecDataset.dataset(...) because of initialBinding.
QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query);
QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query).context(context);
if ( inputBinding != null ) {
// Must use initialBinding - it puts the input in the results, unlike substitution.
builder.initialBinding(inputBinding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public void execute() {
uProc.finishRequest() ;
}
}

@Override
public Context getContext() {
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.jena.update;

import org.apache.jena.sparql.util.Context;

/**
* An instance of a execution of an UpdateRequest.
* Applies to UpdateExec (GPI) and UpdateExecution (API).
Expand All @@ -26,4 +28,6 @@ public interface UpdateProcessor
{
/** Execute */
public void execute() ;

public Context getContext();
}
Loading

0 comments on commit dd2ef56

Please sign in to comment.