From 223f2b2351a9efd3f79e375d3b32d54093cb4584 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 24 Oct 2023 16:04:35 +0200 Subject: [PATCH] Refactor onPerformSync to lower complexity --- .../de/cyface/synchronization/SyncAdapter.kt | 413 ++++++++++-------- 1 file changed, 235 insertions(+), 178 deletions(-) diff --git a/synchronization/src/main/kotlin/de/cyface/synchronization/SyncAdapter.kt b/synchronization/src/main/kotlin/de/cyface/synchronization/SyncAdapter.kt index 477c456ae..2cfeb950f 100644 --- a/synchronization/src/main/kotlin/de/cyface/synchronization/SyncAdapter.kt +++ b/synchronization/src/main/kotlin/de/cyface/synchronization/SyncAdapter.kt @@ -109,217 +109,274 @@ class SyncAdapter private constructor( authority: String, provider: ContentProviderClient, syncResult: SyncResult ) { + if (shouldAbortSyncRequest(account, authority, extras)) return - // This allows us to mock the #isConnected() check for unit tests - mockIsConnectedToReturnTrue = extras.containsKey(MOCK_IS_CONNECTED_TO_RETURN_TRUE) - if (isSyncRequestAborted(account, authority)) { - return - } Log.d(TAG, "Sync started") val context = context - val serializer = MeasurementSerializer() - val persistence = - DefaultPersistenceLayer( - context, - DefaultPersistenceBehaviour() - ) + // Ensure sync errors are shown to the user when triggering sync manually val fromBackground = !extras.getBoolean(SYNC_EXTRAS_MANUAL) val syncPerformer = SyncPerformer(context, fromBackground) - + val persistence = initPersistenceLayer() // Ensure user is authorized before starting synchronization authenticator.performActionWithFreshTokens { _, _, ex -> if (ex != null) { - Log.w(TAG, ex.javaClass.simpleName + ": " + ex.message) + handleAuthenticationError(ex, syncResult, fromBackground) + return@performActionWithFreshTokens + } + + try { + val deviceId = persistence.restoreOrCreateDeviceId() + notifySyncStarted() + + val syncableMeasurements = loadSyncableMeasurements(persistence) + if (syncableMeasurements.isEmpty()) return@performActionWithFreshTokens + + processMeasurements(syncableMeasurements, syncPerformer, persistence, deviceId, syncResult, fromBackground, account, authority, provider) + } catch (e: Exception) { + handleSyncExceptions(e, syncResult, fromBackground) + } finally { + finalizeSync(syncResult) + } + } + } + + private fun finalizeSync(syncResult: SyncResult) { + Log.d(TAG,"Sync finished. (${if (syncResult.hasError()) "ERROR" else "success"})") + for (listener in progressListener) { + listener.onSyncFinished() + } + } + + private fun handleSyncExceptions( + e: Exception, + syncResult: SyncResult, + fromBackground: Boolean + ) { + Log.w(TAG, e.javaClass.simpleName + ": " + e.message) + when (e) { + is CursorIsNullException -> { + syncResult.databaseError = true + ErrorHandler.sendErrorIntent( + context, + ErrorCode.DATABASE_ERROR.code, + e.message, + fromBackground + ) + } + + is AuthenticatorException -> { syncResult.stats.numAuthExceptions++ ErrorHandler.sendErrorIntent( context, ErrorCode.AUTHENTICATION_ERROR.code, - ex.message, + e.message, fromBackground ) - } else { - try { - val deviceId = persistence.restoreOrCreateDeviceId() + } - // Inform ConnectionStatusListener - for (listener in progressListener) { - listener.onSyncStarted() - } + is SynchronizationInterruptedException -> { + syncResult.stats.numIoExceptions++ + ErrorHandler.sendErrorIntent( + context, + ErrorCode.SYNCHRONIZATION_INTERRUPTED.code, + e.message, + fromBackground + ) + } - // Load all Measurements ready for synchronization - val partiallyUploaded = - persistence.loadMeasurements(MeasurementStatus.UPLOADING) - val finishedMeasurements = - persistence.loadMeasurements(MeasurementStatus.FINISHED) - val syncableMeasurements = partiallyUploaded + finishedMeasurements - if (syncableMeasurements.isEmpty()) { - return@performActionWithFreshTokens // nothing to sync - } - val measurementCount = syncableMeasurements.size - var error = false - for (index in 0 until measurementCount) { - val measurement = syncableMeasurements[index] - if (error) { - break - } - Log.d(TAG, "Preparing Measurement (id ${measurement.id}) for upload.",) - - // Ensure the measurement is supported - val format = measurement.fileFormatVersion - Validate.isTrue(format == DefaultPersistenceLayer.PERSISTENCE_FILE_FORMAT_VERSION) - - // Load measurement data - val metaData = loadMetaData(measurement, persistence, deviceId, context) - - // Load, try to sync the file to be transferred and clean it up afterwards - var compressedTransferTempFile: File? = null - try { - runBlocking { - compressedTransferTempFile = - serializer.writeSerializedCompressed( - measurement.id, - persistence - ) - } + is NetworkErrorException -> { + syncResult.stats.numIoExceptions++ + // No need to sendErrorIntent() as CyfaceAuthenticator already throws more specific error + } - // Check whether the network settings changed to avoid using metered network without permission - if (isSyncRequestAborted(account, authority) || error) { - return@performActionWithFreshTokens - } + // This was newly added, so this might need some additional testing + else -> { + syncResult.stats.numIoExceptions++ + ErrorHandler.sendErrorIntent( + context, + ErrorCode.UNKNOWN.code, + e.message, + fromBackground + ) + } + } + } - // Synchronize measurement - val processListener = object : UploadProgressListener { - override fun updatedProgress(percent: Float) { - // Multi-measurement progress - val progressPerMeasurement = - 100.0 / measurementCount.toDouble() - val progressBeforeThis = - index.toDouble() * progressPerMeasurement - val lastMeasurement = index == measurementCount - 1 - val total = - if (lastMeasurement && percent.toDouble() == 1.0) 100.0 else progressBeforeThis + percent * progressPerMeasurement - for (listener in progressListener) { - listener.onProgress(total.toFloat(), measurement.id) - } - } - } + private fun processMeasurements( + measurements: List, + syncPerformer: SyncPerformer, + persistence: DefaultPersistenceLayer, + deviceId: String, + syncResult: SyncResult, + fromBackground: Boolean, + account: Account, + authority: String, + provider: ContentProviderClient + ) { + val serializer = MeasurementSerializer() + val measurementCount = measurements.size + var error = false + + for (index in 0 until measurementCount) { + val measurement = measurements[index] + if (error) { + break + } + Log.d(TAG, "Preparing Measurement (id ${measurement.id}) for upload.",) + + // Ensure the measurement is supported + val format = measurement.fileFormatVersion + Validate.isTrue(format == DefaultPersistenceLayer.PERSISTENCE_FILE_FORMAT_VERSION) + + // Load measurement data + val metaData = loadMetaData(measurement, persistence, deviceId, context) + + // Load, try to sync the file to be transferred and clean it up afterwards + var compressedTransferTempFile: File? = null + try { + runBlocking { + compressedTransferTempFile = + serializer.writeSerializedCompressed( + measurement.id, + persistence + ) + } + + // Check whether the network settings changed to avoid using metered network without permission + if (isSyncRequestAborted(account, authority) || error) { + return + } + + // Synchronize measurement + val processListener = object : UploadProgressListener { + override fun updatedProgress(percent: Float) { + // Multi-measurement progress + val progressPerMeasurement = + 100.0 / measurementCount.toDouble() + val progressBeforeThis = + index.toDouble() * progressPerMeasurement + val lastMeasurement = index == measurementCount - 1 + val total = + if (lastMeasurement && percent.toDouble() == 1.0) 100.0 else progressBeforeThis + percent * progressPerMeasurement + for (listener in progressListener) { + listener.onProgress(total.toFloat(), measurement.id) + } + } + } - // This is now executed asynchronously! - authenticator.performActionWithFreshTokens { accessToken, _, e -> - if (e != null) { - Log.w(TAG, e.javaClass.simpleName + ": " + e.message) - syncResult.stats.numAuthExceptions++ - ErrorHandler.sendErrorIntent( - context, - ErrorCode.AUTHENTICATION_ERROR.code, - e.message, - fromBackground + // This is now executed asynchronously! + authenticator.performActionWithFreshTokens { accessToken, _, e -> + if (e != null) { + Log.w(TAG, e.javaClass.simpleName + ": " + e.message) + syncResult.stats.numAuthExceptions++ + ErrorHandler.sendErrorIntent( + context, + ErrorCode.AUTHENTICATION_ERROR.code, + e.message, + fromBackground + ) + } else { + val result = syncPerformer.sendData( + uploader, + syncResult, + metaData, + compressedTransferTempFile!!, + processListener, + accessToken!! + ) + if (result == Result.UPLOAD_FAILED) { + error = true + } else { + + // Mark successfully transmitted measurement as synced + try { + if (result == Result.UPLOAD_SKIPPED) { + persistence.markFinishedAs( + MeasurementStatus.SKIPPED, + measurement.id + ) + } else if (result == Result.UPLOAD_SUCCESSFUL) { + persistence.markFinishedAs( + MeasurementStatus.SYNCED, + measurement.id ) } else { - val result = syncPerformer.sendData( - uploader, - syncResult, - metaData, - compressedTransferTempFile!!, - processListener, - accessToken!! + throw IllegalArgumentException( + String.format( + "Unknown result: %s", + result + ) ) - if (result == Result.UPLOAD_FAILED) { - error = true - } else { - - // Mark successfully transmitted measurement as synced - try { - if (result == Result.UPLOAD_SKIPPED) { - persistence.markFinishedAs( - MeasurementStatus.SKIPPED, - measurement.id - ) - } else if (result == Result.UPLOAD_SUCCESSFUL) { - persistence.markFinishedAs( - MeasurementStatus.SYNCED, - measurement.id - ) - } else { - throw IllegalArgumentException( - String.format( - "Unknown result: %s", - result - ) - ) - } - Log.d( - Constants.TAG, String.format( - "Measurement marked as %s.", - result.toString().lowercase() - ) - ) - } catch (e: NoSuchMeasurementException) { - throw IllegalStateException(e) - } - } } - } - } finally { - if (compressedTransferTempFile != null && compressedTransferTempFile!!.exists()) { - Validate.isTrue(compressedTransferTempFile!!.delete()) - } - if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { - provider.close() - } else { - provider.release() + Log.d( + Constants.TAG, String.format( + "Measurement marked as %s.", + result.toString().lowercase() + ) + ) + } catch (e: NoSuchMeasurementException) { + throw IllegalStateException(e) } } } - } catch (e: CursorIsNullException) { - Log.w(TAG, e.javaClass.simpleName + ": " + e.message) - syncResult.databaseError = true - ErrorHandler.sendErrorIntent( - context, - ErrorCode.DATABASE_ERROR.code, - e.message, - fromBackground - ) - } catch (e: AuthenticatorException) { - Log.w(TAG, e.javaClass.simpleName + ": " + e.message) - syncResult.stats.numAuthExceptions++ - ErrorHandler.sendErrorIntent( - context, - ErrorCode.AUTHENTICATION_ERROR.code, - e.message, - fromBackground - ) - } catch (e: SynchronizationInterruptedException) { - Log.w(TAG, e.javaClass.simpleName + ": " + e.message) - syncResult.stats.numIoExceptions++ - ErrorHandler.sendErrorIntent( - context, - ErrorCode.SYNCHRONIZATION_INTERRUPTED.code, - e.message, - fromBackground - ) - } catch (e: NetworkErrorException) { - Log.w(TAG, e.javaClass.simpleName + ": " + e.message) - syncResult.stats.numIoExceptions++ - // No need to sendErrorIntent() as CyfaceAuthenticator already throws more specific error - } finally { - Log.d( - TAG, - String.format( - "Sync finished. (%s)", - if (syncResult.hasError()) "ERROR" else "success" - ) - ) - for (listener in progressListener) { - listener.onSyncFinished() - } + } + } finally { + if (compressedTransferTempFile != null && compressedTransferTempFile!!.exists()) { + Validate.isTrue(compressedTransferTempFile!!.delete()) + } + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { + provider.close() + } else { + provider.release() } } } } + private fun loadSyncableMeasurements(persistence: DefaultPersistenceLayer): List { + val partiallyUploaded = persistence.loadMeasurements(MeasurementStatus.UPLOADING) + val finishedMeasurements = persistence.loadMeasurements(MeasurementStatus.FINISHED) + return partiallyUploaded + finishedMeasurements // Returns the partially uploaded measurements first + } + + private fun notifySyncStarted() { + for (listener in progressListener) { + listener.onSyncStarted() + } + } + + private fun handleAuthenticationError( + e: Exception, + syncResult: SyncResult, + fromBackground: Boolean + ) { + Log.w(TAG, e.javaClass.simpleName + ": " + e.message) + syncResult.stats.numAuthExceptions++ + ErrorHandler.sendErrorIntent( + context, + ErrorCode.AUTHENTICATION_ERROR.code, + e.message, + fromBackground + ) + } + + private fun initPersistenceLayer(): DefaultPersistenceLayer { + return DefaultPersistenceLayer( + context, + DefaultPersistenceBehaviour() + ) + } + + /** + * This allows us to mock the #isConnected() check for unit tests + */ + private fun shouldAbortSyncRequest(account: Account, authority: String, extras: Bundle): Boolean { + mockIsConnectedToReturnTrue = extras.containsKey(MOCK_IS_CONNECTED_TO_RETURN_TRUE) + return isSyncRequestAborted(account, authority) + } + /** * Checks whether the network was disconnected or the synchronization was interrupted. *