Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move key changes into FormulaRuntime. #333

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.instacart.formula.coroutines

import com.instacart.formula.FormulaPlugins
import com.instacart.formula.FormulaRuntime
import com.instacart.formula.IFormula
import com.instacart.formula.Inspector
Expand All @@ -27,35 +26,18 @@ object FlowRuntime {
return callbackFlow<Output> {
threadChecker.check("Need to subscribe on main thread.")

val mergedInspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
val runtime = FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = this::trySendBlocking,
onError = this::close,
inspector = inspector,
isValidationEnabled = isValidationEnabled,
)

val runtimeFactory = {
FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = this::trySendBlocking,
onError = this::close,
inspector = mergedInspector,
isValidationEnabled = isValidationEnabled,
)
}

var runtime = runtimeFactory()

input.onEach { input ->
threadChecker.check("Input arrived on a wrong thread.")
if (!runtime.isKeyValid(input)) {
runtime.terminate()
runtime = runtimeFactory()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a bit weird that we recreate the runtime if key changes. Instead, the runtime will handle state reset internally.

}
runtime.onInput(input)
}.launchIn(this)
input.onEach { input -> runtime.onInput(input) }.launchIn(this)

awaitClose {
threadChecker.check("Need to unsubscribe on the main thread.")
runtime.terminate()
}
}.distinctUntilChanged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,24 @@ object RxJavaRuntime {
isValidationEnabled: Boolean = false,
): Observable<Output> {
val threadChecker = ThreadChecker(formula)
return Observable.create<Output> { emitter ->
val mergedInspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
)
val runtimeFactory = {
FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = emitter::onNext,
onError = emitter::onError,
inspector = mergedInspector,
isValidationEnabled = isValidationEnabled,
)
}

return Observable.create { emitter ->
threadChecker.check("Need to subscribe on main thread.")

var runtime = runtimeFactory()
val runtime = FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = emitter::onNext,
onError = emitter::onError,
inspector = inspector,
isValidationEnabled = isValidationEnabled,
)

val disposables = CompositeDisposable()
disposables.add(input.subscribe({ input ->
threadChecker.check("Input arrived on a wrong thread.")
if (!runtime.isKeyValid(input)) {
runtime.terminate()
runtime = runtimeFactory()
}
runtime.onInput(input)
}, emitter::onError))

val runnable = Runnable {
threadChecker.check("Need to unsubscribe on the main thread.")
runtime.terminate()
}
disposables.add(FormulaDisposableHelper.fromRunnable(runnable))

disposables.add(FormulaDisposableHelper.fromRunnable(runtime::terminate))
emitter.setDisposable(disposables)
}.distinctUntilChanged()
}
Expand Down
110 changes: 84 additions & 26 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
private val onOutput: (Output) -> Unit,
private val onError: (Throwable) -> Unit,
private val isValidationEnabled: Boolean = false,
private val inspector: Inspector? = null,
inspector: Inspector? = null,
) : ManagerDelegate {
private val implementation = formula.implementation()
private var manager: FormulaManagerImpl<Input, *, Output>? = null
private var hasInitialFinished = false
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed anymore - previously it was used to ensure that on formula start, we wait for all actions to start running before emitting a value. Currently, it's done within FormulaManagerImpl.

private val inspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
)

private var emitOutput = false
private var lastOutput: Output? = null

Expand Down Expand Up @@ -50,34 +54,62 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
private var isExecutingEffects: Boolean = false

fun isKeyValid(input: Input): Boolean {
/**
* This is a global termination flag that indicates that upstream has disposed of the
* this [FormulaRuntime] instance. We will not accept any more [onInput] changes and will
* not emit any new [Output] events.
*/
private var isRuntimeTerminated: Boolean = false

private fun isKeyValid(input: Input): Boolean {
return this.input == null || key == formula.key(input)
}

fun onInput(input: Input) {
val initialization = this.input == null
threadChecker.check("Input arrived on a wrong thread.")

if (isRuntimeTerminated) return

val isKeyValid = isKeyValid(input)

this.input = input
this.key = formula.key(input)

if (initialization) {
manager = FormulaManagerImpl(
delegate = this,
formula = implementation,
initialInput = input,
loggingType = formula::class,
inspector = inspector,
)
run()
val current = manager
if (current == null) {
// First input arrived, need to start a formula manager
startNewManager(input)
} else if (!isKeyValid) {
// Formula key changed, need to reset the formula state. We mark old manager as
// terminated, will perform termination effects and then start a new manager.
current.markAsTerminated()

// Input changed, increment the id
inputId += 1

if (isRunning) {
// Since we are already running, we let that function to take over.
// No need to do anything more here
} else {
// Let's first execute side-effects
current.performTerminationSideEffects()

hasInitialFinished = true
emitOutputIfNeeded(isInitialRun = true)
// Start new manager
startNewManager(input)
}
} else {
// Input changed, need to re-run
inputId += 1
run()
}
}

fun terminate() {
threadChecker.check("Need to unsubscribe on the main thread.")

if (isRuntimeTerminated) return
isRuntimeTerminated = true

manager?.apply {
markAsTerminated()

Expand All @@ -89,7 +121,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
* This way, we let runFormula() exit out before we terminate everything.
*/
if (!isRunning) {
performTermination()
terminateManager(this)
}
}
}
Expand Down Expand Up @@ -135,7 +167,18 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
if (manager.isTerminated()) {
shouldRun = false
performTermination()
terminateManager(manager)

// If runtime has been terminated, we are stopping and do
// not need to do anything else.
if (!isRuntimeTerminated) {
// Terminated manager with input change indicates that formula
// key changed and we are resetting formula state. We need to
// start a new formula manager.
if (localInputId != inputId) {
input?.let(this::startNewManager)
}
}
} else {
shouldRun = localInputId != inputId
}
Expand All @@ -149,15 +192,14 @@ class FormulaRuntime<Input : Any, Output : Any>(
executeTransitionEffects()

if (!manager.isTerminated()) {
emitOutputIfNeeded(isInitialRun = false)
emitOutputIfNeeded()
}
} catch (e: Throwable) {
isRunning = false

manager?.markAsTerminated()
onError(e)

performTermination()
manager?.let(this::terminateManager)
}
}

Expand Down Expand Up @@ -202,22 +244,38 @@ class FormulaRuntime<Input : Any, Output : Any>(
/**
* Emits output to the formula subscriber.
*/
private fun emitOutputIfNeeded(isInitialRun: Boolean) {
if (isInitialRun) {
lastOutput?.let(onOutput)
} else if (hasInitialFinished && emitOutput) {
private fun emitOutputIfNeeded() {
if (emitOutput && !isRuntimeTerminated) {
emitOutput = false
onOutput(checkNotNull(lastOutput))
}
}

/**
* Creates a new formula manager and runs it.
*/
private fun startNewManager(initialInput: Input) {
manager = initManager(initialInput)
run()
}

/**
* Performs formula termination effects and executes transition effects if needed.
*/
private fun performTermination() {
manager?.performTerminationSideEffects()
private fun terminateManager(manager: FormulaManager<Input, Output>) {
manager.performTerminationSideEffects()
if (!isExecutingEffects) {
executeTransitionEffects()
}
}

private fun initManager(initialInput: Input): FormulaManagerImpl<Input, *, Output> {
return FormulaManagerImpl(
delegate = this,
formula = implementation,
initialInput = initialInput,
loggingType = formula::class,
inspector = inspector,
)
}
}
Loading