Skip to content

Commit

Permalink
fix(datastore): sync pending mutation events with latest synced metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Dec 6, 2023
1 parent f895496 commit 55936a2
Show file tree
Hide file tree
Showing 19 changed files with 190 additions and 624 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extension Model where Self: Codable {
resolvedEncoder = encoder
} else {
resolvedEncoder = JSONEncoder(dateEncodingStrategy: ModelDateFormatting.encodingStrategy)
resolvedEncoder.outputFormatting = .sortedKeys
}

let data = try resolvedEncoder.encode(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ extension ModelValueConverter {
}

static var jsonEncoder: JSONEncoder {
JSONEncoder(dateEncodingStrategy: ModelDateFormatting.encodingStrategy)
let encoder = JSONEncoder(dateEncodingStrategy: ModelDateFormatting.encodingStrategy)
encoder.outputFormatting = .sortedKeys
return encoder
}

/// - Warning: Although this has `public` access, it is intended for internal & codegen use and should not be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
func resolveConflictsThenSave(mutationEvent: MutationEvent,
storageAdapter: StorageEngineAdapter,
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise) {

// We don't want to query MutationSync<AnyModel> because a) we already have the model, and b) delete mutations
// are submitted *after* the delete has already been applied to the local data store, meaning there is no model
// to query.
var mutationEvent = mutationEvent
do {
// swiftlint:disable:next todo
// TODO: Refactor this so that it's clear that the storage engine is not responsible for setting the version
// perhaps as simple as renaming to `submit(unversionedMutationEvent:)` or similar
let syncMetadata = try storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId,
modelName: mutationEvent.modelName)
mutationEvent.version = syncMetadata?.version
} catch {
completionPromise(.failure(DataStoreError(error: error)))
}

MutationEvent.pendingMutationEvents(
forMutationEvent: mutationEvent,
storageAdapter: storageAdapter) { result in
Expand Down Expand Up @@ -201,8 +185,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
}
resolvedEvent.mutationType = updatedMutationType

resolvedEvent.version = candidate.version

return resolvedEvent
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {

let syncMutationToCloudOperation = SyncMutationToCloudOperation(
mutationEvent: mutationEvent,
getLatestSyncMetadata: {
try? self.storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId, modelName: mutationEvent.modelName)
},
api: api,
authModeStrategy: authModeStrategy
) { [weak self] result in
Expand Down Expand Up @@ -259,12 +262,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
return
}
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
MutationEvent.reconcilePendingMutationEventsVersion(
sent: mutationEvent,
received: mutationSync,
storageAdapter: storageAdapter) { _ in
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
}
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
} else {
completeProcessingEvent(mutationEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation {
private var mutationOperation: AtomicValue<GraphQLOperation<MutationSync<AnyModel>>?>
private weak var api: APICategoryGraphQLBehavior?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?

init(dataStoreConfiguration: DataStoreConfiguration,
mutationEvent: MutationEvent,
api: APICategoryGraphQLBehavior,
Expand Down Expand Up @@ -334,7 +334,7 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation {
finish(result: .failure(dataStoreError))
return
}

reconciliationQueue.offer([graphQLResult], modelName: mutationEvent.modelName)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
typealias MutationSyncCloudResult = GraphQLOperation<MutationSync<AnyModel>>.OperationResult

private weak var api: APICategoryGraphQLBehavior?
private let getLatestSyncMetadata: () -> MutationSyncMetadata?
private let mutationEvent: MutationEvent
private let completion: GraphQLOperation<MutationSync<AnyModel>>.ResultListener
private let requestRetryablePolicy: RequestRetryablePolicy
Expand All @@ -32,13 +33,15 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
private var authTypesIterator: AWSAuthorizationTypeIterator?

init(mutationEvent: MutationEvent,
getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?,
api: APICategoryGraphQLBehavior,
authModeStrategy: AuthModeStrategy,
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>? = nil,
currentAttemptNumber: Int = 1,
requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(),
completion: @escaping GraphQLOperation<MutationSync<AnyModel>>.ResultListener) {
self.mutationEvent = mutationEvent
self.getLatestSyncMetadata = getLatestSyncMetadata
self.api = api
self.networkReachabilityPublisher = networkReachabilityPublisher
self.completion = completion
Expand Down Expand Up @@ -109,7 +112,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
authType: AWSAuthorizationType? = nil
) -> GraphQLRequest<MutationSync<AnyModel>>? {
var request: GraphQLRequest<MutationSync<AnyModel>>

let version = getLatestSyncMetadata()?.version
do {
var graphQLFilter: GraphQLFilter?
if let graphQLFilterJSON = mutationEvent.graphQLFilterJSON {
Expand All @@ -128,7 +131,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: version)
case .update:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -140,7 +143,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: version)
case .create:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -151,7 +154,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
modelSchema: modelSchema,
version: mutationEvent.version)
version: version)
}
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

// MARK: - Responder methods

/// The reconcile function incorporates incoming mutation events into the local database through the following steps:
/// 1. Retrieve the local metadata of the models.
/// 2. Generate dispositions based on incoming mutation events and local metadata.
/// 3. Categorize dispositions into:
/// 3.1 Apply metadata only for those with existing pending mutations.
/// 3.1.1 Notify the count of these incoming mutation events as dropped items.
/// 3.2 Apply incoming mutation and metadata for those without existing pending mutations.
/// 4. Notify the final result.
func reconcile(remoteModels: [RemoteModel]) {
guard !isCancelled else {
log.info("\(#function) - cancelled, aborting")
Expand All @@ -126,16 +134,24 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

do {
try storageAdapter.transaction {
queryPendingMutations(forModels: remoteModels.map(\.model))
queryLocalMetadata(remoteModels)
.subscribe(on: workQueue)
.flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in
let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents)
return self.queryLocalMetadata(remoteModelsToApply)
.map { remoteModels, localMetadatas in
self.getDispositions(for: remoteModels, localMetadatas: localMetadatas)
}
.flatMap { (remoteModelsToApply, localMetadatas) -> Future<Void, DataStoreError> in
let dispositions = self.getDispositions(for: remoteModelsToApply,
localMetadatas: localMetadatas)
return self.applyRemoteModelsDispositions(dispositions)
.flatMap { dispositions in
self.queryPendingMutations(forModels: dispositions.map(\.remoteModel.model)).map { pendingMutations in
(pendingMutations, dispositions)
}
}
.map { pendingMutations, dispositions in
self.separateDispositions(pendingMutations: pendingMutations, dispositions: dispositions)
}
.flatMap { dispositions, dispositionsOnlyApplyMetadata in
self.waitAllPublisherFinishes(publishers: dispositionsOnlyApplyMetadata.map(self.saveMetadata(disposition:)))
.flatMap { _ in
self.applyRemoteModelsDispositions(dispositions)
}
}
.sink(
receiveCompletion: {
Expand Down Expand Up @@ -195,6 +211,28 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

func separateDispositions(
pendingMutations: [MutationEvent],
dispositions: [RemoteSyncReconciler.Disposition]
) -> ([RemoteSyncReconciler.Disposition], [RemoteSyncReconciler.Disposition]) {
if dispositions.isEmpty {
return ([], [])
}

let pendingMutationModelIds = Set(pendingMutations.map(\.modelId))

let dispositionsToApply = dispositions.filter {
!pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

let dispositionsOnlyApplyMetadata = dispositions.filter {
pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

notifyDropped(count: dispositionsOnlyApplyMetadata.count)
return (dispositionsToApply, dispositionsOnlyApplyMetadata)
}

func reconcile(_ remoteModels: [RemoteModel], pendingMutations: [MutationEvent]) -> [RemoteModel] {
guard !remoteModels.isEmpty else {
return []
Expand Down Expand Up @@ -284,8 +322,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}

let publishers = dispositions.map { disposition ->
Publishers.FlatMap<Future<Void, DataStoreError>,
Future<ReconcileAndLocalSaveOperation.ApplyRemoteModelResult, DataStoreError>> in
AnyPublisher<Void, DataStoreError> in

switch disposition {
case .create(let remoteModel):
Expand All @@ -296,7 +333,8 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .create)
}
return publisher

return publisher.eraseToAnyPublisher()
case .update(let remoteModel):
let publisher = self.save(storageAdapter: storageAdapter,
remoteModel: remoteModel)
Expand All @@ -305,7 +343,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .update)
}
return publisher
return publisher.eraseToAnyPublisher()
case .delete(let remoteModel):
let publisher = self.delete(storageAdapter: storageAdapter,
remoteModel: remoteModel)
Expand All @@ -314,7 +352,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .delete)
}
return publisher
return publisher.eraseToAnyPublisher()
}
}

Expand Down Expand Up @@ -367,8 +405,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func save(storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel) -> Future<ApplyRemoteModelResult, DataStoreError> {
private func save(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel
) -> Future<ApplyRemoteModelResult, DataStoreError> {
Future<ApplyRemoteModelResult, DataStoreError> { promise in
storageAdapter.save(untypedModel: remoteModel.model.instance) { response in
switch response {
Expand Down Expand Up @@ -396,29 +436,69 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func saveMetadata(storageAdapter: StorageEngineAdapter,
applyResult: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType) -> Future<Void, DataStoreError> {
Future<Void, DataStoreError> { promise in
guard case let .applied(inProcessModel) = applyResult else {
promise(.successfulVoid)
return
}
private func saveMetadata(
disposition: RemoteSyncReconciler.Disposition
) -> AnyPublisher<Void, Never> {
guard let storageAdapter = self.storageAdapter else {
return Just(()).eraseToAnyPublisher()
}

storageAdapter.save(inProcessModel.syncMetadata, condition: nil) { result in
switch result {
case .failure(let dataStoreError):
self.notifyDropped(error: dataStoreError)
promise(.failure(dataStoreError))
case .success(let syncMetadata):
return saveMetadata(
storageAdapter: storageAdapter,
remoteModel: disposition.remoteModel,
mutationType: disposition.mutationType
)
.map { _ in () }
.catch { _ in Just(()) }
.eraseToAnyPublisher()
}

private func saveMetadata(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel,
mutationType: MutationEvent.MutationType
) -> Future<MutationSyncMetadata, DataStoreError> {
Future { promise in
storageAdapter.save(
remoteModel.syncMetadata,
condition: nil) { result in
promise(result)
}
}
}

private func saveMetadata(
storageAdapter: StorageEngineAdapter,
applyResult: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType
) -> AnyPublisher<Void, DataStoreError> {
if case .applied(let inProcessModel) = applyResult {
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType)
.handleEvents(receiveOutput: { syncMetadata in
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
self.notify(savedModel: appliedModel, mutationType: mutationType)
promise(.successfulVoid)
}
}
}, receiveCompletion: { completion in
if case .failure(let error) = completion {
self.notifyDropped(error: error)
}
})
.map { _ in () }
.eraseToAnyPublisher()
}
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}

private func waitAllPublisherFinishes<T>(publishers: [AnyPublisher<T, Never>]) -> Future<Void, DataStoreError> {
Future { promise in
Publishers.MergeMany(publishers)
.collect()
.sink(receiveCompletion: { _ in
promise(.successfulVoid)
}, receiveValue: { _ in })
.store(in: &self.cancellables)
}
}

private func notifyDropped(count: Int = 1, error: DataStoreError? = nil) {
for _ in 0 ..< count {
mutationEventPublisher.send(.mutationEventDropped(modelName: modelSchema.name, error: error))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ struct RemoteSyncReconciler {
case create(RemoteModel)
case update(RemoteModel)
case delete(RemoteModel)

var remoteModel: RemoteModel {
switch self {
case .create(let model), .update(let model), .delete(let model):
return model
}
}

var mutationType: MutationEvent.MutationType {
switch self {
case .create: return .create
case .update: return .update
case .delete: return .delete
}
}
}

/// Filter the incoming `remoteModels` against the pending mutations.
Expand Down
Loading

0 comments on commit 55936a2

Please sign in to comment.