Skip to content

Commit

Permalink
Use transaction instead of thread
Browse files Browse the repository at this point in the history
  • Loading branch information
reinhapa committed May 9, 2024
1 parent 16b072b commit 5eef0e2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
*/
package org.exist.collections.triggers;

import org.exist.storage.txn.Txn;
import org.exist.storage.txn.TxnListener;
import org.exist.xmldb.XmldbURI;

import javax.annotation.Nullable;
Expand All @@ -30,6 +32,8 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Avoid infinite recursions in Triggers by preventing the same trigger
Expand All @@ -39,10 +43,10 @@
*/
public class TriggerStatePerThread {

private static final ConcurrentMap<Thread, Deque<TriggerState>> THREAD_LOCAL_STATES = new ConcurrentHashMap<>();
private static final ConcurrentMap<Txn, Deque<TriggerState>> TRIGGER_STATES = new ConcurrentHashMap<>();

public static void setAndTest(final Trigger trigger, final TriggerPhase triggerPhase, final TriggerEvent triggerEvent, final XmldbURI src, final @Nullable XmldbURI dst) throws CyclicTriggerException {
final Deque<TriggerState> states = getStates();
public static void setAndTest(final Txn txn, final Trigger trigger, final TriggerPhase triggerPhase, final TriggerEvent triggerEvent, final XmldbURI src, final @Nullable XmldbURI dst) throws CyclicTriggerException {
final Deque<TriggerState> states = getStates(txn);

if (states.isEmpty()) {
if (triggerPhase != TriggerPhase.BEFORE) {
Expand Down Expand Up @@ -115,11 +119,11 @@ public CyclicTriggerException(final String message) {
}
}

public static void clearIfFinished(final TriggerPhase phase) {
public static void clearIfFinished(final Txn txn, final TriggerPhase phase) {
if (phase == TriggerPhase.AFTER) {

int depth = 0;
final Deque<TriggerState> states = getStates();
final Deque<TriggerState> states = getStates(txn);
for (final Iterator<TriggerState> it = states.descendingIterator(); it.hasNext(); ) {
final TriggerState state = it.next();
switch (state.triggerPhase) {
Expand All @@ -135,24 +139,45 @@ public static void clearIfFinished(final TriggerPhase phase) {
}

if (depth == 0) {
clear();
clear(txn);
}
}
}

public static void clear() {
THREAD_LOCAL_STATES.remove(Thread.currentThread());
public static void clear(final Txn txn) {
TRIGGER_STATES.remove(txn);
}

public static boolean isEmpty() {
return getStates().isEmpty();
public static boolean isEmpty(final Txn txn) {
return getStates(txn).isEmpty();
}

private static Deque<TriggerState> getStates() {
return THREAD_LOCAL_STATES.computeIfAbsent(Thread.currentThread(), thread -> new ArrayDeque<>());
public static void forEach(BiConsumer<Txn, Deque<TriggerState>> action) {
TRIGGER_STATES.forEach(action);
}

record TriggerState(Trigger trigger, TriggerPhase triggerPhase, TriggerEvent triggerEvent, XmldbURI src,
private static Deque<TriggerState> getStates(final Txn txn) {
return TRIGGER_STATES.computeIfAbsent(txn, TriggerStatePerThread::initStates);
}

private static Deque<TriggerState> initStates(final Txn txn) {
txn.registerListener(new TransactionCleanUp(txn, TriggerStatePerThread::clear));
return new ArrayDeque<>();
}

public record TransactionCleanUp(Txn txn, Consumer<Txn> consumer) implements TxnListener {
@Override
public void commit() {
consumer.accept(txn);
}

@Override
public void abort() {
consumer.accept(txn);
}
}

public record TriggerState(Trigger trigger, TriggerPhase triggerPhase, TriggerEvent triggerEvent, XmldbURI src,
@Nullable XmldbURI dst, boolean possiblyCyclic) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn

// avoid infinite recursion
try {
TriggerStatePerThread.setAndTest(this, TriggerPhase.BEFORE, event, src, dst);
TriggerStatePerThread.setAndTest(transaction,this, TriggerPhase.BEFORE, event, src, dst);
} catch (final TriggerStatePerThread.CyclicTriggerException e) {
LOG.warn(e.getMessage());
return;
Expand All @@ -241,7 +241,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn
declareExternalVariables(context, TriggerPhase.BEFORE, event, src, dst, isCollection);

} catch (final XPathException | IOException | PermissionDeniedException e) {
TriggerStatePerThread.clear();
TriggerStatePerThread.clear(transaction);
throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e);
}

Expand All @@ -255,7 +255,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn
LOG.debug("Trigger fired for prepare");
}
} catch (final XPathException | PermissionDeniedException e) {
TriggerStatePerThread.clear();
TriggerStatePerThread.clear(transaction);
throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e);
} finally {
context.runCleanupTasks();
Expand All @@ -271,7 +271,7 @@ private void finish(final TriggerEvent event, final DBBroker broker, final Txn t

// avoid infinite recursion
try {
TriggerStatePerThread.setAndTest(this, TriggerPhase.AFTER, event, src, dst);
TriggerStatePerThread.setAndTest(transaction,this, TriggerPhase.AFTER, event, src, dst);
} catch (final TriggerStatePerThread.CyclicTriggerException e) {
LOG.warn(e.getMessage());
return;
Expand Down Expand Up @@ -305,7 +305,7 @@ private void finish(final TriggerEvent event, final DBBroker broker, final Txn t
context.runCleanupTasks();
}

TriggerStatePerThread.clearIfFinished(TriggerPhase.AFTER);
TriggerStatePerThread.clearIfFinished(transaction, TriggerPhase.AFTER);

if (LOG.isDebugEnabled()) {
LOG.debug("Trigger fired for finish");
Expand Down Expand Up @@ -393,10 +393,11 @@ private CompiledXQuery getScript(final DBBroker broker, final Txn transaction) t
}

private void execute(final TriggerPhase phase, final TriggerEvent event, final DBBroker broker, final Txn transaction, final QName functionName, final XmldbURI src, final XmldbURI dst) throws TriggerException {
System.err.format("phase: %s, event: %s, tx: %s, thread: %s", phase, event, transaction, Thread.currentThread()).println();

// avoid infinite recursion
try {
TriggerStatePerThread.setAndTest(this, phase, event, src, dst);
TriggerStatePerThread.setAndTest(transaction, this, phase, event, src, dst);
} catch (final TriggerStatePerThread.CyclicTriggerException e) {
LOG.warn("Skipping Trigger: {}", e.getMessage());
return;
Expand All @@ -414,7 +415,7 @@ private void execute(final TriggerPhase phase, final TriggerEvent event, final D
return;
}
} catch (final TriggerException e) {
TriggerStatePerThread.clear();
TriggerStatePerThread.clear(transaction);
throw e;
}

Expand Down Expand Up @@ -454,14 +455,14 @@ private void execute(final TriggerPhase phase, final TriggerEvent event, final D
}
}

TriggerStatePerThread.clear();
TriggerStatePerThread.clear(transaction);
throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e);
} finally {
compiledQuery.reset();
context.runCleanupTasks();
}

TriggerStatePerThread.clearIfFinished(phase);
TriggerStatePerThread.clearIfFinished(transaction, phase);

if (LOG.isDebugEnabled()) {
if (phase == TriggerPhase.AFTER) {
Expand Down

0 comments on commit 5eef0e2

Please sign in to comment.