diff --git a/DataCapturing/Package.swift b/DataCapturing/Package.swift index fc3eac18..f360db45 100644 --- a/DataCapturing/Package.swift +++ b/DataCapturing/Package.swift @@ -61,7 +61,6 @@ let package = Package( ], exclude: ["Support/Info.plist"], resources: [ - .process("Synchronization/Upload/Background/SessionRegistry.xcdatamodeld"), .process("Model/Migrations/V3toV4/V3toV4.xcmappingmodel"), .process("Model/CyfaceModel.xcdatamodeld"), .process("Model/Migrations/V10toV11/V10toV11.xcmappingmodel"), diff --git a/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift b/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift index 370b3795..71f7c00f 100644 --- a/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift +++ b/DataCapturing/Sources/DataCapturing/Capturing/Measurement.swift @@ -35,8 +35,9 @@ This protocol defines a measurements data together with its lifecycle during dat - Since: 12.0.0 */ public protocol Measurement { + // TODO: It should not be possible to send messages via this variable. So this should be a publisher instead of a PasstroughSubject /// A combine subject used to receive messages during data capturing and forwarding them, to whoever wants to listen. - var measurementMessages: PassthroughSubject { get } + var measurementMessages: AnyPublisher { get } /// A flag to get information about whether this measurement is currently running (`true`) or not (`false`). var isRunning: Bool { get } /// A flag to get information about whether this measurement is currently paused (`true`) or not (`false`). @@ -75,7 +76,6 @@ public class MeasurementImpl { /// `true` if data capturing was running but is currently paused; `false` otherwise. public var isPaused:Bool - // TODO: This should probably be carried out using an actor: See the talk "Protect mutable state with Swift actors" from WWDC 2021 /// The background queue used to capture data. private let capturingQueue: DispatchQueue @@ -85,15 +85,15 @@ public class MeasurementImpl { private let locationCapturer: LocationCapturer // TODO: Switch to Combine --> Make this a publisher on its own. Will have to read up on how to achieve this. - public var measurementMessages: PassthroughSubject + public var messagesSubject: PassthroughSubject - // TODO: This should probably be carried out using an actor: See the talk "Protect mutable state with Swift actors" from WWDC 2021 /** A queue used to synchronize calls to the lifecycle methods `start`, `pause`, `resume` and `stop`. Using such a queue prevents successiv calls to these methods to interrupt each other. */ private let lifecycleQueue: DispatchQueue private var messageCancellable: AnyCancellable? = nil + private var finishedEventCancellable: AnyCancellable? = nil // MARK: - Initializers @@ -114,7 +114,6 @@ public class MeasurementImpl { manager.activityType = .other manager.showsBackgroundLocationIndicator = true manager.distanceFilter = kCLDistanceFilterNone - //manager.requestAlwaysAuthorization() return manager } ) { @@ -122,7 +121,7 @@ public class MeasurementImpl { self.lifecycleQueue = DispatchQueue(label: "lifecycle") self.sensorCapturer = SensorCapturer(capturingQueue: capturingQueue) self.locationCapturer = LocationCapturer(lifecycleQueue: lifecycleQueue, locationManagerFactory: locationManagerFactory) - measurementMessages = PassthroughSubject() + messagesSubject = PassthroughSubject() self.isRunning = false self.isPaused = false @@ -152,7 +151,7 @@ public class MeasurementImpl { messageCancellable = locationCapturer.start().receive(on: lifecycleQueue).merge( with: sensorCapturer.start() - ).subscribe(measurementMessages) + ).subscribe(messagesSubject) self.isRunning = true } @@ -174,6 +173,10 @@ public class MeasurementImpl { // MARK: - Measurement extension MeasurementImpl: Measurement { + public var measurementMessages: AnyPublisher { + return messagesSubject.eraseToAnyPublisher() + } + /** Starts the capturing process. @@ -194,7 +197,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta } try startCapturing() - measurementMessages.send(.started(timestamp: Date())) + messagesSubject.send(.started(timestamp: Date())) } } @@ -218,8 +221,8 @@ Starting data capturing on paused service. Finishing paused measurements and sta stopCapturing() isPaused = false - measurementMessages.send(.stopped(timestamp: Date())) - measurementMessages.send(completion: .finished) + messagesSubject.send(.stopped(timestamp: Date())) + messagesSubject.send(completion: .finished) } } @@ -242,7 +245,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta stopCapturing() isPaused = true - measurementMessages.send(.paused(timestamp: Date())) + messagesSubject.send(.paused(timestamp: Date())) } } @@ -267,7 +270,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta isPaused = false isRunning = true - measurementMessages.send(.resumed(timestamp: Date())) + messagesSubject.send(.resumed(timestamp: Date())) } } @@ -279,7 +282,7 @@ Starting data capturing on paused service. Finishing paused measurements and sta */ public func changeModality(to modality: String) { lifecycleQueue.sync { - measurementMessages.send(.modalityChanged(to: modality)) + messagesSubject.send(.modalityChanged(to: modality)) } } } diff --git a/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift b/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift index bb2274f5..7f5426a7 100644 --- a/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift +++ b/DataCapturing/Sources/DataCapturing/Capturing/Model/Message.swift @@ -69,6 +69,7 @@ public enum Message: CustomStringConvertible { /// The message sent if a new direction was captured. case capturedDirection(SensorValue) case started(timestamp: Date) + /// Sent after all sensor have stopped capturing data. case stopped(timestamp: Date) case paused(timestamp: Date) case resumed(timestamp: Date) @@ -77,3 +78,38 @@ public enum Message: CustomStringConvertible { case modalityChanged(to: String) case receivedNothingYet } + +extension Message: Equatable { + public static func == (lhs: Message, rhs: Message) -> Bool { + switch (lhs, rhs) { + case (.capturedLocation(let geoLocationLhs), .capturedLocation(let geoLocationRhs)): + return geoLocationLhs == geoLocationRhs + case (.capturedAltitude(let altitudeLhs), .capturedAltitude(let altitudeRhs)): + return altitudeLhs == altitudeRhs + case (.capturedAcceleration(let sensorValueLhs), .capturedAcceleration(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.capturedRotation(let sensorValueLhs), .capturedRotation(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.capturedDirection(let sensorValueLhs), .capturedDirection(let sensorValueRhs)): + return sensorValueLhs == sensorValueRhs + case (.started(let timestampLhs), .started(let timestampRhs)): + return timestampLhs == timestampRhs + case (.stopped(let timestampLhs), .stopped(let timestampRhs)): + return timestampLhs == timestampRhs + case (.paused(let timestampLhs), .paused(let timestampRhs)): + return timestampLhs == timestampRhs + case (.resumed(let timestampLhs), .resumed(let timestampRhs)): + return timestampLhs == timestampRhs + case (.hasFix, .hasFix): + return true + case (.fixLost, .fixLost): + return true + case (.modalityChanged(let toLhs), .modalityChanged(to: let toRhs)): + return toLhs == toRhs + case (.receivedNothingYet, .receivedNothingYet): + return true + default: + return false + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/Altitude.swift b/DataCapturing/Sources/DataCapturing/Model/Altitude.swift index 487dd53a..08c95473 100644 --- a/DataCapturing/Sources/DataCapturing/Model/Altitude.swift +++ b/DataCapturing/Sources/DataCapturing/Model/Altitude.swift @@ -24,6 +24,7 @@ import CoreData A struct to wrap all the information associated with a measured altitude provided by an altimeter. - Author: Klemens Muthmann + - version: 1.0.0 */ public class Altitude: CustomStringConvertible { /// The relative altitude change since the last measured value, in meters. @@ -47,3 +48,15 @@ public class Altitude: CustomStringConvertible { self.time = time } } + +extension Altitude: Equatable { + public static func == (lhs: Altitude, rhs: Altitude) -> Bool { + if lhs===rhs { + return true + } else { + return lhs.relativeAltitude.equal(rhs.relativeAltitude, precise: 3) && + lhs.pressure.equal(rhs.pressure, precise: 3) && + lhs.time == rhs.time + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/Double.swift b/DataCapturing/Sources/DataCapturing/Model/Double.swift new file mode 100644 index 00000000..c9b044a0 --- /dev/null +++ b/DataCapturing/Sources/DataCapturing/Model/Double.swift @@ -0,0 +1,22 @@ +// +// File.swift +// +// +// Created by Klemens Muthmann on 18.03.24. +// + +import Foundation + +public extension Double { + public func equal(_ value: Double, precise: Int) -> Bool { + let denominator: Double = pow(10.0, Double(precise)) + let maxDiff: Double = 1 / denominator + let realDiff: Double = self - value + + if fabs(realDiff) <= maxDiff { + return true + } else { + return false + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift b/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift index 0b46fdd3..3cb740db 100644 --- a/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift +++ b/DataCapturing/Sources/DataCapturing/Model/FinishedMeasurement.swift @@ -74,7 +74,7 @@ public class FinishedMeasurement: Hashable, Equatable { - parameter managedObject: The managed CoreData object to initialize this `Measurement` from. - throws: `InconstantData.locationOrderViolation` if the timestamps of the locations in this measurement are not strongly monotonically increasing. */ - convenience init(managedObject: MeasurementMO) throws { + public convenience init(managedObject: MeasurementMO) throws { let accelerationFile = SensorValueFile(fileType: .accelerationValueType, qualifier: String(managedObject.unsignedIdentifier)) let directionFile = SensorValueFile(fileType: .directionValueType, qualifier: String(managedObject.unsignedIdentifier)) let rotationFile = SensorValueFile(fileType: .rotationValueType, qualifier: String(managedObject.unsignedIdentifier)) diff --git a/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift b/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift index 9cbfd441..6fa8f1b2 100644 --- a/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift +++ b/DataCapturing/Sources/DataCapturing/Model/GeoLocation.swift @@ -123,3 +123,19 @@ public class GeoLocation: CustomStringConvertible { return distance(from: CLLocation(latitude: previousLocation.latitude, longitude: previousLocation.longitude)) } } + +extension GeoLocation: Equatable { + public static func == (lhs: GeoLocation, rhs: GeoLocation) -> Bool { + if lhs === rhs { + return true + } else { + return lhs.latitude.equal(rhs.latitude, precise: 6) && + lhs.longitude.equal(rhs.longitude, precise: 6) && + lhs.speed.equal(rhs.speed, precise: 2) && + lhs.accuracy.equal(rhs.accuracy, precise: 3) && + lhs.time == rhs.time && + lhs.altitude.equal(rhs.altitude, precise: 3) && + lhs.verticalAccuracy.equal(rhs.verticalAccuracy, precise: 3) + } + } +} diff --git a/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift b/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift index 35576ab5..32ea887a 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/CapturedDataStorage.swift @@ -30,35 +30,38 @@ import CoreData - version: 1.0.0 */ public protocol CapturedDataStorage { - /// Create a new measurement within the data store. - func createMeasurement(_ initialMode: String) throws -> UInt64 /// Subscribe to a running measurement and store the data produced by that measurement. + /// - Returns: The application wide unique identifier under which the new measurement is stored in the database. func subscribe( to measurement: Measurement, - _ identifier: UInt64, - _ receiveCompletion: @escaping (() -> Void)) throws + _ initialMode: String, + _ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void)) throws -> UInt64 /// Stop receiving updates from the currently subscribed measurement or do nothing if this was not subscribed at the moment. func unsubscribe() } /** -An implementation of `CapturedDataStorage` for storing the data to a CoreData database. + An implementation of `CapturedDataStorage` for storing the data to a CoreData database. - author: Klemens Muthmann - version: 1.0.1 */ public class CapturedCoreDataStorage where SVFF.Serializable == [SensorValue] { + // MARK: - Properties /// The `DataStoreStack` to write the captured data to. let dataStoreStack: DataStoreStack /// A queue used to buffer received data until writing it as a bulk for performance reasons. let cachingQueue = DispatchQueue(label: "de.cyface.cache") /// The time interval to wait until the next batch of data is stored to the data storage. Increasing this time should improve performance but increases memory usage. let interval: TimeInterval - /// The *Combine* cancellables used so new values are transmitted. + /// The *Combine* cancellables used so new values are transmitted. References to this must be kept here, so that *Combine* does not stop the data flow. var cancellables = [AnyCancellable]() /// Creator for storing sensor values to a file. let sensorValueFileFactory: SVFF + /// A Publisher of messages sent by the persistence layer on storage events. + let persistenceMessages = PassthroughSubject() + // MARK: - Initializers /** - Parameter interval: The time interval to wait until the next batch of data is stored to the data storage. Increasing this time should improve performance but increases memory usage. */ @@ -71,11 +74,10 @@ public class CapturedCoreDataStorage where SVFF.Se self.interval = interval self.sensorValueFileFactory = sensorValueFileFactory } -} -extension CapturedCoreDataStorage: CapturedDataStorage { - - public func createMeasurement(_ initialMode: String) throws -> UInt64 { + // MARK: - Private Methods + /// Create a new measurement within the data store. + private func createMeasurement(_ initialMode: String) throws -> UInt64 { return try dataStoreStack.wrapInContextReturn { context in let time = Date() let measurementMO = MeasurementMO(context: context) @@ -90,120 +92,149 @@ extension CapturedCoreDataStorage: CapturedDataStorage { } } + private func load(measurement identifier: UInt64, from context: NSManagedObjectContext) throws -> MeasurementMO { + guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate( + withName: "measurementByIdentifier", + substitutionVariables: ["identifier": identifier] + ) else { + os_log( + "Unable to load measurement fetch request.", + log: OSLog.persistence, + type: .debug + ) + throw PersistenceError.measurementNotLoadable(identifier) + } + guard let measurementMO = try measurementRequest.execute().first as? MeasurementMO else { + os_log( + "Unable to load measurement to store to", + log: OSLog.persistence, + type: .debug + ) + throw PersistenceError.measurementNotLoadable(identifier) + } + + return measurementMO + } + + private func handle(messages: [Message], measurement identifier: UInt64) throws { + try self.dataStoreStack.wrapInContext { context in + let measurementMo = try self.load(measurement: identifier, from: context) + + let accelerationsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.accelerationValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + let rotationsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.rotationValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + let directionsFile = self.sensorValueFileFactory.create( + fileType: SensorValueFileType.directionValueType, + qualifier: String(measurementMo.unsignedIdentifier) + ) + + try messages.forEach { message in + switch message { + case .capturedLocation(let location): + self.store(location: location, to: measurementMo, context) + case .capturedAltitude(let altitude): + self.store(altitude: altitude, to: measurementMo, context) + case .capturedRotation(let rotation): + try self.store(rotation, to: rotationsFile) + case .capturedDirection(let direction): + try self.store(direction, to: directionsFile) + case .capturedAcceleration(let acceleration): + try self.store(acceleration, to: accelerationsFile) + case .started(timestamp: let time): + os_log("Storing started event to database.", log: OSLog.persistence, type: .debug) + measurementMo.addToTracks(TrackMO(context: context)) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStart), context: context)) + case .resumed(timestamp: let time): + measurementMo.addToTracks(TrackMO(context: context)) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleResume), context: context)) + case .paused(timestamp: let time): + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecyclePause), context: context)) + case .stopped(timestamp: let time): + try self.onStop(measurement: measurementMo, context, time) + default: + os_log("Message %{PUBLIC}@ irrelevant for data storage and thus ignored.",log: OSLog.persistence, type: .debug, message.description) + } + } + } + } + + private func store(location: GeoLocation, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) { + os_log("Storing location to database.", log: OSLog.persistence, type: .debug) + if let lastTrack = measurementMo.typedTracks().last { + lastTrack.addToLocations(GeoLocationMO(location: location, context: context)) + } + } + + private func store(altitude: Altitude, to measurementMo: MeasurementMO, _ context: NSManagedObjectContext) { + if let lastTrack = measurementMo.typedTracks().last { + lastTrack.addToAltitudes(AltitudeMO(altitude: altitude, context: context)) + } + } + + private func store(_ value: SensorValue, to file: SVF) throws where SVF.Serializable == SVFF.Serializable { + do { + _ = try file.write(serializable: [value]) + } catch { + debugPrint("Unable to write data to file \(file.fileName)!") + throw error + } + } + + private func onStop(measurement measurementMo: MeasurementMO, _ context: NSManagedObjectContext, _ time: Date) throws { + os_log("Storing stopped event to database.", log: OSLog.persistence, type: .debug) + measurementMo.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStop), context: context)) + measurementMo.synchronizable = true + try context.save() + os_log("Stored finished measurement.", log: OSLog.persistence, type: .debug) + } + +} + +// MARK: - Implementation of CapturedDataStorage Protocol +extension CapturedCoreDataStorage: CapturedDataStorage { + /// Recievie updates from the provided ``Measurement`` and store the data to a ``DataStoreStack``. public func subscribe( to measurement: Measurement, - _ identifier: UInt64, - _ receiveCompletion: @escaping (() -> Void) - ) throws { + _ initialMode: String, + _ receiveCompletion: @escaping ((_ databaseIdentifier: UInt64) async -> Void) + ) throws -> UInt64 { + let measurementIdentifier = try createMeasurement(initialMode) + let cachedFlow = measurement.measurementMessages.collect(.byTime(cachingQueue, 1.0)) cachedFlow - .sink(receiveCompletion: { _ in - os_log( - "Completing storage flow.", - log: OSLog.persistence, - type: .debug - ) - receiveCompletion() - }) { [weak self] (messages: [Message]) in - do { - try self?.dataStoreStack.wrapInContext { context in - guard let measurementRequest = context.persistentStoreCoordinator?.managedObjectModel.fetchRequestFromTemplate(withName: "measurementByIdentifier", substitutionVariables: ["identifier": identifier]) else { - os_log( - "Unable to load measurement fetch request.", - log: OSLog.persistence, - type: .debug - ) - return - } - guard let measurement = try measurementRequest.execute().first as? MeasurementMO else { - os_log( - "Unable to load measurement to store to", - log: OSLog.persistence, - type: .debug - ) - return - } - - let accelerationsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.accelerationValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - let rotationsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.rotationValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - let directionsFile = self?.sensorValueFileFactory.create( - fileType: SensorValueFileType.directionValueType, - qualifier: String(measurement.unsignedIdentifier) - ) - - try messages.forEach { message in - switch message { - case .capturedLocation(let location): - os_log( - "Storing location to database.", - log: OSLog.persistence, - type: .debug - ) - if let lastTrack = measurement.typedTracks().last { - lastTrack.addToLocations(GeoLocationMO(location: location, context: context)) - } - case .capturedAltitude(let altitude): - if let lastTrack = measurement.typedTracks().last { - lastTrack.addToAltitudes(AltitudeMO(altitude: altitude, context: context)) - } - case .capturedRotation(let rotation): - do { - _ = try rotationsFile?.write(serializable: [rotation]) - } catch { - debugPrint("Unable to write data to file \(String(describing: rotationsFile?.fileName))!") - throw error - } - case .capturedDirection(let direction): - do { - _ = try directionsFile?.write(serializable: [direction]) - } catch { - debugPrint("Unable to write data to file \(String(describing: directionsFile?.fileName))!") - throw error - } - case .capturedAcceleration(let acceleration): - do { - _ = try accelerationsFile?.write(serializable: [acceleration]) - } catch { - debugPrint("Unable to write data to file \(String(describing: accelerationsFile?.fileName))!") - throw error - } - case .started(timestamp: let time): - os_log("Storing started event to database.", log: OSLog.persistence, type: .debug) - measurement.addToTracks(TrackMO(context: context)) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStart), context: context)) - case .resumed(timestamp: let time): - measurement.addToTracks(TrackMO(context: context)) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleResume), context: context)) - case .paused(timestamp: let time): - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecyclePause), context: context)) - case .stopped(timestamp: let time): - os_log("Storing stopped event to database.", log: OSLog.persistence, type: .debug) - measurement.addToEvents(EventMO(event: Event(time: time, type: .lifecycleStop), context: context)) - measurement.synchronizable = true - default: - os_log("Message %{PUBLIC}@ irrelevant for data storage and thus ignored.",log: OSLog.persistence, type: .debug, message.description) - } - } - - try context.save() + .sink(receiveCompletion: { status in + switch status { + case .finished: + os_log( + "Completing storage flow.", + log: OSLog.persistence, + type: .debug + ) + Task { + await receiveCompletion(measurementIdentifier) } + case .failure(let error): + os_log("Unable to complete measurement %@", log: OSLog.persistence, type: .error, error.localizedDescription) + } + } + ) { [weak self] (messages: [Message]) in + do { + try self?.handle(messages: messages, measurement: measurementIdentifier) } catch { os_log("Unable to store data! Error %{PUBLIC}@",log: OSLog.persistence ,type: .error, error.localizedDescription) } }.store(in: &cancellables) + + return measurementIdentifier } public func unsubscribe() { - cancellables.forEach { cancellable in - cancellable.cancel() - } cancellables.removeAll(keepingCapacity: true) } } diff --git a/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift b/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift index 52ca074f..cd405e87 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/DataStoreStack.swift @@ -43,13 +43,7 @@ public protocol DataStoreStack { /// /// This method must be called shortly after initialization and before any other interaction with a newly created `DataStoreStack`. func setup() async throws - /// Provide an underlaying ``PersistenceLayer``. - /// - /// This method is only here for backwards compatibility. - /// `PersistenceLayer` is scheduled to die pretty soon. - @available(*, deprecated, message: "PersistenceLayer should not be used any longer. Please write your own data interaction code using `wrapInContext` and `wrapInContextReturn`.") - func persistenceLayer() -> PersistenceLayer - /// This checks if a measurement with that identifier already exists and generates a new identifier until it finds one with no corresponding measurement. + /// This checks if a measurement with that identifier already exists and generates a new identifier until it finds one with no corresponding measurement. /// This is required to handle legacy data and installations, that still have measurements with falsely generated data. func nextValidIdentifier() throws -> UInt64 } @@ -151,10 +145,6 @@ public class CoreDataStack: DataStoreStack { } } - public func persistenceLayer() -> PersistenceLayer { - return CoreDataPersistenceLayer(onManager: self) - } - /// Load the specified `NSManagedObjectModel` from disk. public static func load(model: String = "CyfaceModel", bundle: Bundle) throws -> NSManagedObjectModel { guard let modelURL = bundle.url(forResource: model, withExtension: "momd") else { diff --git a/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift b/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift index 0a95af83..a368669e 100644 --- a/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift +++ b/DataCapturing/Sources/DataCapturing/Persistence/SensorValueFileFactory.swift @@ -48,10 +48,6 @@ public struct DefaultSensorValueFileFactory: SensorValueFileFactory { public typealias FileType = SensorValueFile - //public typealias Serializable = SensorValue - - //public typealias SpecificSerializer = SensorValueSerializer - public init() { // Nothing to do here. } diff --git a/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift b/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift index 6e2851d8..effe7b0b 100644 --- a/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift +++ b/DataCapturing/Sources/DataCapturing/Synchronization/Upload/Default/DefaultUploadProcess.swift @@ -26,6 +26,9 @@ import Combine It orchestrates the different requests required by the Cyface Upload Protocol. + Information about the current upload are distributed using the *Combine* Publisher ``DefaultUploadProcess/uploadStatus``. + For possible events see ``UploadStatusType``. + - author: Klemens Muthmann - version: 1.0.0 */ diff --git a/RFR-App/RFR.xcodeproj/project.pbxproj b/RFR-App/RFR.xcodeproj/project.pbxproj index cba513ba..9da69875 100644 --- a/RFR-App/RFR.xcodeproj/project.pbxproj +++ b/RFR-App/RFR.xcodeproj/project.pbxproj @@ -752,7 +752,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR; PRODUCT_NAME = "Ready for Robots"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -875,7 +875,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.staging; PRODUCT_NAME = "Ready for Robots Staging"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -992,7 +992,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.staging; PRODUCT_NAME = "Ready for Robots Staging"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1109,7 +1109,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR; PRODUCT_NAME = "Ready for Robots"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1734,7 +1734,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.dev; PRODUCT_NAME = "Ready for Robots Development"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; @@ -1772,7 +1772,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 3.1.4; + MARKETING_VERSION = 3.1.5; PRODUCT_BUNDLE_IDENTIFIER = de.cyface.RFR.dev; PRODUCT_NAME = "Ready for Robots Development"; SUPPORTED_PLATFORMS = "iphoneos iphonesimulator"; diff --git a/RFR-App/RFR/Live/LiveViewModel.swift b/RFR-App/RFR/Live/LiveViewModel.swift index 726fc0d2..c60433fa 100644 --- a/RFR-App/RFR/Live/LiveViewModel.swift +++ b/RFR-App/RFR/Live/LiveViewModel.swift @@ -67,137 +67,38 @@ class LiveViewModel: ObservableObject { return measurement } else { let measurement = MeasurementImpl() - measurement.measurementMessages - .receive(on: DispatchQueue.main) - .assign(to: &$message) - - registerLifecycleFlows(measurement) - // Only location captured events - let locationsFlow = measurement.measurementMessages.filter { if case Message.capturedLocation = $0 { return true } else { return false } }.compactMap {if case let Message.capturedLocation(location) = $0 { return location } else { return nil }} - // Use the most recent location to provide the speed value - locationsFlow.filter {location in location.speed >= 0.0 }.compactMap { location in "\(speedFormatter.string(from: location.speed as NSNumber) ?? "0.0") km/h" }.receive(on: RunLoop.main).assign(to: &$speed) - - // Organize all received locations into the local locations array, and stream that array for further processing - let trackFlow = locationsFlow - .compactMap { [weak self] location in - let endIndex = max((self?.locations.count ?? 0)-1, 0) - self?.locations[endIndex].append(location) - return self?.locations - } - // Calculate and store distance covered, by all the tracks from the current measurement. - let distanceFlow = trackFlow.map {(tracks: [[GeoLocation]]) in - return tracks - .map { track in - var trackLength = 0.0 - var prevLocation: GeoLocation? = nil - for location in track { - if let prevLocation = prevLocation { - trackLength += location.distance(from: prevLocation) - } - prevLocation = location - } - return trackLength - } - .reduce(0.0) { accumulator, next in - accumulator + next - } - } - distanceFlow.compactMap { - distanceFormatter.string(from: $0 as NSNumber) - }.map { formattedDistance in - "\(formattedDistance) km" - } - .receive(on: RunLoop.main) - .assign(to: &$distance) - // Calculate and store average speed over all the tracks from this measurement. - trackFlow.map { tracks in - Statistics.averageSpeed(timelines: tracks) - } - .filter { $0 >= 0.0} - .compactMap { - speedFormatter.string(from: $0 as NSNumber) - } - .map { formattedSpeed in - "\(formattedSpeed) km/h" - } - .receive(on: RunLoop.main) - .assign(to: &$averageSpeed) - - // Calculate and store the total duration for all the tracks in this measurement. - trackFlow - .map { tracks in - return Statistics.duration(timelines: tracks) - } - .compactMap { - timeFormatter.string(from: $0) - } - .receive(on: RunLoop.main) - .assign(to: &$duration) - - // Calculate the total rise for all the tracks in this measurement. - measurement.measurementMessages - .filter { - if case Message.capturedAltitude = $0 { - return true - } else { - return false - } - } - .compactMap { message in - if case let Message.capturedAltitude(altitude) = message { - return altitude - } else { - return nil - } - } - .compactMap { [weak self] (altitude: DataCapturing.Altitude) in - let endIndex = max((self?.altitudes.count ?? 0)-1, 0) - self?.altitudes[endIndex].append(altitude) - return self?.altitudes - } - .map { (tracks: [[DataCapturing.Altitude]]) in - return tracks.map { track in - os_log("Using altimeter values to calculate accumulated height.", log: OSLog.measurement, type: .debug) - var previousAltitude: Double? = nil - var sum = 0.0 - for altitude in track { - if let previousAltitude = previousAltitude { - let relativeAltitudeChange = altitude.relativeAltitude - previousAltitude - if relativeAltitudeChange > 0.1 { - sum += relativeAltitudeChange - } - } - previousAltitude = altitude.relativeAltitude - } - return sum - //} - } - .reduce(0.0) { accumulator, next in - accumulator + next - } - } - .compactMap { - riseFormatter.string(from: $0 as NSNumber) + // Forward finished messages so the UI can update accordingly. + let finishedMessages = finishedFlow() + let locationFlow = locationFlow() + let altitudeFlow = altitudeFlow() + let startedFlow = startFlow() + let stoppedFlow = stopFlow() + let pausedFlow = pauseFlow() + let resumedFlow = resumeFlow() + + // Send each received message to the correct stream + measurement.measurementMessages.sink { message in + switch message { + case .capturedLocation(let location): + locationFlow.send(location) + case .capturedAltitude(let altitude): + altitudeFlow.send(altitude) + case .started(timestamp: _): + startedFlow.send(MeasurementState.running) + case .stopped(timestamp: _): + stoppedFlow.send(MeasurementState.stopped) + case .paused(timestamp: _): + pausedFlow.send(MeasurementState.paused) + case .resumed(timestamp: _): + resumedFlow.send(MeasurementState.running) + case .finished(timestamp: _): + os_log("Routing finished event", log: OSLog.capturingEvent, type: .debug) + finishedMessages.send(message) + default: + os_log("Encountered unhandled message %@", log: OSLog.capturingEvent, type: .debug, message.description) } - .map { formattedRise in - "\(formattedRise) m" - } - .receive(on: RunLoop.main) - .assign(to: &$inclination) - - // - distanceFlow.map { - Statistics.avoidedEmissions($0) - } - .compactMap { - emissionsFormatter.string(from: $0 as NSNumber) - } - .map { formattedEmissions in - "\(formattedEmissions) g CO₂" - } - .receive(on: RunLoop.main) - .assign(to: &$avoidedEmissions) + }.store(in: &cancellables) self._measurement = measurement return measurement @@ -212,8 +113,8 @@ class LiveViewModel: ObservableObject { private var cancellables = [AnyCancellable]() /// Captures and publishes any error produced by this model. @Published var error: Error? - /// Always contains the most recent message received from the Cyface SDK. - @Published var message: Message = Message.receivedNothingYet + /// Always contains the most recent message received from the Cyface SDK. The measurements view listenes to this property to show updates to the live measurement if necessary. + @Published var finishedMessages: Message = Message.receivedNothingYet /** Initialize an object of this class. @@ -331,12 +232,6 @@ class LiveViewModel: ObservableObject { func onStopPressed() throws { if measurement.isRunning || measurement.isPaused { try measurement.stop() - self.cancellables.forEach { - $0.cancel() - } - self.cancellables.removeAll(keepingCapacity: true) - self._measurement = nil - self.dataStorageProcess.unsubscribe() } } @@ -345,24 +240,11 @@ class LiveViewModel: ObservableObject { */ func onPlayPressed() throws { if measurement.isPaused { - if let identifier = identifier { - try dataStorageProcess.subscribe( - to: measurement, - identifier - ) {} - - try measurement.resume() - } + try measurement.resume() } else if !measurement.isPaused && !measurement.isRunning{ // Is stopped - identifier = try dataStorageProcess.createMeasurement("BICYCLE") - if let identifier = identifier { - try dataStorageProcess.subscribe( - to: measurement, - identifier - ) {} - measurementName = String(localized: "measurement \(identifier)", comment: "Title label of a running measurement.") - try measurement.start() - } + let identifier = try dataStorageProcess.subscribe(to: measurement,"BICYCLE", onFinishedMeasurement) + measurementName = String(localized: "measurement \(identifier)", comment: "Title label of a running measurement.") + try measurement.start() } } @@ -373,71 +255,72 @@ class LiveViewModel: ObservableObject { } } - /// Register all the Combine flows required to capture lifecycle events such as `start`, `stop`, `pause` and `resume`. - private func registerLifecycleFlows(_ measurement: DataCapturing.Measurement) { - startFlow(measurement) - pauseFlow(measurement) - resumeFlow(measurement) - stopFlow(measurement) - + private func onFinishedMeasurement(_ databaseIdentifier: UInt64) { + os_log("Cleanup after measurement has finished", log: OSLog.measurement, type: .debug) + self.cancellables.removeAll(keepingCapacity: true) + self._measurement = nil + self.dataStorageProcess.unsubscribe() + finishedMessages = Message.finished(timestamp: Date.now) } /// Setup Combine flow to handle ``Measurement`` start events. - private func startFlow(_ measurement: DataCapturing.Measurement) { - // Setting state - let startedFlow = measurement.measurementMessages - .filter { if case Message.started = $0 { return true } else { return false }} - .map { _ in MeasurementState.running } - startedFlow + private func startFlow() -> PassthroughSubject { + let startFlow = PassthroughSubject() + // Setting State + startFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) // Append collections for the first track - startedFlow + startFlow .sink { [weak self] _ in self?.locations.append([GeoLocation]()) self?.altitudes.append([DataCapturing.Altitude]()) } .store(in: &cancellables) + + return startFlow } /// Setup Combine flow to handle ``Measurement`` pause events. - private func pauseFlow(_ measurement: DataCapturing.Measurement) { + private func pauseFlow() -> PassthroughSubject { // Handle pause event - measurement.measurementMessages - .filter { if case Message.paused = $0 { return true } else { return false }} - .map { _ in MeasurementState.paused} + let pauseFlow = PassthroughSubject() + // Setting state + pauseFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) + + return pauseFlow } /// Setup Combine flow to handle ``Measurement`` resume events. - private func resumeFlow(_ measurement: DataCapturing.Measurement) { - let resumedFlow = measurement.measurementMessages - .filter { if case Message.resumed = $0 { return true } else { return false }} - .map { _ in MeasurementState.running} - resumedFlow - .receive(on: RunLoop.main).assign(to: &$measurementState) + private func resumeFlow() -> PassthroughSubject { + let resumeFlow = PassthroughSubject() + // Setting state + resumeFlow + .receive(on: RunLoop.main) + .assign(to: &$measurementState) // Append collections for the next track - resumedFlow + resumeFlow .sink { [weak self] _ in self?.locations.append([GeoLocation]()) self?.altitudes.append([DataCapturing.Altitude]()) } .store(in: &cancellables) + + return resumeFlow } /// Setup Combine flow to handle ``Measurement`` stop events. - private func stopFlow(_ measurement: DataCapturing.Measurement) { - let stoppedEvents = measurement.measurementMessages - .filter { if case Message.stopped = $0 { return true} else { return false }} - .map { _ in MeasurementState.stopped} + private func stopFlow() -> PassthroughSubject { + let stoppedFlow = PassthroughSubject() // Setting state - stoppedEvents + stoppedFlow .receive(on: RunLoop.main) .assign(to: &$measurementState) // Clean state of this model. // Clear storage for altitudes and locations. - stoppedEvents + stoppedFlow .sink {[weak self] _ in os_log("Cleanup after Stop.") @@ -445,6 +328,141 @@ class LiveViewModel: ObservableObject { self?.altitudes.removeAll(keepingCapacity: true) } .store(in: &cancellables) + + return stoppedFlow + } + + private func finishedFlow() -> PassthroughSubject { + let finishedMessages = PassthroughSubject() + finishedMessages + .receive(on: RunLoop.main) + .assign(to: &$finishedMessages) + finishedMessages.sink { _ in + os_log("Cleanup after measurement has finished", log: OSLog.measurement, type: .debug) + /*self.cancellables.forEach { + $0.cancel() + }*/ + self.cancellables.removeAll(keepingCapacity: true) + self._measurement = nil + self.dataStorageProcess.unsubscribe() + }.store(in: &cancellables) + + return finishedMessages + } + + private func locationFlow() -> PassthroughSubject { + let locationFlow = PassthroughSubject() + // Use the most recent location to provide the speed value + locationFlow.filter {location in location.speed >= 0.0 }.compactMap { location in "\(speedFormatter.string(from: location.speed as NSNumber) ?? "0.0") km/h" }.receive(on: RunLoop.main).assign(to: &$speed) + // Organize all received locations into the local locations array, and stream that array for further processing + let trackFlow = locationFlow + .compactMap { [weak self] location in + let endIndex = max((self?.locations.count ?? 0)-1, 0) + self?.locations[endIndex].append(location) + return self?.locations + } + // Calculate and store distance covered, by all the tracks from the current measurement. + let distanceFlow = trackFlow.map {(tracks: [[GeoLocation]]) in + return tracks + .map { track in + var trackLength = 0.0 + var prevLocation: GeoLocation? = nil + for location in track { + if let prevLocation = prevLocation { + trackLength += location.distance(from: prevLocation) + } + prevLocation = location + } + return trackLength + } + .reduce(0.0) { accumulator, next in + accumulator + next + } + } + distanceFlow.compactMap { + distanceFormatter.string(from: $0 as NSNumber) + }.map { formattedDistance in + "\(formattedDistance) km" + } + .receive(on: RunLoop.main) + .assign(to: &$distance) + // Calculate and store average speed over all the tracks from this measurement. + trackFlow.map { tracks in + Statistics.averageSpeed(timelines: tracks) + } + .filter { $0 >= 0.0} + .compactMap { + speedFormatter.string(from: $0 as NSNumber) + } + .map { formattedSpeed in + "\(formattedSpeed) km/h" + } + .receive(on: RunLoop.main) + .assign(to: &$averageSpeed) + // Calculate avoided emissions + distanceFlow.map { + Statistics.avoidedEmissions($0) + } + .compactMap { + emissionsFormatter.string(from: $0 as NSNumber) + } + .map { formattedEmissions in + "\(formattedEmissions) g CO₂" + } + .receive(on: RunLoop.main) + .assign(to: &$avoidedEmissions) + // Calculate and store the total duration for all the tracks in this measurement. + trackFlow + .map { tracks in + return Statistics.duration(timelines: tracks) + } + .compactMap { + timeFormatter.string(from: $0) + } + .receive(on: RunLoop.main) + .assign(to: &$duration) + + return locationFlow + } + + private func altitudeFlow() -> PassthroughSubject { + let altitudeFlow = PassthroughSubject() + altitudeFlow + .compactMap { [weak self] (altitude: DataCapturing.Altitude) in + let endIndex = max((self?.altitudes.count ?? 0)-1, 0) + self?.altitudes[endIndex].append(altitude) + return self?.altitudes + } + .map { (tracks: [[DataCapturing.Altitude]]) in + return tracks.map { track in + os_log("Using altimeter values to calculate accumulated height.", log: OSLog.measurement, type: .debug) + var previousAltitude: Double? = nil + var sum = 0.0 + for altitude in track { + if let previousAltitude = previousAltitude { + let relativeAltitudeChange = altitude.relativeAltitude - previousAltitude + if relativeAltitudeChange > 0.1 { + sum += relativeAltitudeChange + } + } + previousAltitude = altitude.relativeAltitude + } + return sum + } + .reduce(0.0) { accumulator, next in + accumulator + next + } + } + .compactMap { + riseFormatter.string(from: $0 as NSNumber) + } + .map { formattedRise in + "\(formattedRise) m" + } + .receive(on: RunLoop.main) + .assign(to: &$inclination) + + return altitudeFlow } } diff --git a/RFR-App/RFR/Measurements/Measurement.swift b/RFR-App/RFR/Measurements/Measurement.swift index c041cef7..0482e040 100644 --- a/RFR-App/RFR/Measurements/Measurement.swift +++ b/RFR-App/RFR/Measurements/Measurement.swift @@ -209,10 +209,11 @@ enum SynchronizationState { /// Convert a database `MeasurementMO` to its `SynchronizationState` static func from(measurement: MeasurementMO) -> SynchronizationState { let request = UploadSession.fetchRequest() - request.predicate = NSPredicate(format: "measurement.identifier=%d", measurement.identifier) + request.predicate = NSPredicate(format: "measurement=%@", measurement) request.fetchLimit = 1 do { - if try request.execute().first != nil { + let sessions = try request.execute() + if !sessions.isEmpty { return .synchronizing } else if measurement.synchronized { return .synchronized diff --git a/RFR-App/RFR/Measurements/MeasurementsViewModel.swift b/RFR-App/RFR/Measurements/MeasurementsViewModel.swift index e67fce0e..caf42a98 100644 --- a/RFR-App/RFR/Measurements/MeasurementsViewModel.swift +++ b/RFR-App/RFR/Measurements/MeasurementsViewModel.swift @@ -184,7 +184,7 @@ class MeasurementsViewModel: ObservableObject { longitudinalMeters: eastWestReach + eastWestReach * 0.15 ), track: coordinates - ) + ) } /// Causes a reload and redrawn each time measurement data changes. @@ -192,6 +192,7 @@ class MeasurementsViewModel: ObservableObject { try dataStoreStack.wrapInContext { context in let measurementRequest = MeasurementMO.fetchRequest() let storedMeasurements = try measurementRequest.execute() + os_log("Loaded changed measurements", log: OSLog.measurement, type: .debug) storedMeasurements.filter { storedMeasurement in for measurement in measurements { @@ -201,9 +202,10 @@ class MeasurementsViewModel: ObservableObject { } return true }.forEach { filtered in + let measurement = self.load(measurement: filtered) DispatchQueue.main.async { [weak self] in if let self = self { - self.measurements.append(self.load(measurement: filtered)) + self.measurements.append(measurement) updateStatistics() } } @@ -292,23 +294,19 @@ extension MeasurementsViewModel: DataCapturingMessageSubscriber { func subscribe(to messages: some Publisher) { Task { os_log("Subscribing measurements view model to updates from live view!", log: OSLog.measurement, type: .debug) - measurementEventsSubscription = messages.filter { - os_log("Filtering for messages for stopped event! Received %@", log: OSLog.measurement, type: .debug, $0.description) - if case Message.stopped(timestamp: _) = $0 { - os_log("Receiving stopped event", log: OSLog.measurement, type: .debug) - return true - } else { - return false - - } - }.sink { [weak self] message in + measurementEventsSubscription = messages.sink { [weak self] message in + switch message { + case .finished(timestamp: _): os_log("Updating view model with new measurement.") do { try self?.onMeasurementsChanged() } catch { self?.error = error } + default: + os_log("Received invalid message %@", log: OSLog.measurement, type: .debug, message.description) } + } } } } diff --git a/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift b/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift index 7e84e60a..4b959f04 100644 --- a/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift +++ b/RFR-App/RFR/ViewModel/DataCapturingViewModel.swift @@ -38,7 +38,7 @@ class DataCapturingViewModel: ObservableObject { let measurementsViewModel: MeasurementsViewModel /// The view model used to handle measurement synchronization. let syncViewModel: SynchronizationViewModel - // TODO: All of this only concerns the `SynchronizationViewModel` and thus should only appear there. + // TODO: All of this only concerns the `SynchronizationViewModel` and thus should only appear there. --> Actually this class seems to have become unnecessary. Putting stuff together should happen in the AppModel only. Further down only dependency injection should be necessary. /// The authenticator used to authenticate for data uploads let authenticator: Authenticator @@ -59,7 +59,7 @@ class DataCapturingViewModel: ObservableObject { dataStoreStack: dataStoreStack, uploadPublisher: syncViewModel.uploadStatusPublisher ) - measurementsViewModel.subscribe(to: liveViewModel.$message) + measurementsViewModel.subscribe(to: liveViewModel.$finishedMessages) self.dataStoreStack = dataStoreStack try dataStoreStack.wrapInContext { context in let request = MeasurementMO.fetchRequest() diff --git a/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift b/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift index fcaefe3c..ba1f4847 100644 --- a/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift +++ b/RFR-App/RFR/ViewModel/SynchronizationViewModel.swift @@ -63,7 +63,11 @@ class SynchronizationViewModel: NSObject, ObservableObject { self?.uploadStatusPublisher.send(UploadStatus(measurement: status.upload.measurement, status: status.status)) } do { - let measurements = try dataStoreStack.persistenceLayer().loadSynchronizableMeasurements() + let measurements = try dataStoreStack.wrapInContextReturn { context in + let request = MeasurementMO.fetchRequest() + request.predicate = NSPredicate(format: "synchronizable=%@ AND synchronized=%@", NSNumber(booleanLiteral: true), NSNumber(booleanLiteral: false)) + return try request.execute().map { try FinishedMeasurement(managedObject: $0)} + } os_log("Sync: Synchronizing %d measurements!", log: OSLog.synchronization, type: .debug, measurements.count) for measurement in measurements { os_log(.debug, log: OSLog.synchronization, "Sync: Starting synchronization of measurement %d!", measurement.identifier)