Skip to content

Commit

Permalink
Reconcile batch process optimization bug #2727
Browse files Browse the repository at this point in the history
  • Loading branch information
JoonWon Choi committed Mar 28, 2024
1 parent 91ab0b4 commit 402462c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ internal class Merger(
return@rxCompletable
}

// Keep the models with the highest version for each ids, abandon the rest
var modelsWithUniqueId = modelsWithMetadata.groupBy { modelWithMetadata -> modelWithMetadata.model.primaryKeyString }
.map {group ->
group.value.maxBy { dupModels -> dupModels.syncMetadata.version ?: 0 }
}.toList()

// create (key, model metadata) map for quick lookup to re-associate
val modelMetadataMap = modelsWithMetadata.associateBy { it.syncMetadata.primaryKeyString }
val modelMetadataMap = modelsWithUniqueId.associateBy { it.syncMetadata.primaryKeyString}

// Consumer to announce Model merges
val mergedConsumer = Consumer<ModelWithMetadata<T>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.amplifyframework.testmodels.commentsblog.Blog
import com.amplifyframework.testmodels.commentsblog.BlogOwner
import com.amplifyframework.testutils.random.RandomString
import java.util.concurrent.TimeUnit
import java.util.UUID
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Before
Expand Down Expand Up @@ -667,6 +668,61 @@ class MergerTest {
assertEquals(expectedStorageItemChanges, capturedStorageItemChanges)
}

/**
* THIS IS NOT FINAL
* When several models with the same identifier, but different versions with different flag for deleted,
* Only the model with the latest version should be considered as it represents the latest known state of the model
* @throws
* @throws
*/
@Test
@Throws(DataStoreException::class, InterruptedException::class)
fun temporaryMergerTestReconcilation() {

var capturedStorageItemChanges = mutableListOf<StorageItemChange.Type>()
val changeTypeConsumer = Consumer<StorageItemChange.Type> {
capturedStorageItemChanges.add(it)
}

// Random UUID following RFC 4122 version 4 spec
val sameRandomId = UUID.randomUUID().toString()

val remotemodels = listOf(
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId, false, 1, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId, true, 3, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId, true, 2, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS2").build(),
ModelMetadata(sameRandomId, false, 11, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS2").build(),
ModelMetadata(sameRandomId, true, 8, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS2").build(),
ModelMetadata(sameRandomId, true, 17, Temporal.Timestamp.now())
),
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS2").build(),
ModelMetadata(sameRandomId, false, 34, Temporal.Timestamp.now())
)
)

val observer = merger.merge(remotemodels, changeTypeConsumer).test()
assertTrue(observer.await(REASONABLE_WAIT_TIME, TimeUnit.MILLISECONDS))
observer.assertNoErrors().assertComplete()
}

companion object {
private val REASONABLE_WAIT_TIME = TimeUnit.SECONDS.toMillis(2)
}
Expand Down

0 comments on commit 402462c

Please sign in to comment.