Skip to content

Commit

Permalink
[Formula] Explore event batching.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Dec 6, 2023
1 parent d014e18 commit cec1fe0
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 2 deletions.
7 changes: 7 additions & 0 deletions formula/src/main/java/com/instacart/formula/Action.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ interface Action<Event> {
* An identifier used to distinguish between different types of actions.
*/
fun key(): Any?

/**
* Defines if an action updates can be batched with other updates. This enables Formula
* to batch multiple updates into a single re-evaluation and emit a single output instead of
* emitting multiple intermediate updates.
*/
fun isBatchable() = false // TODO default
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ abstract class FormulaContext<out Input, State> internal constructor(
internal abstract fun <Event> eventListener(
key: Any,
useIndex: Boolean = true,
isBatchable: Boolean = false,
transition: Transition<Input, State, Event>
): Listener<Event>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ internal class ActionBuilderImpl<out Input, State> internal constructor(
transition: Transition<Input, State, Event>,
): DeferredAction<Event> {
val key = snapshot.context.createScopedKey(transition.type(), stream.key())
val listener = snapshot.context.eventListener(key, useIndex = false, transition)
val listener = snapshot.context.eventListener(
key = key,
useIndex = false,
isBatchable = stream.isBatchable(),
transition = transition,
)
return DeferredAction(
key = key,
action = stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class DeferredTransition<Input, State, EventT> internal constructor(
private val listener: ListenerImpl<Input, State, EventT>,
private val transition: Transition<Input, State, EventT>,
private val event: EventT,
val isBatchable: Boolean,
) {

fun execute() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,27 @@ internal class FormulaManagerImpl<Input, State, Output>(
// then we'll execute the transition.
transitionQueue.addLast(transition)
} else {
// If we pass this to the root manager it will add it to a queue.
// - When ready, it will process first one
// - First one, triggers state change and asks for evaluation
// - Can we execute more updates before we start an evaluation?
// - If same formula has multiple batched updates, we need to evaluate it
// - If different formulas have batched updates, we likely could trigger all
// before a single evaluation setups actions and picks up new output.
// - Maybe we need to have `canAcceptTransition()` function
// - If my state changed already, I cannot accept new transition
// - If child state changed, can I accept state change myself?
// it is ready to process these updates, it will t
// We likely need to add this locally and also pass it to the parent
//
// TODO: this is the place where we need to batch


//
// If for batchable updates, we add it to the queue and request re-eval?
// that be bad for performance? We could also optimize that via some peek mechanism
// that starts the
// Is update order important?
transition.execute()
}
}
Expand Down Expand Up @@ -295,6 +316,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
return true
}

@Suppress("RedundantIfStatement")
if (!terminated && actionManager.startNew(evaluationId)) {
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ internal class ListenerImpl<Input, State, EventT>(internal var key: Any) : Liste

internal var manager: FormulaManagerImpl<Input, State, *>? = null
internal var snapshotImpl: SnapshotImpl<Input, State>? = null
internal var isBatchable: Boolean = false

internal lateinit var transition: Transition<Input, State, EventT>

override fun invoke(event: EventT) {
// TODO: log if null listener (it might be due to formula removal or due to callback removal)
val manager = manager ?: return

val deferredTransition = DeferredTransition(this, transition, event)
val deferredTransition = DeferredTransition(this, transition, event, isBatchable)
manager.onPendingTransition(deferredTransition)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ internal class SnapshotImpl<out Input, State> internal constructor(
override fun <Event> eventListener(
key: Any,
useIndex: Boolean,
isBatchable: Boolean,
transition: Transition<Input, State, Event>
): Listener<Event> {
ensureNotRunning()
val listener = listeners.initOrFindListener<Input, State, Event>(key, useIndex)
listener.manager = delegate
listener.snapshotImpl = this
listener.transition = transition
listener.isBatchable = isBatchable
return listener
}

Expand Down

0 comments on commit cec1fe0

Please sign in to comment.