Skip to content

Commit

Permalink
fix(datastore): fix observe may receive duplicate events in Android (#…
Browse files Browse the repository at this point in the history
…1339)

* fix(datastore): fix observe may receive duplicate events in Android

* Use AtomicBoolean
* Resolve comments
  • Loading branch information
HuiSF authored Feb 21, 2022
1 parent 4648ff9 commit be34ed8
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import io.flutter.plugin.common.MethodChannel.Result
import java.util.Locale
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.collections.HashMap

/** AmplifyDataStorePlugin */
Expand All @@ -69,6 +70,7 @@ class AmplifyDataStorePlugin : FlutterPlugin, MethodCallHandler {
private val dataStoreHubEventStreamHandler: DataStoreHubEventStreamHandler
private val uiThreadHandler = Handler(Looper.getMainLooper())
private val LOG = Amplify.Logging.forNamespace("amplify:flutter:datastore")
private var isSettingUpObserve = AtomicBoolean();

val modelProvider = FlutterModelProvider.instance

Expand Down Expand Up @@ -403,17 +405,18 @@ class AmplifyDataStorePlugin : FlutterPlugin, MethodCallHandler {
}

fun onSetUpObserve(flutterResult: Result) {
if (this::observeCancelable.isInitialized) {
if (this::observeCancelable.isInitialized || isSettingUpObserve.getAndSet(true)) {
flutterResult.success(true)
return
}

val plugin = Amplify.DataStore.getPlugin("awsDataStorePlugin") as AWSDataStorePlugin

plugin.observe(
{ cancelable ->
LOG.info("Established a new stream form flutter $cancelable")
observeCancelable = cancelable
isSettingUpObserve.set(false);
flutterResult.success(true)
},
{ event ->
LOG.debug("Received event: $event")
Expand All @@ -427,15 +430,19 @@ class AmplifyDataStorePlugin : FlutterPlugin, MethodCallHandler {
}
},
{ failure: DataStoreException ->
LOG.error("Received an error", failure)
dataStoreObserveEventStreamHandler.error(
"DataStoreException",
createSerializedError(failure)
)
if (failure.message?.contains("Failed to start DataStore", true) == true) {
isSettingUpObserve.set(false);
flutterResult.success(false)
} else {
LOG.error("Received an error", failure)
dataStoreObserveEventStreamHandler.error(
"DataStoreException",
createSerializedError(failure)
)
}
},
{ LOG.info("Observation complete.") }
)
flutterResult.success(true)
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class AmplifyDataStorePluginTest {
mock(DataStoreHubEventStreamHandler::class.java)
private val dataStoreException =
DataStoreException("Some useful exception message", "Some useful recovery message")
private val dataStoreObserveStartFailure = DataStoreException("Failed to start DataStore.", "Retry")
private val mockModelSchemas = mutableListOf(
mapOf(
"name" to "Post",
Expand Down Expand Up @@ -578,6 +579,54 @@ class AmplifyDataStorePluginTest {
)
}

@Test
fun test_observe_set_up_success() {
flutterPlugin = AmplifyDataStorePlugin(
eventHandler = mockStreamHandler,
hubEventHandler = mockHubHandler
)

doAnswer { invocation: InvocationOnMock ->
(invocation.arguments[0] as Consumer<Cancelable>).accept(
Cancelable { }
)
null
}.`when`(mockAmplifyDataStorePlugin).observe(
any<Consumer<Cancelable>>(),
any<Consumer<DataStoreItemChange<out Model>>>(),
any<Consumer<DataStoreException>>(),
any<Action>()
)

flutterPlugin.onSetUpObserve(mockResult)

verify(mockResult, times(1)).success(true)
}

@Test
fun test_observe_set_up_failure() {
flutterPlugin = AmplifyDataStorePlugin(
eventHandler = mockStreamHandler,
hubEventHandler = mockHubHandler
)

doAnswer { invocation: InvocationOnMock ->
(invocation.arguments[2] as Consumer<DataStoreException>).accept(
dataStoreObserveStartFailure
)
null
}.`when`(mockAmplifyDataStorePlugin).observe(
any<Consumer<Cancelable>>(),
any<Consumer<DataStoreItemChange<out Model>>>(),
any<Consumer<DataStoreException>>(),
any<Action>()
)

flutterPlugin.onSetUpObserve(mockResult)

verify(mockResult, times(1)).success(false)
}

@Test
fun test_observe_success_event() {
flutterPlugin = AmplifyDataStorePlugin(
Expand Down Expand Up @@ -621,12 +670,11 @@ class AmplifyDataStorePluginTest {

flutterPlugin.onSetUpObserve(mockResult)

verify(mockResult, times(1)).success(true)
verify(mockStreamHandler, times(1)).sendEvent(eventData)
}

@Test
fun test_observe_error_event() {
fun test_observe_receive_error_event() {
flutterPlugin = AmplifyDataStorePlugin(
eventHandler = mockStreamHandler,
hubEventHandler = mockHubHandler
Expand All @@ -646,7 +694,10 @@ class AmplifyDataStorePluginTest {

flutterPlugin.onSetUpObserve(mockResult)

verify(mockResult, times(1)).success(true)
// when the observe consume receive normal error events, `flutterResult` won't be invoked
verify(mockResult, times(0)).success(true)
verify(mockResult, times(0)).success(false)

verify(mockStreamHandler, times(1)).error(
"DataStoreException",
mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class DataStorePluginUnitTests: XCTestCase {
pluginUnderTest = SwiftAmplifyDataStorePlugin(bridge: dataStoreBridge, modelSchemaRegistry: modelSchemaRegistry, customTypeSchemasRegistry: customTypeSchemaRegistry, dataStoreObserveEventStreamHandler: streamHandler)

pluginUnderTest.onSetUpObserve(flutterResult: { result in
XCTAssertNil(result)
XCTAssertTrue(result as! Bool)
})

dataStoreBridge.mockPublisher.send(MutationEvent(
Expand Down Expand Up @@ -272,7 +272,7 @@ class DataStorePluginUnitTests: XCTestCase {
pluginUnderTest = SwiftAmplifyDataStorePlugin(bridge: dataStoreBridge, modelSchemaRegistry: modelSchemaRegistry, customTypeSchemasRegistry: customTypeSchemaRegistry, dataStoreObserveEventStreamHandler: streamHandler)

pluginUnderTest.onSetUpObserve(flutterResult: { result in
XCTAssertNil(result)
XCTAssertTrue(result as! Bool)
})

dataStoreBridge.mockPublisher.send(MutationEvent(
Expand Down Expand Up @@ -311,7 +311,7 @@ class DataStorePluginUnitTests: XCTestCase {
pluginUnderTest = SwiftAmplifyDataStorePlugin(bridge: dataStoreBridge, modelSchemaRegistry: modelSchemaRegistry, customTypeSchemasRegistry: customTypeSchemaRegistry, dataStoreObserveEventStreamHandler: streamHandler)

pluginUnderTest.onSetUpObserve(flutterResult: { result in
XCTAssertNil(result)
XCTAssertTrue(result as! Bool)
})

dataStoreBridge.mockPublisher.send(completion:
Expand All @@ -323,6 +323,26 @@ class DataStorePluginUnitTests: XCTestCase {
wait(for: [eventSentExp!], timeout: 1)
}

func test_observe_set_up_failure() throws {
class MockDataStoreBridge: DataStoreBridge {
override func getPlugin() throws -> AWSDataStorePlugin {
throw DataStoreError.configuration("No plugin has been added for 'awsDataStorePlugin'.",
"Either add a plugin for 'awsDataStorePlugin', or use one of the known keys: awsDataStorePlugin")
}
}

class MockStreamHandler: DataStoreObserveEventStreamHandler {}

let dataStoreBridge: MockDataStoreBridge = MockDataStoreBridge()
let streamHandler: MockStreamHandler = MockStreamHandler()

pluginUnderTest = SwiftAmplifyDataStorePlugin(bridge: dataStoreBridge, modelSchemaRegistry: modelSchemaRegistry, customTypeSchemasRegistry: customTypeSchemaRegistry, dataStoreObserveEventStreamHandler: streamHandler)

pluginUnderTest.onSetUpObserve(flutterResult: { result in
XCTAssertFalse(result as! Bool)
})
}

func test_clear_success() throws {

class MockDataStoreBridge: DataStoreBridge {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,11 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin {
// TODO communicate using datastore error handler?
}
}
flutterResult(true)
} catch {
print("Failed to get the datastore plugin \(error)")
flutterResult(false)
}
flutterResult(nil)
}

func onClear(flutterResult: @escaping FlutterResult) {
Expand Down

0 comments on commit be34ed8

Please sign in to comment.