Skip to content

Commit

Permalink
Moving logic into FormulaRuntime.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Jan 22, 2024
1 parent 634c300 commit b27b7a3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,18 @@ object FlowRuntime {
return callbackFlow<Output> {
threadChecker.check("Need to subscribe on main thread.")

val mergedInspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
val runtime = FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = this::trySendBlocking,
onError = this::close,
inspector = inspector,
isValidationEnabled = isValidationEnabled,
)

val runtimeFactory = {
FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = this::trySendBlocking,
onError = this::close,
inspector = mergedInspector,
isValidationEnabled = isValidationEnabled,
)
}

var runtime = runtimeFactory()

input.onEach { input ->
threadChecker.check("Input arrived on a wrong thread.")
if (!runtime.isKeyValid(input)) {
runtime.terminate()
runtime = runtimeFactory()
}
runtime.onInput(input)
}.launchIn(this)
input.onEach { input -> runtime.onInput(input) }.launchIn(this)

awaitClose {
threadChecker.check("Need to unsubscribe on the main thread.")
runtime.terminate()
}
}.distinctUntilChanged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,24 @@ object RxJavaRuntime {
isValidationEnabled: Boolean = false,
): Observable<Output> {
val threadChecker = ThreadChecker(formula)
return Observable.create<Output> { emitter ->
val mergedInspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
)
val runtimeFactory = {
FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = emitter::onNext,
onError = emitter::onError,
inspector = mergedInspector,
isValidationEnabled = isValidationEnabled,
)
}

return Observable.create { emitter ->
threadChecker.check("Need to subscribe on main thread.")

var runtime = runtimeFactory()
val runtime = FormulaRuntime(
threadChecker = threadChecker,
formula = formula,
onOutput = emitter::onNext,
onError = emitter::onError,
inspector = inspector,
isValidationEnabled = isValidationEnabled,
)

val disposables = CompositeDisposable()
disposables.add(input.subscribe({ input ->
threadChecker.check("Input arrived on a wrong thread.")
if (!runtime.isKeyValid(input)) {
runtime.terminate()
runtime = runtimeFactory()
}
runtime.onInput(input)
}, emitter::onError))

val runnable = Runnable {
threadChecker.check("Need to unsubscribe on the main thread.")
runtime.terminate()
}
disposables.add(FormulaDisposableHelper.fromRunnable(runnable))

disposables.add(FormulaDisposableHelper.fromRunnable(runtime::terminate))
emitter.setDisposable(disposables)
}.distinctUntilChanged()
}
Expand Down
109 changes: 83 additions & 26 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
private val onOutput: (Output) -> Unit,
private val onError: (Throwable) -> Unit,
private val isValidationEnabled: Boolean = false,
private val inspector: Inspector? = null,
inspector: Inspector? = null,
) : ManagerDelegate {
private val implementation = formula.implementation()
private var manager: FormulaManagerImpl<Input, *, Output>? = null
private val inspector = FormulaPlugins.inspector(
type = formula.type(),
local = inspector,
)

private var emitOutput = false
private var lastOutput: Output? = null

Expand Down Expand Up @@ -49,33 +54,62 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
private var isExecutingEffects: Boolean = false

fun isKeyValid(input: Input): Boolean {
/**
* This is a global termination flag that indicates that upstream has disposed of the
* this [FormulaRuntime] instance. We will not accept any more [onInput] changes and will
* not emit any new [Output] events.
*/
private var isRuntimeTerminated: Boolean = false

private fun isKeyValid(input: Input): Boolean {
return this.input == null || key == formula.key(input)
}

fun onInput(input: Input) {
val initialization = this.input == null
threadChecker.check("Input arrived on a wrong thread.")

if (isRuntimeTerminated) return

val isKeyValid = isKeyValid(input)

this.input = input
this.key = formula.key(input)

if (initialization) {
manager = FormulaManagerImpl(
delegate = this,
formula = implementation,
initialInput = input,
loggingType = formula::class,
inspector = inspector,
)
run()
val current = manager
if (current == null) {
// First input arrived, need to start a formula manager
startNewManager(input)
} else if (!isKeyValid) {
// Formula key changed, need to reset the formula state. We mark old manager as
// terminated, will perform termination effects and then start a new manager.
current.markAsTerminated()

// Input changed, increment the id
inputId += 1

if (isRunning) {
// Since we are already running, we let that function to take over.
// No need to do anything more here
} else {
// Let's first execute side-effects
current.performTerminationSideEffects()

emitOutputIfNeeded()
// Start new manager
startNewManager(input)
}
} else {
// Input changed, need to re-run
inputId += 1
run()
}
}

fun terminate() {
threadChecker.check("Need to unsubscribe on the main thread.")

if (isRuntimeTerminated) return
isRuntimeTerminated = true

manager?.apply {
markAsTerminated()

Expand All @@ -87,7 +121,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
* This way, we let runFormula() exit out before we terminate everything.
*/
if (!isRunning) {
performTermination()
terminateManager(this)
}
}
}
Expand Down Expand Up @@ -133,7 +167,18 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
if (manager.isTerminated()) {
shouldRun = false
performTermination()
terminateManager(manager)

// If runtime has been terminated, we are stopping and do
// not need to do anything else.
if (!isRuntimeTerminated) {
// Terminated manager with input change indicates that formula
// key changed and we are resetting formula state. We need to
// start a new formula manager.
if (localInputId != inputId) {
input?.let(this::startNewManager)
}
}
} else {
shouldRun = localInputId != inputId
}
Expand All @@ -154,8 +199,7 @@ class FormulaRuntime<Input : Any, Output : Any>(

manager?.markAsTerminated()
onError(e)

performTermination()
manager?.let(this::terminateManager)
}
}

Expand Down Expand Up @@ -201,25 +245,38 @@ class FormulaRuntime<Input : Any, Output : Any>(
* Emits output to the formula subscriber.
*/
private fun emitOutputIfNeeded() {
if (emitOutput) {
if (emitOutput && !isRuntimeTerminated) {
emitOutput = false
onOutput(checkNotNull(lastOutput))
}
// if (isInitialRun) {
// lastOutput?.let(onOutput)
// } else if (hasInitialFinished && emitOutput) {
// emitOutput = false
// onOutput(checkNotNull(lastOutput))
// }
}

/**
* Creates a new formula manager and runs it.
*/
private fun startNewManager(initialInput: Input) {
manager = initManager(initialInput)
run()
}

/**
* Performs formula termination effects and executes transition effects if needed.
*/
private fun performTermination() {
manager?.performTerminationSideEffects()
private fun terminateManager(manager: FormulaManager<Input, Output>) {
manager.performTerminationSideEffects()
if (!isExecutingEffects) {
executeTransitionEffects()
}
}


private fun initManager(initialInput: Input): FormulaManagerImpl<Input, *, Output> {
return FormulaManagerImpl(
delegate = this,
formula = implementation,
initialInput = initialInput,
loggingType = formula::class,
inspector = inspector,
)
}
}

0 comments on commit b27b7a3

Please sign in to comment.