From 402462c7e446805027dfde26fda1e76eaec35d56 Mon Sep 17 00:00:00 2001 From: JoonWon Choi Date: Thu, 28 Mar 2024 15:47:24 -0700 Subject: [PATCH] Reconcile batch process optimization bug #2727 --- .../datastore/syncengine/Merger.kt | 8 ++- .../datastore/syncengine/MergerTest.kt | 56 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Merger.kt b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Merger.kt index c1e9e57cf..fd117854b 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Merger.kt +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Merger.kt @@ -66,8 +66,14 @@ internal class Merger( return@rxCompletable } + // Keep the models with the highest version for each ids, abandon the rest + var modelsWithUniqueId = modelsWithMetadata.groupBy { modelWithMetadata -> modelWithMetadata.model.primaryKeyString } + .map {group -> + group.value.maxBy { dupModels -> dupModels.syncMetadata.version ?: 0 } + }.toList() + // create (key, model metadata) map for quick lookup to re-associate - val modelMetadataMap = modelsWithMetadata.associateBy { it.syncMetadata.primaryKeyString } + val modelMetadataMap = modelsWithUniqueId.associateBy { it.syncMetadata.primaryKeyString} // Consumer to announce Model merges val mergedConsumer = Consumer> { diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/MergerTest.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/MergerTest.kt index da6624ec0..a6e55fcfd 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/MergerTest.kt +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/MergerTest.kt @@ -36,6 +36,7 @@ import com.amplifyframework.testmodels.commentsblog.Blog import com.amplifyframework.testmodels.commentsblog.BlogOwner import com.amplifyframework.testutils.random.RandomString import java.util.concurrent.TimeUnit +import java.util.UUID import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue import org.junit.Before @@ -667,6 +668,61 @@ class MergerTest { assertEquals(expectedStorageItemChanges, capturedStorageItemChanges) } + /** + * THIS IS NOT FINAL + * When several models with the same identifier, but different versions with different flag for deleted, + * Only the model with the latest version should be considered as it represents the latest known state of the model + * @throws + * @throws + */ + @Test + @Throws(DataStoreException::class, InterruptedException::class) + fun temporaryMergerTestReconcilation() { + + var capturedStorageItemChanges = mutableListOf() + val changeTypeConsumer = Consumer { + capturedStorageItemChanges.add(it) + } + + // Random UUID following RFC 4122 version 4 spec + val sameRandomId = UUID.randomUUID().toString() + + val remotemodels = listOf( + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS1").build(), + ModelMetadata(sameRandomId, false, 1, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS1").build(), + ModelMetadata(sameRandomId, true, 3, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS1").build(), + ModelMetadata(sameRandomId, true, 2, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS2").build(), + ModelMetadata(sameRandomId, false, 11, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS2").build(), + ModelMetadata(sameRandomId, true, 8, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS2").build(), + ModelMetadata(sameRandomId, true, 17, Temporal.Timestamp.now()) + ), + ModelWithMetadata( + Blog.builder().name("Hideo K.").id("DS2").build(), + ModelMetadata(sameRandomId, false, 34, Temporal.Timestamp.now()) + ) + ) + + val observer = merger.merge(remotemodels, changeTypeConsumer).test() + assertTrue(observer.await(REASONABLE_WAIT_TIME, TimeUnit.MILLISECONDS)) + observer.assertNoErrors().assertComplete() + } + companion object { private val REASONABLE_WAIT_TIME = TimeUnit.SECONDS.toMillis(2) }