From 406d7dc9713c772be0c545a537180c79e1effefd Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Thu, 25 Jan 2024 10:58:31 -0500 Subject: [PATCH] Make formula internals thread safe. (#332) * Make formula internals thread safe. * PR feedback. * Remove old test. --- .../formula/coroutines/FlowRuntime.kt | 7 +- .../formula/rxjava3/RxJavaRuntime.kt | 14 +-- .../com/instacart/formula/FormulaRuntime.kt | 25 ++-- .../instacart/formula/internal/ActionKey.kt | 2 +- .../formula/internal/ChildrenManager.kt | 1 + .../formula/internal/FormulaManagerImpl.kt | 1 + .../com/instacart/formula/internal/Frame.kt | 2 +- .../formula/internal/ListenerImpl.kt | 8 +- .../internal/SynchronizedUpdateQueue.kt | 115 ++++++++++++++++++ .../formula/internal/ThreadChecker.kt | 19 --- .../instacart/formula/FormulaRuntimeTest.kt | 83 +++++++++---- .../formula/actions/EventOnBgThreadAction.kt | 27 ---- .../formula/internal/ThreadCheckerTest.kt | 29 ----- .../formula/subjects/MultiThreadRobot.kt | 87 +++++++++++++ .../formula/subjects/SleepFormula.kt | 61 ++++++++++ 15 files changed, 347 insertions(+), 134 deletions(-) create mode 100644 formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt delete mode 100644 formula/src/main/java/com/instacart/formula/internal/ThreadChecker.kt delete mode 100644 formula/src/test/java/com/instacart/formula/actions/EventOnBgThreadAction.kt delete mode 100644 formula/src/test/java/com/instacart/formula/internal/ThreadCheckerTest.kt create mode 100644 formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt create mode 100644 formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt diff --git a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt index ab4a8d77..4bc6c16e 100644 --- a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt +++ b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt @@ -3,7 +3,6 @@ package com.instacart.formula.coroutines import com.instacart.formula.FormulaRuntime import com.instacart.formula.IFormula import com.instacart.formula.Inspector -import com.instacart.formula.internal.ThreadChecker import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking @@ -22,12 +21,8 @@ object FlowRuntime { inspector: Inspector? = null, isValidationEnabled: Boolean = false, ): Flow { - val threadChecker = ThreadChecker(formula) return callbackFlow { - threadChecker.check("Need to subscribe on main thread.") - val runtime = FormulaRuntime( - threadChecker = threadChecker, formula = formula, onOutput = this::trySendBlocking, onError = this::close, @@ -35,7 +30,7 @@ object FlowRuntime { isValidationEnabled = isValidationEnabled, ) - input.onEach { input -> runtime.onInput(input) }.launchIn(this) + input.onEach(runtime::onInput).launchIn(this) awaitClose { runtime.terminate() diff --git a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt index ded6770d..0fee03b5 100644 --- a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt +++ b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt @@ -1,10 +1,8 @@ package com.instacart.formula.rxjava3 -import com.instacart.formula.FormulaPlugins import com.instacart.formula.FormulaRuntime import com.instacart.formula.IFormula import com.instacart.formula.Inspector -import com.instacart.formula.internal.ThreadChecker import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.disposables.CompositeDisposable import io.reactivex.rxjava3.disposables.FormulaDisposableHelper @@ -16,12 +14,8 @@ object RxJavaRuntime { inspector: Inspector? = null, isValidationEnabled: Boolean = false, ): Observable { - val threadChecker = ThreadChecker(formula) - return Observable.create { emitter -> - threadChecker.check("Need to subscribe on main thread.") - + return Observable.create { emitter -> val runtime = FormulaRuntime( - threadChecker = threadChecker, formula = formula, onOutput = emitter::onNext, onError = emitter::onError, @@ -30,11 +24,9 @@ object RxJavaRuntime { ) val disposables = CompositeDisposable() - disposables.add(input.subscribe({ input -> - runtime.onInput(input) - }, emitter::onError)) - + disposables.add(input.subscribe(runtime::onInput, emitter::onError)) disposables.add(FormulaDisposableHelper.fromRunnable(runtime::terminate)) + emitter.setDisposable(disposables) }.distinctUntilChanged() } diff --git a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt index 5fafcd1a..5375c6d4 100644 --- a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt +++ b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt @@ -3,30 +3,27 @@ package com.instacart.formula import com.instacart.formula.internal.FormulaManager import com.instacart.formula.internal.FormulaManagerImpl import com.instacart.formula.internal.ManagerDelegate -import com.instacart.formula.internal.ThreadChecker +import com.instacart.formula.internal.SynchronizedUpdateQueue import java.util.LinkedList /** * Takes a [Formula] and creates an Observable from it. */ class FormulaRuntime( - private val threadChecker: ThreadChecker, private val formula: IFormula, private val onOutput: (Output) -> Unit, private val onError: (Throwable) -> Unit, private val isValidationEnabled: Boolean = false, inspector: Inspector? = null, ) : ManagerDelegate { + private val synchronizedUpdateQueue = SynchronizedUpdateQueue() + private val inspector = FormulaPlugins.inspector(type = formula.type(), local = inspector) private val implementation = formula.implementation() + private var manager: FormulaManagerImpl? = null - private val inspector = FormulaPlugins.inspector( - type = formula.type(), - local = inspector, - ) private var emitOutput = false private var lastOutput: Output? = null - private var input: Input? = null private var key: Any? = null @@ -43,8 +40,7 @@ class FormulaRuntime( private var inputId: Int = 0 /** - * Global transition effect queue which executes side-effects - * after all formulas are idle. + * Global transition effect queue which executes side-effects after all formulas are idle. */ private var globalEffectQueue = LinkedList() @@ -66,8 +62,10 @@ class FormulaRuntime( } fun onInput(input: Input) { - threadChecker.check("Input arrived on a wrong thread.") + synchronizedUpdateQueue.postUpdate { onInputInternal(input) } + } + private fun onInputInternal(input: Input) { if (isRuntimeTerminated) return val isKeyValid = isKeyValid(input) @@ -105,8 +103,10 @@ class FormulaRuntime( } fun terminate() { - threadChecker.check("Need to unsubscribe on the main thread.") + synchronizedUpdateQueue.postUpdate(this::terminateInternal) + } + private fun terminateInternal() { if (isRuntimeTerminated) return isRuntimeTerminated = true @@ -127,8 +127,6 @@ class FormulaRuntime( } override fun onPostTransition(effects: Effects?, evaluate: Boolean) { - threadChecker.check("Only thread that created it can post transition result") - effects?.let { globalEffectQueue.addLast(effects) } @@ -271,6 +269,7 @@ class FormulaRuntime( private fun initManager(initialInput: Input): FormulaManagerImpl { return FormulaManagerImpl( + queue = synchronizedUpdateQueue, delegate = this, formula = implementation, initialInput = initialInput, diff --git a/formula/src/main/java/com/instacart/formula/internal/ActionKey.kt b/formula/src/main/java/com/instacart/formula/internal/ActionKey.kt index d4c6a7ae..bcbee82e 100644 --- a/formula/src/main/java/com/instacart/formula/internal/ActionKey.kt +++ b/formula/src/main/java/com/instacart/formula/internal/ActionKey.kt @@ -2,5 +2,5 @@ package com.instacart.formula.internal internal data class ActionKey( val id: Long, - val delegateKey: Any?, + private val delegateKey: Any?, ) \ No newline at end of file diff --git a/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt b/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt index 3fcd9376..1148893a 100644 --- a/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt +++ b/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt @@ -110,6 +110,7 @@ internal class ChildrenManager( val childFormulaHolder = children.findOrInit(key) { val implementation = formula.implementation() FormulaManagerImpl( + queue = delegate.queue, delegate = delegate, formula = implementation, initialInput = input, diff --git a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt index f1c2a998..12a64b89 100644 --- a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt +++ b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt @@ -18,6 +18,7 @@ import kotlin.reflect.KClass * a state change, it will rerun [Formula.evaluate]. */ internal class FormulaManagerImpl( + val queue: SynchronizedUpdateQueue, private val delegate: ManagerDelegate, private val formula: Formula, initialInput: Input, diff --git a/formula/src/main/java/com/instacart/formula/internal/Frame.kt b/formula/src/main/java/com/instacart/formula/internal/Frame.kt index 55c231f8..93674d40 100644 --- a/formula/src/main/java/com/instacart/formula/internal/Frame.kt +++ b/formula/src/main/java/com/instacart/formula/internal/Frame.kt @@ -14,7 +14,7 @@ import com.instacart.formula.Evaluation */ internal class Frame( val input: Input, - val state: State, + private val state: State, val evaluation: Evaluation, val associatedEvaluationId: Long, ) diff --git a/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt b/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt index 3cf42ca6..3535617d 100644 --- a/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt +++ b/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt @@ -18,8 +18,10 @@ internal class ListenerImpl(internal var key: Any) : Liste // TODO: log if null listener (it might be due to formula removal or due to callback removal) val manager = manager ?: return - val deferredTransition = DeferredTransition(this, transition, event) - manager.onPendingTransition(deferredTransition) + manager.queue.postUpdate { + val deferredTransition = DeferredTransition(this, transition, event) + manager.onPendingTransition(deferredTransition) + } } fun disable() { @@ -31,6 +33,6 @@ internal class ListenerImpl(internal var key: Any) : Liste /** * A wrapper to convert Listener from (Unit) -> Unit into () -> Unit */ -internal data class UnitListener(val delegate: Listener): () -> Unit { +internal data class UnitListener(private val delegate: Listener): () -> Unit { override fun invoke() = delegate(Unit) } \ No newline at end of file diff --git a/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt b/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt new file mode 100644 index 00000000..b3975ce8 --- /dev/null +++ b/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt @@ -0,0 +1,115 @@ +package com.instacart.formula.internal + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +/** + * We can only process one formula update at a time. To enable thread-safety we use a + * non-blocking event queue with a serial confinement strategy for queue processing. All external + * formula events use [postUpdate] function which adds the update to [updateQueue], and then, + * tries to start processing the queue via atomic [threadRunning] variable. If another thread + * was first to take over [threadRunning], we let that thread continue and we exit out. Given + * all formula state access is gated via [threadRunning] atomic reference, we are able to ensure + * that there is happens-before relationship between each thread and memory changes are visible + * between them. + */ +class SynchronizedUpdateQueue { + /** + * Defines a thread currently executing formula update. Null value indicates idle queue. + * + * To ensure that memory changes within formula internals are synchronized between threads, + * we piggyback on the internal synchronization of this variable. Modification to this + * variable wraps around every formula update: + * - threadRunning = MyThread + * - formulaUpdate() + * - threadRunning = null + * + * This creates happens-before relationship between multiple threads and makes sure that + * all modifications within formulaUpdate() block are visible to the next thread. + */ + private val threadRunning = AtomicReference() + + /** + * A non-blocking thread-safe FIFO queue that tracks pending updates. + */ + private val updateQueue = ConcurrentLinkedQueue<() -> Unit>() + + /** + * To ensure that we execute one update at a time, all external formula events use this + * function to post updates. We add the update to a queue and then try to start processing. + * Failure to start processing indicates that another thread was first and we allow that + * thread to continue. + */ + fun postUpdate(update: () -> Unit) { + val currentThread = Thread.currentThread() + val owner = threadRunning.get() + if (owner == currentThread) { + // This indicates a nested update where an update triggers another update. Given we + // are already thread gated, we can execute this update immediately without a need + // for any extra synchronization. + update() + return + } + + val updateExecuted = if (updateQueue.peek() == null) { + // No pending update, let's try to run our update immediately + takeOver(currentThread, update) + } else { + false + } + + if (!updateExecuted) { + updateQueue.add(update) + } + tryToDrainQueue(currentThread) + } + + /** + * Tries to drain the update queue. It will process one update at a time until + * queue is empty or another thread takes over processing. + */ + private fun tryToDrainQueue(currentThread: Thread) { + while (true) { + // First, we peek to see if there is a value to process. + val peekUpdate = updateQueue.peek() + if (peekUpdate != null) { + // Since there is a pending update, we try to process it. + val updateExecuted = takeOver(currentThread, this::pollAndExecute) + if (!updateExecuted) { + return + } + } else { + return + } + } + } + + private fun pollAndExecute() { + // We remove first update from the queue and execute if it exists. + val actualUpdate = updateQueue.poll() + actualUpdate?.invoke() + } + + /** + * Tries to take over the processing and execute an [update]. + * + * Returns true if it was able to successfully claim the ownership and execute the + * update. Otherwise, returns false (this indicates another thread claimed the right first). + */ + private fun takeOver(currentThread: Thread, update: () -> Unit): Boolean { + return if (threadRunning.compareAndSet(null, currentThread)) { + // We took over the processing, let's execute the [update] + try { + update() + } finally { + // We reset the running thread. To ensure happens-before relationship, this must + // always happen after the [update]. + threadRunning.set(null) + } + true + } else { + // Another thread is running, so we return false. + false + } + } +} \ No newline at end of file diff --git a/formula/src/main/java/com/instacart/formula/internal/ThreadChecker.kt b/formula/src/main/java/com/instacart/formula/internal/ThreadChecker.kt deleted file mode 100644 index bab7e8f4..00000000 --- a/formula/src/main/java/com/instacart/formula/internal/ThreadChecker.kt +++ /dev/null @@ -1,19 +0,0 @@ -package com.instacart.formula.internal - -import com.instacart.formula.IFormula - -/** - * A poor man's thread checker. - */ -class ThreadChecker(private val formula: IFormula<*, *>) { - private val formulaType = formula::class.qualifiedName - private val threadName = Thread.currentThread().name - private val id = Thread.currentThread().id - - fun check(errorMessage: String) { - val thread = Thread.currentThread() - if (thread.id != id) { - throw IllegalStateException("$formulaType - $errorMessage Expected: $threadName, Was: ${thread.name}") - } - } -} diff --git a/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt b/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt index 16b8ea9b..c69ce0ca 100644 --- a/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt +++ b/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt @@ -3,7 +3,6 @@ package com.instacart.formula import com.google.common.truth.Truth import com.google.common.truth.Truth.assertThat import com.instacart.formula.actions.EmptyAction -import com.instacart.formula.actions.EventOnBgThreadAction import com.instacart.formula.internal.ClearPluginsRule import com.instacart.formula.internal.FormulaKey import com.instacart.formula.internal.TestInspector @@ -29,12 +28,13 @@ import com.instacart.formula.subjects.EventFormula import com.instacart.formula.subjects.ExtremelyNestedFormula import com.instacart.formula.subjects.FromObservableWithInputFormula import com.instacart.formula.subjects.HasChildFormula -import com.instacart.formula.subjects.MultiChildIndirectStateChangeRobot import com.instacart.formula.subjects.InputChangeWhileFormulaRunningRobot import com.instacart.formula.subjects.KeyFormula import com.instacart.formula.subjects.KeyUsingListFormula import com.instacart.formula.subjects.MessageFormula import com.instacart.formula.subjects.MixingCallbackUseWithKeyUse +import com.instacart.formula.subjects.MultiChildIndirectStateChangeRobot +import com.instacart.formula.subjects.MultiThreadRobot import com.instacart.formula.subjects.MultipleChildEvents import com.instacart.formula.subjects.NestedCallbackCallRobot import com.instacart.formula.subjects.NestedChildTransitionAfterNoEvaluationPass @@ -50,8 +50,10 @@ import com.instacart.formula.subjects.ParentTransitionOnChildActionStart import com.instacart.formula.subjects.ParentUpdateChildAndSelfOnEventRobot import com.instacart.formula.subjects.PendingActionFormulaTerminatedOnActionInit import com.instacart.formula.subjects.RemovingTerminateStreamSendsNoMessagesFormula +import com.instacart.formula.subjects.ReusableFunctionCreatesUniqueListeners import com.instacart.formula.subjects.RootFormulaKeyTestSubject import com.instacart.formula.subjects.RunAgainActionFormula +import com.instacart.formula.subjects.SleepFormula import com.instacart.formula.subjects.StartStopFormula import com.instacart.formula.subjects.StateTransitionTimingFormula import com.instacart.formula.subjects.StreamInitMessageDeliveredOnce @@ -60,9 +62,8 @@ import com.instacart.formula.subjects.SubscribesToAllUpdatesBeforeDeliveringMess import com.instacart.formula.subjects.TerminateFormula import com.instacart.formula.subjects.TestKey import com.instacart.formula.subjects.TransitionAfterNoEvaluationPass -import com.instacart.formula.subjects.UseInputFormula -import com.instacart.formula.subjects.ReusableFunctionCreatesUniqueListeners import com.instacart.formula.subjects.UniqueListenersWithinLoop +import com.instacart.formula.subjects.UseInputFormula import com.instacart.formula.subjects.UsingKeyToScopeCallbacksWithinAnotherFunction import com.instacart.formula.subjects.UsingKeyToScopeChildFormula import com.instacart.formula.test.CoroutinesTestableRuntime @@ -83,7 +84,6 @@ import org.junit.rules.RuleChain import org.junit.rules.TestName import org.junit.runner.RunWith import org.junit.runners.Parameterized -import java.util.concurrent.TimeUnit import kotlin.reflect.KClass @RunWith(Parameterized::class) @@ -633,25 +633,6 @@ class FormulaRuntimeTest(val runtime: TestableRuntime, val name: String) { assertThat(eventCallback.values()).containsExactly("a", "b").inOrder() } - @Test - fun `when action returns value on background thread, we emit an error`() { - val bgAction = EventOnBgThreadAction() - val eventCallback = TestEventCallback() - val formula = OnlyUpdateFormula { - bgAction.onEvent { - transition { - eventCallback(it.toString()) - } - } - } - - val observer = runtime.test(formula, Unit) - bgAction.latch.await(50, TimeUnit.MILLISECONDS) - assertThat(bgAction.errors.values().firstOrNull()?.message).contains( - "com.instacart.formula.subjects.OnlyUpdateFormula - Only thread that created it can post transition result Expected:" - ) - } - @Test fun `stream is disposed when evaluation does not contain it`() { DynamicStreamSubject(runtime) .updateStreams(keys = arrayOf("1")) @@ -1294,6 +1275,60 @@ class FormulaRuntimeTest(val runtime: TestableRuntime, val name: String) { .assertValue(0) } + @Test + fun `formula multi-thread handoff to executing thread`() { + with(MultiThreadRobot(runtime)) { + thread("thread-a", 50) + thread("thread-b", 10) + awaitCompletion() + thread("thread-b", 10) + + awaitEvents( + SleepFormula.SleepEvent(50, "thread-a"), + // First thread-b event is handed-off to thread-a + SleepFormula.SleepEvent(10, "thread-a"), + // Second thread-b event is handled by thread-b + SleepFormula.SleepEvent(10, "thread-b") + ) + } + } + + @Test + fun `formula multi-threaded events fired at the same time`() { + with(MultiThreadRobot(runtime)) { + thread("a", 25) + thread("b", 25) + thread("c", 25) + thread("d", 25) + + awaitEvents { events -> + assertThat(events).hasSize(4) + + val durations = events.map { it.duration } + assertThat(durations).containsExactly(25L, 25L, 25L, 25L) + } + } + } + + @Test + fun `formula multi-threaded input after termination`() { + with(MultiThreadRobot(runtime)) { + thread("a", 25) + awaitCompletion() + + thread("c") { dispose() } + thread("d") { + // We delay to ensure that dispose is called first + Thread.sleep(50) + input("key-2") + } + + awaitEvents { events -> + assertThat(events).hasSize(1) + } + } + } + @Test fun `inspector events`() { val globalInspector = TestInspector() diff --git a/formula/src/test/java/com/instacart/formula/actions/EventOnBgThreadAction.kt b/formula/src/test/java/com/instacart/formula/actions/EventOnBgThreadAction.kt deleted file mode 100644 index e504887a..00000000 --- a/formula/src/test/java/com/instacart/formula/actions/EventOnBgThreadAction.kt +++ /dev/null @@ -1,27 +0,0 @@ -package com.instacart.formula.actions - -import com.instacart.formula.Action -import com.instacart.formula.Cancelable -import com.instacart.formula.test.TestEventCallback -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors - -class EventOnBgThreadAction : Action { - val errors = TestEventCallback() - val latch = CountDownLatch(1) - - override fun key(): Any? = null - - override fun start(send: (Int) -> Unit): Cancelable? { - Executors.newSingleThreadExecutor().execute { - try { - send(0) - } catch (e: Throwable) { - errors.invoke(e) - } finally { - latch.countDown() - } - } - return null - } -} \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/internal/ThreadCheckerTest.kt b/formula/src/test/java/com/instacart/formula/internal/ThreadCheckerTest.kt deleted file mode 100644 index 2800eb21..00000000 --- a/formula/src/test/java/com/instacart/formula/internal/ThreadCheckerTest.kt +++ /dev/null @@ -1,29 +0,0 @@ -package com.instacart.formula.internal - -import com.google.common.truth.Truth -import com.instacart.formula.subjects.DynamicStreamSubject -import com.instacart.formula.subjects.KeyFormula -import org.junit.Test -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -class ThreadCheckerTest { - - @Test - fun `detects incorrect thread`() { - val checker = ThreadChecker(KeyFormula()) - - val latch = CountDownLatch(1) - Thread { - try { - checker.check("error message") - error("thread checker should fail") - } catch (e: Exception) { - Truth.assertThat(e.message).startsWith("com.instacart.formula.subjects.KeyFormula - error message") - latch.countDown() - } - }.start() - - latch.await(1, TimeUnit.SECONDS) - } -} \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt b/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt new file mode 100644 index 00000000..5b10417c --- /dev/null +++ b/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt @@ -0,0 +1,87 @@ +package com.instacart.formula.subjects + +import com.google.common.truth.Truth +import com.instacart.formula.subjects.SleepFormula.SleepEvent +import com.instacart.formula.test.TestableRuntime +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit + +class MultiThreadRobot(val runtime: TestableRuntime) { + class NamedThreadFactory(private val name: String): ThreadFactory { + override fun newThread(r: Runnable): Thread { + return Thread(r, name) + } + } + + private val threadFormula = SleepFormula() + private val observer = runtime.test(threadFormula, "initial-key") + + // Manage executors and update completion + private val executorMap = mutableMapOf() + private val eventCompletionLatches = ConcurrentLinkedQueue() + + fun thread(name: String, sleepDuration: Long) = apply { + thread(name) { + observer.output { + this.onSleep(sleepDuration) + } + } + } + + fun thread(name: String, function: () -> Unit) = apply { + val executor = executorMap.getOrPut(name) { + Executors.newSingleThreadExecutor(NamedThreadFactory(name)) + } + + // Creating a latch and adding it to a list to make sure we are able to + // wait for all event completion. + val completionLatch = CountDownLatch(1) + eventCompletionLatches.add(completionLatch) + + executor.execute { + observer.output { + function() + completionLatch.countDown() + } + } + } + + fun input(newKey: String) = apply { + observer.input(newKey) + } + + fun dispose() = apply { + observer.dispose() + } + + fun awaitCompletion() = apply { + for (latch in eventCompletionLatches) { + await(latch, 1, TimeUnit.SECONDS) + } + } + + fun awaitEvents(vararg sleepEvents: SleepEvent) = apply { + awaitCompletion() + + observer.output { + Truth.assertThat(this.sleepEvents).containsExactly(*sleepEvents).inOrder() + } + } + + fun awaitEvents(assertEvents: (List) -> Unit) = apply { + awaitCompletion() + observer.output { + assertEvents(sleepEvents) + } + } + + private fun await(latch: CountDownLatch, timeout: Long, unit: TimeUnit) { + if (!latch.await(timeout, unit)) { + throw IllegalStateException("Timeout") + } + } +} \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt b/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt new file mode 100644 index 00000000..6eb5727e --- /dev/null +++ b/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt @@ -0,0 +1,61 @@ +package com.instacart.formula.subjects + +import com.instacart.formula.Action +import com.instacart.formula.Evaluation +import com.instacart.formula.Formula +import com.instacart.formula.Snapshot + +class SleepFormula : Formula() { + + data class SleepEvent( + val duration: Long, + val threadName: String, + ) + + data class State( + val sleepEvents: List = emptyList(), + val pendingEvent: SleepEvent? = null, + ) + + data class Output( + val sleepEvents: List, + val onSleep: (Long) -> Unit, + ) + + override fun key(input: String): Any? { + return input + } + + override fun initialState(input: String): State { + return State() + } + + override fun Snapshot.evaluate(): Evaluation { + return Evaluation( + output = Output( + sleepEvents = state.sleepEvents, + onSleep = context.onEvent { + val newEvent = SleepEvent( + duration = it, + threadName = Thread.currentThread().name, + ) + transition(state.copy(pendingEvent = newEvent)) + } + ), + actions = context.actions { + state.pendingEvent?.let { + Action.onData(it).onEvent { event -> + // Using sleep to control multi-threaded events + Thread.sleep(event.duration) + val events = state.sleepEvents + val newState = state.copy( + sleepEvents = events + event, + pendingEvent = null, + ) + transition(newState) + } + } + } + ) + } +} \ No newline at end of file