Skip to content

Commit

Permalink
[Android] Ensure all events touching Android happen on the main threa…
Browse files Browse the repository at this point in the history
…d. (#334)

* [Android] Ensure main thread internally.

* Test AndroidUpdateScheduler.

* Add a bunch of tests.

* Fix issue with updateScheduled being removed only once.

* Make sure to clear pending update.

* Move main thread scheduler to FragmentFlowStore.
  • Loading branch information
Laimiux authored Jan 26, 2024
1 parent 5ea7759 commit 9676c10
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 29 deletions.
4 changes: 2 additions & 2 deletions formula-android-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ dependencies {
implementation(libs.lifecycle.extensions)
implementation(libs.androidx.test.core.ktx)

testImplementation(libs.junit)
testImplementation(libs.truth)
testImplementation(libs.androidx.test.junit)
testImplementation(libs.androidx.test.rules)
testImplementation(libs.androidx.test.runner)
testImplementation(libs.espresso.core)
testImplementation(libs.junit)
testImplementation(libs.robolectric)
testImplementation(libs.truth)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.instacart.formula

import android.os.Looper
import androidx.fragment.app.Fragment
import androidx.lifecycle.Lifecycle
import androidx.test.core.app.ActivityScenario
Expand All @@ -9,6 +10,8 @@ import com.google.common.truth.Truth.assertThat
import com.instacart.formula.android.FragmentFlowState
import com.instacart.formula.android.FragmentKey
import com.instacart.formula.android.BackCallback
import com.instacart.formula.android.FragmentEnvironment
import com.instacart.formula.rxjava3.toObservable
import com.instacart.formula.test.TestKey
import com.instacart.formula.test.TestKeyWithId
import com.instacart.formula.test.TestFragmentActivity
Expand All @@ -19,7 +22,11 @@ import org.junit.Rule
import org.junit.Test
import org.junit.rules.RuleChain
import org.junit.runner.RunWith
import org.robolectric.Shadows
import org.robolectric.annotation.LooperMode
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

@RunWith(AndroidJUnit4::class)
class FragmentFlowRenderViewTest {
Expand All @@ -29,6 +36,7 @@ class FragmentFlowRenderViewTest {
private var lastState: FragmentFlowState? = null
private val stateChangeRelay = PublishRelay.create<Pair<FragmentKey, Any>>()
private var onPreCreated: (TestFragmentActivity) -> Unit = {}
private var updateThreads = linkedSetOf<Thread>()
private val formulaRule = TestFormulaRule(
initFormula = { app ->
FormulaAndroid.init(app) {
Expand All @@ -40,6 +48,8 @@ class FragmentFlowRenderViewTest {
},
onRenderFragmentState = { a, state ->
lastState = state

updateThreads.add(Thread.currentThread())
},
contracts = {
bind(TestFeatureFactory<TestKey> { stateChanges(it) })
Expand All @@ -52,6 +62,7 @@ class FragmentFlowRenderViewTest {
},
cleanUp = {
lastState = null
updateThreads = linkedSetOf()
})

private val activityRule = ActivityScenarioRule(TestFragmentActivity::class.java)
Expand Down Expand Up @@ -199,6 +210,51 @@ class FragmentFlowRenderViewTest {
assertThat(activeContracts()).containsExactly(TestKey(), TestKeyWithId(1)).inOrder()
}

@Test fun `background feature events are moved to the main thread`() {

val executor = Executors.newSingleThreadExecutor()
val latch = CountDownLatch(1)

val initial = TestKey()
val keyWithId = TestKeyWithId(1)

navigateToTaskDetail(1)
// Both contracts should be active.
assertThat(activeContracts()).containsExactly(TestKey(), TestKeyWithId(1)).inOrder()

// Pass feature updates on a background thread
executor.execute {
stateChangeRelay.accept(initial to "main-state-1")
stateChangeRelay.accept(initial to "main-state-2")
stateChangeRelay.accept(initial to "main-state-3")

stateChangeRelay.accept(keyWithId to "detail-state-1")
stateChangeRelay.accept(keyWithId to "detail-state-2")
stateChangeRelay.accept(keyWithId to "detail-state-3")
latch.countDown()
}

// Wait for background execution to finish
if(!latch.await(100, TimeUnit.MILLISECONDS)) {
throw IllegalStateException("timeout")
}

Shadows.shadowOf(Looper.getMainLooper()).idle()

val currentState = lastState?.states.orEmpty()
.mapKeys { it.key.key }
.mapValues { it.value.renderModel }

val expected = mapOf(
TestKey() to "main-state-3",
TestKeyWithId(1) to "detail-state-3"
)

assertThat(currentState).isEqualTo(expected)
assertThat(updateThreads).hasSize(1)
assertThat(updateThreads).containsExactly(Thread.currentThread())
}

private fun navigateBack() {
scenario.onActivity { it.onBackPressed() }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.instacart.formula.android.internal

import android.os.Looper
import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.common.truth.Truth
import com.google.common.truth.Truth.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.Shadows.shadowOf
import java.util.LinkedList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

@RunWith(AndroidJUnit4::class)
class AndroidUpdateSchedulerTest {

@Test fun `when an update triggers another update, scheduler finishes first one before proceeding to the next`() {
val computedValues = LinkedList<String>()
val scheduler = AndroidUpdateScheduler<() -> String> { valueComputation ->
val value = valueComputation()
computedValues.addLast(value)
}

scheduler.emitUpdate {
scheduler.emitUpdate { "next" }
"first"
}

assertThat(computedValues).containsExactly("first", "next").inOrder()
}

@Test fun `when update arrives on bg thread, handle it on main thread`() {
val computedValues = LinkedList<String>()
val scheduler = AndroidUpdateScheduler<() -> String> { valueComputation ->
val value = valueComputation()
computedValues.addLast(value)
}

val latch = CountDownLatch(1)
Executors.newSingleThreadExecutor().execute {
scheduler.emitUpdate { "bg update" }
latch.countDown()
}

if (!latch.await(100, TimeUnit.MILLISECONDS)) {
throw IllegalStateException("timeout")
}

shadowOf(Looper.getMainLooper()).idle()
assertThat(computedValues).containsExactly("bg update").inOrder()
}

@Test fun `when multiple updates arrive on bg thread before main thread is ready, we handle only last`() {
val computedValues = LinkedList<String>()
val scheduler = AndroidUpdateScheduler<() -> String> { valueComputation ->
val value = valueComputation()
computedValues.addLast(value)
}

val latch = CountDownLatch(1)
Executors.newSingleThreadExecutor().execute {
scheduler.emitUpdate { "bg update-1" }
scheduler.emitUpdate { "bg update-2" }
scheduler.emitUpdate { "bg update-3" }
scheduler.emitUpdate { "bg update-4" }
latch.countDown()
}

if (!latch.await(100, TimeUnit.MILLISECONDS)) {
throw IllegalStateException("timeout")
}

shadowOf(Looper.getMainLooper()).idle()
assertThat(computedValues).containsExactly("bg update-4").inOrder()
}
}
7 changes: 5 additions & 2 deletions formula-android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ dependencies {
api(libs.rxandroid)
api(libs.rxrelay)


testImplementation(libs.androidx.test.rules)
testImplementation(libs.androidx.test.runner)
testImplementation(libs.androidx.test.junit)
testImplementation(libs.espresso.core)
testImplementation(libs.truth)
testImplementation(libs.kotlin.reflect)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.kotlin)
testImplementation(libs.kotlin.reflect)
testImplementation(libs.robolectric)
testImplementation(libs.truth)
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.instacart.formula.Formula
import com.instacart.formula.Snapshot
import com.instacart.formula.android.internal.Binding
import com.instacart.formula.android.events.FragmentLifecycleEvent
import com.instacart.formula.android.internal.FeatureObservableAction
import com.instacart.formula.rxjava3.RxAction
import com.instacart.formula.rxjava3.toObservable
import com.jakewharton.rxrelay3.PublishRelay
Expand Down Expand Up @@ -122,14 +123,18 @@ class FragmentFlowStore @PublishedApi internal constructor(
val fragmentId = entry.key
val feature = (entry.value as? FeatureEvent.Init)?.feature
if (feature != null) {
RxAction.fromObservable(feature) {
feature.state.onErrorResumeNext {
input.onScreenError(fragmentId.key, it)
Observable.empty()
val action = FeatureObservableAction(
fragmentEnvironment = input,
fragmentId = fragmentId,
feature = feature,
)
action.onEvent {
if (state.activeIds.contains(fragmentId)) {
val keyState = FragmentState(fragmentId.key, it)
transition(state.copy(states = state.states.plus(fragmentId to keyState)))
} else {
none()
}
}.onEvent {
val keyState = FragmentState(fragmentId.key, it)
transition(state.copy(states = state.states.plus(fragmentId to keyState)))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,18 @@ internal class ActivityManager<Activity : FragmentActivity>(
private val store: ActivityStore<Activity>
) {

private val fragmentState = store
.contracts
.state(environment)
.doOnNext(delegate.fragmentFlowStateRelay::accept)
.replay(1)

internal val stateSubscription: Disposable
private var uiSubscription: Disposable? = null
private var fragmentRenderView: FragmentFlowRenderView? = null

init {
stateSubscription = if (store.streams != null) {
val disposables = CompositeDisposable()
disposables.add(fragmentState.connect())
disposables.add(subscribeToFragmentStateChanges())
disposables.add(store.streams.invoke(StreamConfiguratorIml(delegate)))
disposables
} else {
fragmentState.connect()
subscribeToFragmentStateChanges()
}
}

Expand All @@ -61,9 +55,8 @@ internal class ActivityManager<Activity : FragmentActivity>(
delegate.attachActivity(activity)
delegate.onLifecycleStateChanged(Lifecycle.State.CREATED)
val renderView = fragmentRenderView ?: throw callOnPreCreateException(activity)
uiSubscription = fragmentState.subscribe {
Utils.assertMainThread()

uiSubscription = delegate.fragmentFlowState().subscribe {
renderView.render(it)
store.onRenderFragmentState?.invoke(activity, it)
}
Expand Down Expand Up @@ -113,6 +106,13 @@ internal class ActivityManager<Activity : FragmentActivity>(
return fragmentRenderView?.viewFactory(fragment)
}

private fun subscribeToFragmentStateChanges(): Disposable {
return store
.contracts
.state(environment)
.subscribe(delegate.fragmentFlowStateRelay::accept)
}

private fun callOnPreCreateException(activity: FragmentActivity): IllegalStateException {
return IllegalStateException("please call onPreCreate before calling Activity.super.onCreate(): ${activity::class.java.simpleName}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ internal class ActivityStoreContextImpl<Activity : FragmentActivity> : ActivityS

override fun send(effect: Activity.() -> Unit) {
// We allow emitting effects only after activity has started
startedActivity()?.effect() ?: run {
// Log missing activity.
if (Utils.isMainThread()) {
startedActivity()?.effect()
} else {
Utils.mainThreadHandler.post {
startedActivity()?.effect()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.instacart.formula.android.internal

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

/**
* Handles state update scheduling to the main thread. If update arrives on a background thread,
* it will added it the main thread queue. It will throw away a pending update if a new update
* arrives.
*/
class AndroidUpdateScheduler<Value : Any>(
private val update: (Value) -> Unit,
) {
/**
* If not null, that means that we have an update pending.
*/
private val pendingValue = AtomicReference<Value>()

/**
* Defines if an update is currently scheduled.
*/
private val updateScheduled = AtomicBoolean(false)

/**
* To avoid re-entry, we track if [updateRunnable] is currently handling an update.
*/
private var isUpdating = false

private val updateRunnable = object : Runnable {
override fun run() {
updateScheduled.set(false)

var localPending = pendingValue.getAndSet(null)
while (localPending != null) {
// Handle the update
isUpdating = true
update(localPending)
isUpdating = false

// Check if another update arrived while we were processing.
localPending = pendingValue.getAndSet(null)

if (localPending != null) {
// We will perform the update, let's clear the values.
updateScheduled.set(false)
Utils.mainThreadHandler.removeCallbacks(this)
}
}
}
}

fun emitUpdate(value: Value) {
// Set pending value
pendingValue.set(value)

if (Utils.isMainThread()) {
if (isUpdating) {
// Let's exit and let the [updateRunnable] to pick up the change
return
} else {
// Since we are on main thread, let's force run it
updateRunnable.run()
}
} else {
// If no update is scheduled, schedule one
if (updateScheduled.compareAndSet(false, true)) {
Utils.mainThreadHandler.post(updateRunnable)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.instacart.formula.android.internal

import android.os.SystemClock
import com.instacart.formula.Action
import com.instacart.formula.Evaluation
import com.instacart.formula.Formula
Expand Down
Loading

0 comments on commit 9676c10

Please sign in to comment.