From b27b7a3e977a3df41738c0f8af96ae26f5746d31 Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Mon, 22 Jan 2024 11:21:35 -0500 Subject: [PATCH] Moving logic into FormulaRuntime. --- .../formula/coroutines/FlowRuntime.kt | 33 ++---- .../formula/rxjava3/RxJavaRuntime.kt | 38 ++---- .../com/instacart/formula/FormulaRuntime.kt | 109 +++++++++++++----- 3 files changed, 101 insertions(+), 79 deletions(-) diff --git a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt index df3fdefe..8be5a65a 100644 --- a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt +++ b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt @@ -27,35 +27,18 @@ object FlowRuntime { return callbackFlow { threadChecker.check("Need to subscribe on main thread.") - val mergedInspector = FormulaPlugins.inspector( - type = formula.type(), - local = inspector, + val runtime = FormulaRuntime( + threadChecker = threadChecker, + formula = formula, + onOutput = this::trySendBlocking, + onError = this::close, + inspector = inspector, + isValidationEnabled = isValidationEnabled, ) - val runtimeFactory = { - FormulaRuntime( - threadChecker = threadChecker, - formula = formula, - onOutput = this::trySendBlocking, - onError = this::close, - inspector = mergedInspector, - isValidationEnabled = isValidationEnabled, - ) - } - - var runtime = runtimeFactory() - - input.onEach { input -> - threadChecker.check("Input arrived on a wrong thread.") - if (!runtime.isKeyValid(input)) { - runtime.terminate() - runtime = runtimeFactory() - } - runtime.onInput(input) - }.launchIn(this) + input.onEach { input -> runtime.onInput(input) }.launchIn(this) awaitClose { - threadChecker.check("Need to unsubscribe on the main thread.") runtime.terminate() } }.distinctUntilChanged() diff --git a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt index 09101a17..ded6770d 100644 --- a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt +++ b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt @@ -17,42 +17,24 @@ object RxJavaRuntime { isValidationEnabled: Boolean = false, ): Observable { val threadChecker = ThreadChecker(formula) - return Observable.create { emitter -> - val mergedInspector = FormulaPlugins.inspector( - type = formula.type(), - local = inspector, - ) - val runtimeFactory = { - FormulaRuntime( - threadChecker = threadChecker, - formula = formula, - onOutput = emitter::onNext, - onError = emitter::onError, - inspector = mergedInspector, - isValidationEnabled = isValidationEnabled, - ) - } - + return Observable.create { emitter -> threadChecker.check("Need to subscribe on main thread.") - var runtime = runtimeFactory() + val runtime = FormulaRuntime( + threadChecker = threadChecker, + formula = formula, + onOutput = emitter::onNext, + onError = emitter::onError, + inspector = inspector, + isValidationEnabled = isValidationEnabled, + ) val disposables = CompositeDisposable() disposables.add(input.subscribe({ input -> - threadChecker.check("Input arrived on a wrong thread.") - if (!runtime.isKeyValid(input)) { - runtime.terminate() - runtime = runtimeFactory() - } runtime.onInput(input) }, emitter::onError)) - val runnable = Runnable { - threadChecker.check("Need to unsubscribe on the main thread.") - runtime.terminate() - } - disposables.add(FormulaDisposableHelper.fromRunnable(runnable)) - + disposables.add(FormulaDisposableHelper.fromRunnable(runtime::terminate)) emitter.setDisposable(disposables) }.distinctUntilChanged() } diff --git a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt index 73b47bd6..c07235cd 100644 --- a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt +++ b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt @@ -15,10 +15,15 @@ class FormulaRuntime( private val onOutput: (Output) -> Unit, private val onError: (Throwable) -> Unit, private val isValidationEnabled: Boolean = false, - private val inspector: Inspector? = null, + inspector: Inspector? = null, ) : ManagerDelegate { private val implementation = formula.implementation() private var manager: FormulaManagerImpl? = null + private val inspector = FormulaPlugins.inspector( + type = formula.type(), + local = inspector, + ) + private var emitOutput = false private var lastOutput: Output? = null @@ -49,33 +54,62 @@ class FormulaRuntime( */ private var isExecutingEffects: Boolean = false - fun isKeyValid(input: Input): Boolean { + /** + * This is a global termination flag that indicates that upstream has disposed of the + * this [FormulaRuntime] instance. We will not accept any more [onInput] changes and will + * not emit any new [Output] events. + */ + private var isRuntimeTerminated: Boolean = false + + private fun isKeyValid(input: Input): Boolean { return this.input == null || key == formula.key(input) } fun onInput(input: Input) { - val initialization = this.input == null + threadChecker.check("Input arrived on a wrong thread.") + + if (isRuntimeTerminated) return + + val isKeyValid = isKeyValid(input) + this.input = input this.key = formula.key(input) - if (initialization) { - manager = FormulaManagerImpl( - delegate = this, - formula = implementation, - initialInput = input, - loggingType = formula::class, - inspector = inspector, - ) - run() + val current = manager + if (current == null) { + // First input arrived, need to start a formula manager + startNewManager(input) + } else if (!isKeyValid) { + // Formula key changed, need to reset the formula state. We mark old manager as + // terminated, will perform termination effects and then start a new manager. + current.markAsTerminated() + + // Input changed, increment the id + inputId += 1 + + if (isRunning) { + // Since we are already running, we let that function to take over. + // No need to do anything more here + } else { + // Let's first execute side-effects + current.performTerminationSideEffects() - emitOutputIfNeeded() + // Start new manager + startNewManager(input) + } } else { + // Input changed, need to re-run inputId += 1 run() } } fun terminate() { + threadChecker.check("Need to unsubscribe on the main thread.") + + if (isRuntimeTerminated) return + isRuntimeTerminated = true + manager?.apply { markAsTerminated() @@ -87,7 +121,7 @@ class FormulaRuntime( * This way, we let runFormula() exit out before we terminate everything. */ if (!isRunning) { - performTermination() + terminateManager(this) } } } @@ -133,7 +167,18 @@ class FormulaRuntime( */ if (manager.isTerminated()) { shouldRun = false - performTermination() + terminateManager(manager) + + // If runtime has been terminated, we are stopping and do + // not need to do anything else. + if (!isRuntimeTerminated) { + // Terminated manager with input change indicates that formula + // key changed and we are resetting formula state. We need to + // start a new formula manager. + if (localInputId != inputId) { + input?.let(this::startNewManager) + } + } } else { shouldRun = localInputId != inputId } @@ -154,8 +199,7 @@ class FormulaRuntime( manager?.markAsTerminated() onError(e) - - performTermination() + manager?.let(this::terminateManager) } } @@ -201,25 +245,38 @@ class FormulaRuntime( * Emits output to the formula subscriber. */ private fun emitOutputIfNeeded() { - if (emitOutput) { + if (emitOutput && !isRuntimeTerminated) { emitOutput = false onOutput(checkNotNull(lastOutput)) } -// if (isInitialRun) { -// lastOutput?.let(onOutput) -// } else if (hasInitialFinished && emitOutput) { -// emitOutput = false -// onOutput(checkNotNull(lastOutput)) -// } + } + + /** + * Creates a new formula manager and runs it. + */ + private fun startNewManager(initialInput: Input) { + manager = initManager(initialInput) + run() } /** * Performs formula termination effects and executes transition effects if needed. */ - private fun performTermination() { - manager?.performTerminationSideEffects() + private fun terminateManager(manager: FormulaManager) { + manager.performTerminationSideEffects() if (!isExecutingEffects) { executeTransitionEffects() } } + + + private fun initManager(initialInput: Input): FormulaManagerImpl { + return FormulaManagerImpl( + delegate = this, + formula = implementation, + initialInput = initialInput, + loggingType = formula::class, + inspector = inspector, + ) + } }