From b02aa8e7b357cc475b41f829df8c88c122b68cd0 Mon Sep 17 00:00:00 2001 From: Klemens Muthmann Date: Mon, 25 Mar 2024 15:43:18 +0100 Subject: [PATCH] [STAD-596] Update Handling of Measurement Finished (#169) * Add new finished event after data storage finished This is necessary, since now when the stopped event is received by the app, the measurement inside the database is not necessarily stopped and thus will not be eligeble for data synchronization. * Remove copied redeclaration of Double equal function. * Add new finished event after CoreData Storage completed * Update Documentation for DefaultUploadProcess * Actually use the finish handler of the CapturedDataStorage No finish event as implemented in the last commit is necessary, since our API has an explicit handler on the CapturedDataStorage, that is called as soon as data storage has finished. This should be used to store everything to the database and carry out clean up code at the end of a measurement. * Update RFR App with correct handling of finishing measurements * Removed obsolete Sessions database from Resources * Remove obsolete finished event from Message * Fix after merge issues --- DataCapturing/Package.swift | 1 - .../DataCapturing/Capturing/Measurement.swift | 29 +- .../Capturing/Model/Message.swift | 36 ++ .../DataCapturing/Model/Altitude.swift | 13 + .../Sources/DataCapturing/Model/Double.swift | 22 + .../Model/FinishedMeasurement.swift | 2 +- .../DataCapturing/Model/GeoLocation.swift | 16 + .../Persistence/CapturedDataStorage.swift | 253 +++++++----- .../Persistence/DataStoreStack.swift | 12 +- .../Persistence/SensorValueFileFactory.swift | 4 - .../Upload/Default/DefaultUploadProcess.swift | 3 + RFR-App/RFR.xcodeproj/project.pbxproj | 12 +- RFR-App/RFR/Live/LiveViewModel.swift | 388 +++++++++--------- RFR-App/RFR/Measurements/Measurement.swift | 5 +- .../Measurements/MeasurementsViewModel.swift | 22 +- .../ViewModel/DataCapturingViewModel.swift | 4 +- .../ViewModel/SynchronizationViewModel.swift | 6 +- 17 files changed, 479 insertions(+), 349 deletions(-) create mode 100644 DataCapturing/Sources/DataCapturing/Model/Double.swift diff --git a/DataCapturing/Package.swift b/DataCapturing/Package.swift index fc3eac18..f360db45 100644 --- a/DataCapturing/Package.swift +++ b/DataCapturing/Package.swift @@ -61,7 +61,6 @@ let package = Package( ], exclude: ["Support/Info.plist"], resources: [ - .process("Synchronization/Upload/Background/SessionRegistry.xcdatamodeld"), .process("Model/Migrations/V3toV4/V3toV4.xcmappingmodel"), .process("Model/CyfaceModel.xcdatamodeld"), .process("Model/Migrations/V10toV11/V10toV11.xcmappingmodel"), diff --git a/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift b/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift index 370b3795..71f7c00f 100644 --- a/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift +++ b/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift @@ -35,8 +35,9 @@ This protocol defines a measurements data together with its lifecycle during dat - Since: 12.0.0 */ public protocol Measurement { + // TODO: It should not be possible to send messages via this variable. So this should be a publisher instead of a PasstroughSubject /// A combine subject used to receive messages during data capturing and forwarding them, to whoever wants to listen. - var measurementMessages: PassthroughSubject { get } + var measurementMessages: AnyPublisher { get } /// A flag to get information about whether this measurement is currently running (`true`) or not (`false`). var isRunning: Bool { get } /// A flag to get information about whether this measurement is currently paused (`true`) or not (`false`). @@ -75,7 +76,6 @@ public class MeasurementImpl { /// `true` if data capturing was running but is currently paused; `false` otherwise. public var isPaused:Bool - // TODO: This should probably be carried out using an actor: See the talk "Protect mutable state with Swift actors" from WWDC 2021 /// The background queue used to capture data. private let capturingQueue: DispatchQueue @@ -85,15 +85,15 @@ public class MeasurementImpl { private let locationCapturer: LocationCapturer // TODO: Switch to Combine --> Make this a publisher on its own. Will have to read up on how to achieve this. - public var measurementMessages: PassthroughSubject + public var messagesSubject: PassthroughSubject - // TODO: This should probably be carried out using an actor: See the talk "Protect mutable state with Swift actors" from WWDC 2021 /** A queue used to synchronize calls to the lifecycle methods `start`, `pause`, `resume` and `stop`. Using such a queue prevents successiv calls to these methods to interrupt each other. */ private let lifecycleQueue: DispatchQueue private var messageCancellable: AnyCancellable? = nil + private var finishedEventCancellable: AnyCancellable? = nil // MARK: - Initializers @@ -114,7 +114,6 @@ public class MeasurementImpl { manager.activityType = .other manager.showsBackgroundLocationIndicator = true manager.distanceFilter = kCLDistanceFilterNone - //manager.requestAlwaysAuthorization() return manager } ) { @@ -122,7 +121,7 @@ public class MeasurementImpl { self.lifecycleQueue = DispatchQueue(label: "lifecycle") self.sensorCapturer = SensorCapturer(capturingQueue: capturingQueue) self.locationCapturer = LocationCapturer(lifecycleQueue: lifecycleQueue, locationManagerFactory: locationManagerFactory) - measurementMessages = PassthroughSubject() + messagesSubject = PassthroughSubject() self.isRunning = false self.isPaused = false @@ -152,7 +151,7 @@ public class MeasurementImpl { messageCancellable = locationCapturer.start().receive(on: lifecycleQueue).merge( with: sensorCapturer.start() - ).subscribe(measurementMessages) + ).subscribe(messagesSubject) self.isRunning = true } @@ -174,6 +173,10 @@ public class MeasurementImpl { // MARK: - Measurement extension MeasurementImpl: Measurement { + public var measurementMessages: AnyPublisher { + return messagesSubject.eraseToAnyPublisher() + } + /** Starts the capturing process. @@ -194,7 +197,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta } try startCapturing() - measurementMessages.send(.started(timestamp: Date())) + messagesSubject.send(.started(timestamp: Date())) } } @@ -218,8 +221,8 @@ Starting data capturing on paused service. Finishing paused measurements and sta stopCapturing() isPaused = false - measurementMessages.send(.stopped(timestamp: Date())) - measurementMessages.send(completion: .finished) + messagesSubject.send(.stopped(timestamp: Date())) + messagesSubject.send(completion: .finished) } } @@ -242,7 +245,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta stopCapturing() isPaused = true - measurementMessages.send(.paused(timestamp: Date())) + messagesSubject.send(.paused(timestamp: Date())) } } @@ -267,7 +270,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta isPaused = false isRunning = true - measurementMessages.send(.resumed(timestamp: Date())) + messagesSubject.send(.resumed(timestamp: Date())) } } @@ -279,7 +282,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta */ public func changeModality(to modality: String) { lifecycleQueue.sync { - measurementMessages.send(.modalityChanged(to: modality)) + messagesSubject.send(.modalityChanged(to: modality)) } } } diff --git a/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift b/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift index bb2274f5..7f5426a7 100644 --- a/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift +++ b/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift @@ -69,6 +69,7 @@ public enum Message: CustomStringConvertible { /// The message sent if a new direction was captured. case capturedDirection(SensorValue) case started(timestamp: Date) + /// Sent after all sensor have stopped capturing data. case stopped(timestamp: Date) case paused(timestamp: Date) case resumed(timestamp: Date) @@ -77,3 +78,38 @@ public enum Message: CustomStringConvertible { case modalityChanged(to: String) case receivedNothingYet } + +extension Message: Equatable { + public static func == (lhs: Message, rhs: Message) -> Bool { + switch (lhs, rhs) { + case (.capturedLocation(let geoLocationLhs), .capturedLocation(let geoLocationRhs)): + return geoLocationLhs == geoLocationRhs + case (.capturedAltitude(let altitudeLhs), .capturedAltitude(let altitudeRhs)): + return altitudeLhs == altitudeRhs + case (.capturedAcceleration(let sensorValueLhs), .capturedAcceleration(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.capturedRotation(let sensorValueLhs), .capturedRotation(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.capturedDirection(let sensorValueLhs), .capturedDirection(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.started(let timestampLhs), .started(let timestampRhs)): + return timestampLhs == timestampRhs + case (.stopped(let timestampLhs), .stopped(let timestampRhs)): + return timestampLhs == timestampRhs + case (.paused(let timestampLhs), .paused(let timestampRhs)): + return timestampLhs == timestampRhs + case (.resumed(let timestampLhs), .resumed(let timestampRhs)): + return timestampLhs == timestampRhs + case (.hasFix, .hasFix): + return true + case (.fixLost, .fixLost): + return true + case (.modalityChanged(let toLhs), .modalityChanged(to: let toRhs)): + return toLhs == toRhs + case (.receivedNothingYet, .receivedNothingYet): + return true + default: + return false + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/Altitude.swift b/DataCapturing/Sources/DataCapturing/Model/Altitude.swift index 487dd53a..08c95473 100644 --- a/DataCapturing/Sources/DataCapturing/Model/Altitude.swift +++ b/DataCapturing/Sources/DataCapturing/Model/Altitude.swift @@ -24,6 +24,7 @@ import CoreData A struct to wrap all the information associated with a measured altitude provided by an altimeter. - Author: Klemens Muthmann + - version: 1.0.0 */ public class Altitude: CustomStringConvertible { /// The relative altitude change since the last measured value, in meters. @@ -47,3 +48,15 @@ public class Altitude: CustomStringConvertible { self.time = time } } + +extension Altitude: Equatable { + public static func == (lhs: Altitude, rhs: Altitude) -> Bool { + if lhs===rhs { + return true + } else { + return lhs.relativeAltitude.equal(rhs.relativeAltitude, precise: 3) && + lhs.pressure.equal(rhs.pressure, precise: 3) && + lhs.time == rhs.time + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/Double.swift b/DataCapturing/Sources/DataCapturing/Model/Double.swift new file mode 100644 index 00000000..c9b044a0 --- /dev/null +++ b/DataCapturing/Sources/DataCapturing/Model/Double.swift @@ -0,0 +1,22 @@ +// +// File.swift +// +// +// Created by Klemens Muthmann on 18.03.24. +// + +import Foundation + +public extension Double { + public func equal(_ value: Double, precise: Int) -> Bool { + let denominator: Double = pow(10.0, Double(precise)) + let maxDiff: Double = 1 / denominator + let realDiff: Double = self - value + + if fabs(realDiff) <= maxDiff { + return true + } else { + return false + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift b/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift index 0b46fdd3..3cb740db 100644 --- a/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift +++ b/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift @@ -74,7 +74,7 @@ public class FinishedMeasurement: Hashable, Equatable { - parameter managedObject: The managed CoreData object to initialize this `Measurement` from. - throws: `InconstantData.locationOrderViolation` if the timestamps of the locations in this measurement are not strongly monotonically increasing. */ - convenience init(managedObject: MeasurementMO) throws { + public convenience init(managedObject: MeasurementMO) throws { let accelerationFile = SensorValueFile(fileType: .accelerationValueType, qualifier: String(managedObject.unsignedIdentifier)) let directionFile = SensorValueFile(fileType: .directionValueType, qualifier: String(managedObject.unsignedIdentifier)) let rotationFile = SensorValueFile(fileType: .rotationValueType, qualifier: String(managedObject.unsignedIdentifier)) diff --git a/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift b/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift index 9cbfd441..6fa8f1b2 100644 --- a/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift +++ b/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift @@ -123,3 +123,19 @@ public class GeoLocation: CustomStringConvertible { return distance(from: CLLocation(latitude: previousLocation.latitude, longitude: previousLocation.longitude)) } } + +extension GeoLocation: Equatable { + public static func == (lhs: GeoLocation, rhs: GeoLocation) -> Bool { + if lhs === rhs { + return true + } else { + return lhs.latitude.equal(rhs.latitude, precise: 6) && + lhs.longitude.equal(rhs.longitude, precise: 6) && + lhs.speed.equal(rhs.speed, precise: 2) && + lhs.accuracy.equal(rhs.accuracy, precise: 3) && + lhs.time == rhs.time && + lhs.altitude.equal(rhs.altitude, precise: 3) && + lhs.verticalAccuracy.equal(rhs.verticalAccuracy, precise: 3) + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift b/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift index 35576ab5..32ea887a 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift @@ -30,35 +30,38 @@ import CoreData - version: 1.0.0 */ public protocol CapturedDataStorage { - /// Create a new measurement within the data store. - func createMeasurement(_ initialMode: String) throws -> UInt64 /// Subscribe to a running measurement and store the data produced by that measurement. + /// - Returns: The application wide unique identifier under which the new measurement is stored in the database. func subscribe( to measurement: Measurement, - _ identifier: UInt64, - _ receiveCompletion: @escaping (() -> Void)) throws + _ initialMode: String, + _ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void)) throws -> UInt64 /// Stop receiving updates from the currently subscribed measurement or do nothing if this was not subscribed at the moment. func unsubscribe() } /** -An implementation of `CapturedDataStorage` for storing the data to a CoreData database. + An implementation of `CapturedDataStorage` for storing the data to a CoreData database. - author: Klemens Muthmann - version: 1.0.1 */ public class CapturedCoreDataStorage where SVFF.Serializable == [SensorValue] { + // MARK: - Properties /// The `DataStoreStack` to write the captured data to. let dataStoreStack: DataStoreStack /// A queue used to buffer received data until writing it as a bulk for performance reasons. let cachingQueue = DispatchQueue(label: "de.cyface.cache") /// The time interval to wait until the next batch of data is stored to the data storage. Increasing this time should improve performance but increases memory usage. let interval: TimeInterval - /// The *Combine* cancellables used so new values are transmitted. + /// The *Combine* cancellables used so new values are transmitted. References to this must be kept here, so that *Combine* does not stop the data flow. var cancellables = [AnyCancellable]() /// Creator for storing sensor values to a file. let sensorValueFileFactory: SVFF + /// A Publisher of messages sent by the persistence layer on storage events. + let persistenceMessages = PassthroughSubject() + // MARK: - Initializers /** - Parameter interval: The time interval to wait until the next batch of data is stored to the data storage. Increasing this time should improve performance but increases memory usage. */ @@ -71,11 +74,10 @@ public class CapturedCoreDataStorage where SVFF.Se self.interval = interval self.sensorValueFileFactory = sensorValueFileFactory } -} -extension CapturedCoreDataStorage: CapturedDataStorage { - - public func createMeasurement(_ initialMode: String) throws -> UInt64 { + // MARK: - Private Methods + /// Create a new measurement within the data store. + private func createMeasurement(_ initialMode: String) throws -> UInt64 { return try dataStoreStack.wrapInContextReturn { context in let time = Date() let measurementMO = MeasurementMO(context: context) @@ -90,120 +92,149 @@ extension CapturedCoreDataStorage: CapturedDataStorage { } } + private func load(measurement identifier: UInt64, from context: NSManagedObjectContext) throws -> MeasurementMO { + guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate( + withName: "measurementByIdentifier", + substitutionVariables: ["identifier": identifier] + ) else { + os_log( + "Unable to load measurement fetch request.", + log: OSLog.persistence, + type: .debug + ) + throw PersistenceError.measurementNotLoadable(identifier) + } + guard let measurementMO = try measurementRequest.execute().first as? MeasurementMO else { + os_log( + "Unable to load measurement to store to", + log: OSLog.persistence, + type: .debug + ) + throw PersistenceError.measurementNotLoadable(identifier) + } + + return measurementMO + } + + private func handle(messages: [Message], measurement identifier: UInt64) throws { + try self.dataStoreStack.wrapInContext { context in + let measurementMo = try self.load(measurement: identifier, from: context) + + let accelerationsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.accelerationValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + let rotationsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.rotationValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + let directionsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.directionValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + + try messages.forEach { message in + switch message { + case .capturedLocation(let location): + self.store(location: location, to: measurementMo, context) + case .capturedAltitude(let altitude): + self.store(altitude: altitude, to: measurementMo, context) + case .capturedRotation(let rotation): + try self.store(rotation, to: rotationsFile) + case .capturedDirection(let direction): + try self.store(direction, to: directionsFile) + case .capturedAcceleration(let acceleration): + try self.store(acceleration, to: accelerationsFile) + case .started(timestamp: let time): + os_log("Storing started event to database.", log: OSLog.persistence, type: .debug) + measurementMo.addToTracks(TrackMO(context: context)) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStart), context: context)) + case .resumed(timestamp: let time): + measurementMo.addToTracks(TrackMO(context: context)) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleResume), context: context)) + case .paused(timestamp: let time): + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecyclePause), context: context)) + case .stopped(timestamp: let time): + try self.onStop(measurement: measurementMo, context, time) + default: + os_log("Message %{PUBLIC}@ irrelevant for data storage and thus ignored.",log: OSLog.persistence, type: .debug, message.description) + } + } + } + } + + private func store(location: GeoLocation, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) { + os_log("Storing location to database.", log: OSLog.persistence, type: .debug) + if let lastTrack = measurementMo.typedTracks().last { + lastTrack.addToLocations(GeoLocationMO(location: location, context: context)) + } + } + + private func store(altitude: Altitude, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) { + if let lastTrack = measurementMo.typedTracks().last { + lastTrack.addToAltitudes(AltitudeMO(altitude: altitude, context: context)) + } + } + + private func store(_ value: SensorValue, to file: SVF) throws where SVF.Serializable == SVFF.Serializable { + do { + _ = try file.write(serializable: [value]) + } catch { + debugPrint("Unable to write data to file \(file.fileName)!") + throw error + } + } + + private func onStop(measurement measurementMo: MeasurementMO, _ context: NSManagedObjectContext, _ time: Date) throws { + os_log("Storing stopped event to database.", log: OSLog.persistence, type: .debug) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStop), context: context)) + measurementMo.synchronizable = true + try context.save() + os_log("Stored finished measurement.", log: OSLog.persistence, type: .debug) + } + +} + +// MARK: - Implementation of CapturedDataStorage Protocol +extension CapturedCoreDataStorage: CapturedDataStorage { + /// Recievie updates from the provided ``Measurement`` and store the data to a ``DataStoreStack``. public func subscribe( to measurement: Measurement, - _ identifier: UInt64, - _ receiveCompletion: @escaping (() -> Void) - ) throws { + _ initialMode: String, + _ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void) + ) throws -> UInt64 { + let measurementIdentifier = try createMeasurement(initialMode) + let cachedFlow = measurement.measurementMessages.collect(.byTime(cachingQueue, 1.0)) cachedFlow - .sink(receiveCompletion: { _ in - os_log( - "Completing storage flow.", - log: OSLog.persistence, - type: .debug - ) - receiveCompletion() - }) { [weak self] (messages: [Message]) in - do { - try self?.dataStoreStack.wrapInContext { context in - guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate(withName: "measurementByIdentifier", substitutionVariables: ["identifier": identifier]) else { - os_log( - "Unable to load measurement fetch request.", - log: OSLog.persistence, - type: .debug - ) - return - } - guard let measurement = try measurementRequest.execute().first as? MeasurementMO else { - os_log( - "Unable to load measurement to store to", - log: OSLog.persistence, - type: .debug - ) - return - } - - let accelerationsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.accelerationValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - let rotationsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.rotationValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - let directionsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.directionValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - - try messages.forEach { message in - switch message { - case .capturedLocation(let location): - os_log( - "Storing location to database.", - log: OSLog.persistence, - type: .debug - ) - if let lastTrack = measurement.typedTracks().last { - lastTrack.addToLocations(GeoLocationMO(location: location, context: context)) - } - case .capturedAltitude(let altitude): - if let lastTrack = measurement.typedTracks().last { - lastTrack.addToAltitudes(AltitudeMO(altitude: altitude, context: context)) - } - case .capturedRotation(let rotation): - do { - _ = try rotationsFile?.write(serializable: [rotation]) - } catch { - debugPrint("Unable to write data to file \(String(describing: rotationsFile?.fileName))!") - throw error - } - case .capturedDirection(let direction): - do { - _ = try directionsFile?.write(serializable: [direction]) - } catch { - debugPrint("Unable to write data to file \(String(describing: directionsFile?.fileName))!") - throw error - } - case .capturedAcceleration(let acceleration): - do { - _ = try accelerationsFile?.write(serializable: [acceleration]) - } catch { - debugPrint("Unable to write data to file \(String(describing: accelerationsFile?.fileName))!") - throw error - } - case .started(timestamp: let time): - os_log("Storing started event to database.", log: OSLog.persistence, type: .debug) - measurement.addToTracks(TrackMO(context: context)) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStart), context: context)) - case .resumed(timestamp: let time): - measurement.addToTracks(TrackMO(context: context)) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleResume), context: context)) - case .paused(timestamp: let time): - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecyclePause), context: context)) - case .stopped(timestamp: let time): - os_log("Storing stopped event to database.", log: OSLog.persistence, type: .debug) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStop), context: context)) - measurement.synchronizable = true - default: - os_log("Message %{PUBLIC}@ irrelevant for data storage and thus ignored.",log: OSLog.persistence, type: .debug, message.description) - } - } - - try context.save() + .sink(receiveCompletion: { status in + switch status { + case .finished: + os_log( + "Completing storage flow.", + log: OSLog.persistence, + type: .debug + ) + Task { + await receiveCompletion(measurementIdentifier) } + case .failure(let error): + os_log("Unable to complete measurement %@", log: OSLog.persistence, type: .error, error.localizedDescription) + } + } + ) { [weak self] (messages: [Message]) in + do { + try self?.handle(messages: messages, measurement: measurementIdentifier) } catch { os_log("Unable to store data! Error %{PUBLIC}@",log: OSLog.persistence ,type: .error, error.localizedDescription) } }.store(in: &cancellables) + + return measurementIdentifier } public func unsubscribe() { - cancellables.forEach { cancellable in - cancellable.cancel() - } cancellables.removeAll(keepingCapacity: true) } } diff --git a/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift b/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift index 52ca074f..cd405e87 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift @@ -43,13 +43,7 @@ public protocol DataStoreStack { /// /// This method must be called shortly after initialization and before any other interaction with a newly created `DataStoreStack`. func setup() async throws - /// Provide an underlaying ``PersistenceLayer``. - /// - /// This method is only here for backwards compatibility. - /// `PersistenceLayer` is scheduled to die pretty soon. - @available(*, deprecated, message: "PersistenceLayer should not be used any longer. Please write your own data interaction code using `wrapInContext` and `wrapInContextReturn`.") - func persistenceLayer() -> PersistenceLayer - /// This checks if a measurement with that identifier already exists and generates a new identifier until it finds one with no corresponding measurement. + /// This checks if a measurement with that identifier already exists and generates a new identifier until it finds one with no corresponding measurement. /// This is required to handle legacy data and installations, that still have measurements with falsely generated data. func nextValidIdentifier() throws -> UInt64 } @@ -151,10 +145,6 @@ public class CoreDataStack: DataStoreStack { } } - public func persistenceLayer() -> PersistenceLayer { - return CoreDataPersistenceLayer(onManager: self) - } - /// Load the specified `NSManagedObjectModel` from disk. public static func load(model: String = "CyfaceModel", bundle: Bundle) throws -> NSManagedObjectModel { guard let modelURL = bundle.url(forResource: model, withExtension: "momd") else { diff --git a/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift b/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift index 0a95af83..a368669e 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift @@ -48,10 +48,6 @@ public struct DefaultSensorValueFileFactory: SensorValueFileFactory { public typealias FileType = SensorValueFile - //public typealias Serializable = SensorValue - - //public typealias SpecificSerializer = SensorValueSerializer - public init() { // Nothing to do here. } diff --git a/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift b/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift index 6e2851d8..effe7b0b 100644 --- a/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift +++ b/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift @@ -26,6 +26,9 @@ import Combine It orchestrates the different requests required by the Cyface Upload Protocol. + Information about the current upload are distributed using the *Combine* Publisher ``DefaultUploadProcess/uploadStatus``. + For possible events see ``UploadStatusType``. + - author: Klemens Muthmann - version: 1.0.0 */ diff --git a/RFR-App/RFR.xcodeproj/project.pbxproj b/RFR-App/RFR.xcodeproj/project.pbxproj index cba513ba..9da69875 100644 --- a/RFR-App/RFR.xcodeproj/project.pbxproj +++ b/RFR-App/RFR.xcodeproj/project.pbxproj @@ -752,7 +752,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR; PRODUCT_NAME = "Ready for Robots"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -875,7 +875,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.staging; PRODUCT_NAME = "Ready for Robots Staging"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -992,7 +992,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.staging; PRODUCT_NAME = "Ready for Robots Staging"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1109,7 +1109,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR; PRODUCT_NAME = "Ready for Robots"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1734,7 +1734,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.dev; PRODUCT_NAME = "Ready for Robots Development"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1772,7 +1772,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.dev; PRODUCT_NAME = "Ready for Robots Development"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; diff --git a/RFR-App/RFR/Live/LiveViewModel.swift b/RFR-App/RFR/Live/LiveViewModel.swift index 726fc0d2..c60433fa 100644 --- a/RFR-App/RFR/Live/LiveViewModel.swift +++ b/RFR-App/RFR/Live/LiveViewModel.swift @@ -67,137 +67,38 @@ class LiveViewModel: ObservableObject { return measurement } else { let measurement = MeasurementImpl() - measurement.measurementMessages - .receive(on: DispatchQueue.main) - .assign(to: &$message) - - registerLifecycleFlows(measurement) - // Only location captured events - let locationsFlow = measurement.measurementMessages.filter { if case Message.capturedLocation = $0 { return true } else { return false } }.compactMap {if case let Message.capturedLocation(location) = $0 { return location } else { return nil }} - // Use the most recent location to provide the speed value - locationsFlow.filter {location in location.speed >= 0.0 }.compactMap { location in "\(speedFormatter.string(from: location.speed as NSNumber) ?? "0.0") km/h" }.receive(on: RunLoop.main).assign(to: &$speed) - - // Organize all received locations into the local locations array, and stream that array for further processing - let trackFlow = locationsFlow - .compactMap { [weak self] location in - let endIndex = max((self?.locations.count ?? 0)-1, 0) - self?.locations[endIndex].append(location) - return self?.locations - } - // Calculate and store distance covered, by all the tracks from the current measurement. - let distanceFlow = trackFlow.map {(tracks: [[GeoLocation]]) in - return tracks - .map { track in - var trackLength = 0.0 - var prevLocation: GeoLocation? = nil - for location in track { - if let prevLocation = prevLocation { - trackLength += location.distance(from: prevLocation) - } - prevLocation = location - } - return trackLength - } - .reduce(0.0) { accumulator, next in - accumulator + next - } - } - distanceFlow.compactMap { - distanceFormatter.string(from: $0 as NSNumber) - }.map { formattedDistance in - "\(formattedDistance) km" - } - .receive(on: RunLoop.main) - .assign(to: &$distance) - // Calculate and store average speed over all the tracks from this measurement. - trackFlow.map { tracks in - Statistics.averageSpeed(timelines: tracks) - } - .filter { $0 >= 0.0} - .compactMap { - speedFormatter.string(from: $0 as NSNumber) - } - .map { formattedSpeed in - "\(formattedSpeed) km/h" - } - .receive(on: RunLoop.main) - .assign(to: &$averageSpeed) - - // Calculate and store the total duration for all the tracks in this measurement. - trackFlow - .map { tracks in - return Statistics.duration(timelines: tracks) - } - .compactMap { - timeFormatter.string(from: $0) - } - .receive(on: RunLoop.main) - .assign(to: &$duration) - - // Calculate the total rise for all the tracks in this measurement. - measurement.measurementMessages - .filter { - if case Message.capturedAltitude = $0 { - return true - } else { - return false - } - } - .compactMap { message in - if case let Message.capturedAltitude(altitude) = message { - return altitude - } else { - return nil - } - } - .compactMap { [weak self] (altitude: DataCapturing.Altitude) in - let endIndex = max((self?.altitudes.count ?? 0)-1, 0) - self?.altitudes[endIndex].append(altitude) - return self?.altitudes - } - .map { (tracks: [[DataCapturing.Altitude]]) in - return tracks.map { track in - os_log("Using altimeter values to calculate accumulated height.", log: OSLog.measurement, type: .debug) - var previousAltitude: Double? = nil - var sum = 0.0 - for altitude in track { - if let previousAltitude = previousAltitude { - let relativeAltitudeChange = altitude.relativeAltitude - previousAltitude - if relativeAltitudeChange > 0.1 { - sum += relativeAltitudeChange - } - } - previousAltitude = altitude.relativeAltitude - } - return sum - //} - } - .reduce(0.0) { accumulator, next in - accumulator + next - } - } - .compactMap { - riseFormatter.string(from: $0 as NSNumber) + // Forward finished messages so the UI can update accordingly. + let finishedMessages = finishedFlow() + let locationFlow = locationFlow() + let altitudeFlow = altitudeFlow() + let startedFlow = startFlow() + let stoppedFlow = stopFlow() + let pausedFlow = pauseFlow() + let resumedFlow = resumeFlow() + + // Send each received message to the correct stream + measurement.measurementMessages.sink { message in + switch message { + case .capturedLocation(let location): + locationFlow.send(location) + case .capturedAltitude(let altitude): + altitudeFlow.send(altitude) + case .started(timestamp: _): + startedFlow.send(MeasurementState.running) + case .stopped(timestamp: _): + stoppedFlow.send(MeasurementState.stopped) + case .paused(timestamp: _): + pausedFlow.send(MeasurementState.paused) + case .resumed(timestamp: _): + resumedFlow.send(MeasurementState.running) + case .finished(timestamp: _): + os_log("Routing finished event", log: OSLog.capturingEvent, type: .debug) + finishedMessages.send(message) + default: + os_log("Encountered unhandled message %@", log: OSLog.capturingEvent, type: .debug, message.description) } - .map { formattedRise in - "\(formattedRise) m" - } - .receive(on: RunLoop.main) - .assign(to: &$inclination) - - // - distanceFlow.map { - Statistics.avoidedEmissions($0) - } - .compactMap { - emissionsFormatter.string(from: $0 as NSNumber) - } - .map { formattedEmissions in - "\(formattedEmissions) g CO₂" - } - .receive(on: RunLoop.main) - .assign(to: &$avoidedEmissions) + }.store(in: &cancellables) self._measurement = measurement return measurement @@ -212,8 +113,8 @@ class LiveViewModel: ObservableObject { private var cancellables = [AnyCancellable]() /// Captures and publishes any error produced by this model. @Published var error: Error? - /// Always contains the most recent message received from the Cyface SDK. - @Published var message: Message = Message.receivedNothingYet + /// Always contains the most recent message received from the Cyface SDK. The measurements view listenes to this property to show updates to the live measurement if necessary. + @Published var finishedMessages: Message = Message.receivedNothingYet /** Initialize an object of this class. @@ -331,12 +232,6 @@ class LiveViewModel: ObservableObject { func onStopPressed() throws { if measurement.isRunning || measurement.isPaused { try measurement.stop() - self.cancellables.forEach { - $0.cancel() - } - self.cancellables.removeAll(keepingCapacity: true) - self._measurement = nil - self.dataStorageProcess.unsubscribe() } } @@ -345,24 +240,11 @@ class LiveViewModel: ObservableObject { */ func onPlayPressed() throws { if measurement.isPaused { - if let identifier = identifier { - try dataStorageProcess.subscribe( - to: measurement, - identifier - ) {} - - try measurement.resume() - } + try measurement.resume() } else if !measurement.isPaused && !measurement.isRunning{ // Is stopped - identifier = try dataStorageProcess.createMeasurement("BICYCLE") - if let identifier = identifier { - try dataStorageProcess.subscribe( - to: measurement, - identifier - ) {} - measurementName = String(localized: "measurement \(identifier)", comment: "Title label of a running measurement.") - try measurement.start() - } + let identifier = try dataStorageProcess.subscribe(to: measurement,"BICYCLE", onFinishedMeasurement) + measurementName = String(localized: "measurement \(identifier)", comment: "Title label of a running measurement.") + try measurement.start() } } @@ -373,71 +255,72 @@ class LiveViewModel: ObservableObject { } } - /// Register all the Combine flows required to capture lifecycle events such as `start`, `stop`, `pause` and `resume`. - private func registerLifecycleFlows(_ measurement: DataCapturing.Measurement) { - startFlow(measurement) - pauseFlow(measurement) - resumeFlow(measurement) - stopFlow(measurement) - + private func onFinishedMeasurement(_ databaseIdentifier: UInt64) { + os_log("Cleanup after measurement has finished", log: OSLog.measurement, type: .debug) + self.cancellables.removeAll(keepingCapacity: true) + self._measurement = nil + self.dataStorageProcess.unsubscribe() + finishedMessages = Message.finished(timestamp: Date.now) } /// Setup Combine flow to handle ``Measurement`` start events. - private func startFlow(_ measurement: DataCapturing.Measurement) { - // Setting state - let startedFlow = measurement.measurementMessages - .filter { if case Message.started = $0 { return true } else { return false }} - .map { _ in MeasurementState.running } - startedFlow + private func startFlow() -> PassthroughSubject { + let startFlow = PassthroughSubject() + // Setting State + startFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) // Append collections for the first track - startedFlow + startFlow .sink { [weak self] _ in self?.locations.append([GeoLocation]()) self?.altitudes.append([DataCapturing.Altitude]()) } .store(in: &cancellables) + + return startFlow } /// Setup Combine flow to handle ``Measurement`` pause events. - private func pauseFlow(_ measurement: DataCapturing.Measurement) { + private func pauseFlow() -> PassthroughSubject { // Handle pause event - measurement.measurementMessages - .filter { if case Message.paused = $0 { return true } else { return false }} - .map { _ in MeasurementState.paused} + let pauseFlow = PassthroughSubject() + // Setting state + pauseFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) + + return pauseFlow } /// Setup Combine flow to handle ``Measurement`` resume events. - private func resumeFlow(_ measurement: DataCapturing.Measurement) { - let resumedFlow = measurement.measurementMessages - .filter { if case Message.resumed = $0 { return true } else { return false }} - .map { _ in MeasurementState.running} - resumedFlow - .receive(on: RunLoop.main).assign(to: &$measurementState) + private func resumeFlow() -> PassthroughSubject { + let resumeFlow = PassthroughSubject() + // Setting state + resumeFlow + .receive(on: RunLoop.main) + .assign(to: &$measurementState) // Append collections for the next track - resumedFlow + resumeFlow .sink { [weak self] _ in self?.locations.append([GeoLocation]()) self?.altitudes.append([DataCapturing.Altitude]()) } .store(in: &cancellables) + + return resumeFlow } /// Setup Combine flow to handle ``Measurement`` stop events. - private func stopFlow(_ measurement: DataCapturing.Measurement) { - let stoppedEvents = measurement.measurementMessages - .filter { if case Message.stopped = $0 { return true} else { return false }} - .map { _ in MeasurementState.stopped} + private func stopFlow() -> PassthroughSubject { + let stoppedFlow = PassthroughSubject() // Setting state - stoppedEvents + stoppedFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) // Clean state of this model. // Clear storage for altitudes and locations. - stoppedEvents + stoppedFlow .sink {[weak self] _ in os_log("Cleanup after Stop.") @@ -445,6 +328,141 @@ class LiveViewModel: ObservableObject { self?.altitudes.removeAll(keepingCapacity: true) } .store(in: &cancellables) + + return stoppedFlow + } + + private func finishedFlow() -> PassthroughSubject { + let finishedMessages = PassthroughSubject() + finishedMessages + .receive(on: RunLoop.main) + .assign(to: &$finishedMessages) + finishedMessages.sink { _ in + os_log("Cleanup after measurement has finished", log: OSLog.measurement, type: .debug) + /*self.cancellables.forEach { + $0.cancel() + }*/ + self.cancellables.removeAll(keepingCapacity: true) + self._measurement = nil + self.dataStorageProcess.unsubscribe() + }.store(in: &cancellables) + + return finishedMessages + } + + private func locationFlow() -> PassthroughSubject { + let locationFlow = PassthroughSubject() + // Use the most recent location to provide the speed value + locationFlow.filter {location in location.speed >= 0.0 }.compactMap { location in "\(speedFormatter.string(from: location.speed as NSNumber) ?? "0.0") km/h" }.receive(on: RunLoop.main).assign(to: &$speed) + // Organize all received locations into the local locations array, and stream that array for further processing + let trackFlow = locationFlow + .compactMap { [weak self] location in + let endIndex = max((self?.locations.count ?? 0)-1, 0) + self?.locations[endIndex].append(location) + return self?.locations + } + // Calculate and store distance covered, by all the tracks from the current measurement. + let distanceFlow = trackFlow.map {(tracks: [[GeoLocation]]) in + return tracks + .map { track in + var trackLength = 0.0 + var prevLocation: GeoLocation? = nil + for location in track { + if let prevLocation = prevLocation { + trackLength += location.distance(from: prevLocation) + } + prevLocation = location + } + return trackLength + } + .reduce(0.0) { accumulator, next in + accumulator + next + } + } + distanceFlow.compactMap { + distanceFormatter.string(from: $0 as NSNumber) + }.map { formattedDistance in + "\(formattedDistance) km" + } + .receive(on: RunLoop.main) + .assign(to: &$distance) + // Calculate and store average speed over all the tracks from this measurement. + trackFlow.map { tracks in + Statistics.averageSpeed(timelines: tracks) + } + .filter { $0 >= 0.0} + .compactMap { + speedFormatter.string(from: $0 as NSNumber) + } + .map { formattedSpeed in + "\(formattedSpeed) km/h" + } + .receive(on: RunLoop.main) + .assign(to: &$averageSpeed) + // Calculate avoided emissions + distanceFlow.map { + Statistics.avoidedEmissions($0) + } + .compactMap { + emissionsFormatter.string(from: $0 as NSNumber) + } + .map { formattedEmissions in + "\(formattedEmissions) g CO₂" + } + .receive(on: RunLoop.main) + .assign(to: &$avoidedEmissions) + // Calculate and store the total duration for all the tracks in this measurement. + trackFlow + .map { tracks in + return Statistics.duration(timelines: tracks) + } + .compactMap { + timeFormatter.string(from: $0) + } + .receive(on: RunLoop.main) + .assign(to: &$duration) + + return locationFlow + } + + private func altitudeFlow() -> PassthroughSubject { + let altitudeFlow = PassthroughSubject() + altitudeFlow + .compactMap { [weak self] (altitude: DataCapturing.Altitude) in + let endIndex = max((self?.altitudes.count ?? 0)-1, 0) + self?.altitudes[endIndex].append(altitude) + return self?.altitudes + } + .map { (tracks: [[DataCapturing.Altitude]]) in + return tracks.map { track in + os_log("Using altimeter values to calculate accumulated height.", log: OSLog.measurement, type: .debug) + var previousAltitude: Double? = nil + var sum = 0.0 + for altitude in track { + if let previousAltitude = previousAltitude { + let relativeAltitudeChange = altitude.relativeAltitude - previousAltitude + if relativeAltitudeChange > 0.1 { + sum += relativeAltitudeChange + } + } + previousAltitude = altitude.relativeAltitude + } + return sum + } + .reduce(0.0) { accumulator, next in + accumulator + next + } + } + .compactMap { + riseFormatter.string(from: $0 as NSNumber) + } + .map { formattedRise in + "\(formattedRise) m" + } + .receive(on: RunLoop.main) + .assign(to: &$inclination) + + return altitudeFlow } } diff --git a/RFR-App/RFR/Measurements/Measurement.swift b/RFR-App/RFR/Measurements/Measurement.swift index c041cef7..0482e040 100644 --- a/RFR-App/RFR/Measurements/Measurement.swift +++ b/RFR-App/RFR/Measurements/Measurement.swift @@ -209,10 +209,11 @@ enum SynchronizationState { /// Convert a database `MeasurementMO` to its `SynchronizationState` static func from(measurement: MeasurementMO) -> SynchronizationState { let request = UploadSession.fetchRequest() - request.predicate = NSPredicate(format: "measurement.identifier=%d", measurement.identifier) + request.predicate = NSPredicate(format: "measurement=%@", measurement) request.fetchLimit = 1 do { - if try request.execute().first != nil { + let sessions = try request.execute() + if !sessions.isEmpty { return .synchronizing } else if measurement.synchronized { return .synchronized diff --git a/RFR-App/RFR/Measurements/MeasurementsViewModel.swift b/RFR-App/RFR/Measurements/MeasurementsViewModel.swift index e67fce0e..caf42a98 100644 --- a/RFR-App/RFR/Measurements/MeasurementsViewModel.swift +++ b/RFR-App/RFR/Measurements/MeasurementsViewModel.swift @@ -184,7 +184,7 @@ class MeasurementsViewModel: ObservableObject { longitudinalMeters: eastWestReach + eastWestReach * 0.15 ), track: coordinates - ) + ) } /// Causes a reload and redrawn each time measurement data changes. @@ -192,6 +192,7 @@ class MeasurementsViewModel: ObservableObject { try dataStoreStack.wrapInContext { context in let measurementRequest = MeasurementMO.fetchRequest() let storedMeasurements = try measurementRequest.execute() + os_log("Loaded changed measurements", log: OSLog.measurement, type: .debug) storedMeasurements.filter { storedMeasurement in for measurement in measurements { @@ -201,9 +202,10 @@ class MeasurementsViewModel: ObservableObject { } return true }.forEach { filtered in + let measurement = self.load(measurement: filtered) DispatchQueue.main.async { [weak self] in if let self = self { - self.measurements.append(self.load(measurement: filtered)) + self.measurements.append(measurement) updateStatistics() } } @@ -292,23 +294,19 @@ extension MeasurementsViewModel: DataCapturingMessageSubscriber { func subscribe(to messages: some Publisher) { Task { os_log("Subscribing measurements view model to updates from live view!", log: OSLog.measurement, type: .debug) - measurementEventsSubscription = messages.filter { - os_log("Filtering for messages for stopped event! Received %@", log: OSLog.measurement, type: .debug, $0.description) - if case Message.stopped(timestamp: _) = $0 { - os_log("Receiving stopped event", log: OSLog.measurement, type: .debug) - return true - } else { - return false - - } - }.sink { [weak self] message in + measurementEventsSubscription = messages.sink { [weak self] message in + switch message { + case .finished(timestamp: _): os_log("Updating view model with new measurement.") do { try self?.onMeasurementsChanged() } catch { self?.error = error } + default: + os_log("Received invalid message %@", log: OSLog.measurement, type: .debug, message.description) } + } } } } diff --git a/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift b/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift index 7e84e60a..4b959f04 100644 --- a/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift +++ b/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift @@ -38,7 +38,7 @@ class DataCapturingViewModel: ObservableObject { let measurementsViewModel: MeasurementsViewModel /// The view model used to handle measurement synchronization. let syncViewModel: SynchronizationViewModel - // TODO: All of this only concerns the `SynchronizationViewModel` and thus should only appear there. + // TODO: All of this only concerns the `SynchronizationViewModel` and thus should only appear there. --> Actually this class seems to have become unnecessary. Putting stuff together should happen in the AppModel only. Further down only dependency injection should be necessary. /// The authenticator used to authenticate for data uploads let authenticator: Authenticator @@ -59,7 +59,7 @@ class DataCapturingViewModel: ObservableObject { dataStoreStack: dataStoreStack, uploadPublisher: syncViewModel.uploadStatusPublisher ) - measurementsViewModel.subscribe(to: liveViewModel.$message) + measurementsViewModel.subscribe(to: liveViewModel.$finishedMessages) self.dataStoreStack = dataStoreStack try dataStoreStack.wrapInContext { context in let request = MeasurementMO.fetchRequest() diff --git a/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift b/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift index fcaefe3c..ba1f4847 100644 --- a/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift +++ b/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift @@ -63,7 +63,11 @@ class SynchronizationViewModel: NSObject, ObservableObject { self?.uploadStatusPublisher.send(UploadStatus(measurement: status.upload.measurement, status: status.status)) } do { - let measurements = try dataStoreStack.persistenceLayer().loadSynchronizableMeasurements() + let measurements = try dataStoreStack.wrapInContextReturn { context in + let request = MeasurementMO.fetchRequest() + request.predicate = NSPredicate(format: "synchronizable=%@ AND synchronized=%@", NSNumber(booleanLiteral: true), NSNumber(booleanLiteral: false)) + return try request.execute().map { try FinishedMeasurement(managedObject: $0)} + } os_log("Sync: Synchronizing %d measurements!", log: OSLog.synchronization, type: .debug, measurements.count) for measurement in measurements { os_log(.debug, log: OSLog.synchronization, "Sync: Starting synchronization of measurement %d!", measurement.identifier)