Skip to content

Commit

Permalink
Merge branch 'v0.9.9'
Browse files Browse the repository at this point in the history
  • Loading branch information
Woohyeok Choi committed Feb 15, 2020
2 parents 64d2e11 + c1ebc83 commit ec305c9
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 438 deletions.
2 changes: 1 addition & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ android {
buildConfigField("String", "DB_NAME", "\"abc-logger\"")
buildConfigField("String", "PREF_NAME", "\"abc-logger-pref\"")
buildConfigField("Long", "DB_MAX_SIZE", "3145728L")
buildConfigField("Boolean", "IS_TEST_MODE", "false")
buildConfigField("Boolean", "IS_TEST_MODE", "true")
buildConfigField("String", "SERVER_ADDRESS", "\"143.248.100.24:50052\"")
buildConfigField("String", "TEST_SERVER_ADDRESS", "\"143.248.90.87:50055\"")
}
Expand Down
1 change: 0 additions & 1 deletion app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
</service>

<service android:name=".AbcCollector$LoggerService" />
<service android:name=".SyncWorker$CancelIntentService" />

<receiver
android:name=".AbcCollector$BootReceiver"
Expand Down
94 changes: 44 additions & 50 deletions app/src/main/kotlin/kaist/iclab/abclogger/SyncWorker.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package kaist.iclab.abclogger

import android.app.ActivityManager
import android.app.IntentService
import android.app.PendingIntent
import android.content.Context
import android.content.Intent
import android.os.SystemClock
import android.util.Log
import androidx.core.app.NotificationCompat
import androidx.work.*
import com.google.common.util.concurrent.MoreExecutors
import io.grpc.ManagedChannel
import io.grpc.android.AndroidChannelBuilder
import kaist.iclab.abclogger.collector.Base
Expand All @@ -31,16 +30,14 @@ import kaist.iclab.abclogger.collector.survey.SurveyEntity
import kaist.iclab.abclogger.collector.traffic.DataTrafficEntity
import kaist.iclab.abclogger.collector.wifi.WifiEntity
import kaist.iclab.abclogger.commons.Notifications
import kaist.iclab.abclogger.grpc.DataOperationsCoroutineGrpc
import kaist.iclab.abclogger.grpc.DataOperationsGrpc
import kaist.iclab.abclogger.grpc.DatumProto
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
private val cancelIntent: PendingIntent = PendingIntent.getService(
applicationContext, REQUEST_CODE_CANCEL_SYNC, Intent(ACTION_CANCEL_SYNC), PendingIntent.FLAG_UPDATE_CURRENT
)

class SyncWorker(context: Context, params: WorkerParameters) : Worker(context, params) {
private val foregroundInfo = ForegroundInfo(
Notifications.ID_SYNC_PROGRESS,
Notifications.build(
Expand All @@ -50,7 +47,6 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
text = applicationContext.getString(R.string.ntf_text_sync),
progress = 0,
indeterminate = true,
intent = cancelIntent,
actions = listOf(
NotificationCompat.Action.Builder(
R.drawable.baseline_close_white_24,
Expand All @@ -61,9 +57,8 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
)
)

override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
setForeground(foregroundInfo)

override fun doWork(): Result {
setForegroundAsync(foregroundInfo)

val channel: ManagedChannel = AndroidChannelBuilder
.forTarget(if (BuildConfig.IS_TEST_MODE) BuildConfig.TEST_SERVER_ADDRESS else BuildConfig.SERVER_ADDRESS)
Expand All @@ -72,7 +67,7 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
.executor(Dispatchers.IO.asExecutor())
.build()

val stub = DataOperationsCoroutineGrpc.newStubWithContext(channel)
val stub = DataOperationsGrpc.newFutureStub(channel)

uploadAll(stub)

Expand All @@ -84,37 +79,45 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
e.printStackTrace()
}

return@withContext Result.success()
return Result.success()
}

private suspend inline fun <reified T : Base> upload(stub: DataOperationsCoroutineGrpc.DataOperationsCoroutineStub) = coroutineScope {
val ids = ObjBox.query<T>()?.build()?.findIds() ?: return@coroutineScope
val size = ids.size

(0 until size step N_UPLOADS).forEach { offset ->
while (isLowMemory()) {
Log.d("SyncWorker", "isLowMemory...")
System.runFinalization()
System.gc()
delay(TimeUnit.SECONDS.toMillis(5))
private inline fun <reified T : Base> upload(stub: DataOperationsGrpc.DataOperationsFutureStub) {
val limiter = Semaphore(N_UPLOADS)
val ids = ObjBox.query<T>()?.build()?.findIds() ?: return

ids.forEach { id ->
if (isStopped) {
return
}

val deadlineStub = stub.withDeadlineAfter(1, TimeUnit.MINUTES)
try {
while (isLowMemory()) {
System.runFinalization()
System.gc()
SystemClock.sleep(TimeUnit.SECONDS.toMillis(10))
}

(offset..offset + N_UPLOADS).map { index ->
async {
try {
val id = ids[index]
val entity = ObjBox.get<T>(id)
?: throw Exception("No corresponding entity.")
val proto = toProto(entity) ?: throw Exception("No corresponding protobuf.")
limiter.acquire()

deadlineStub.createDatum(proto)
ObjBox.remove(entity)
} catch (e: Exception) {
}
}
}.awaitAll()
val entity = ObjBox.get<T>(id) ?: throw Exception("No corresponding entity.")
val proto = toProto(entity) ?: throw Exception("No corresponding protobuf.")

deadlineStub.createDatum(proto).addListener({
ObjBox.remove(entity)
limiter.release()
}, { runnable: Runnable ->
MoreExecutors.directExecutor().execute(runnable)
})
} catch (e: Exception) { }
}

val startTime = SystemClock.elapsedRealtime()

while (limiter.availablePermits() < N_UPLOADS && SystemClock.elapsedRealtime() - startTime < WAIT_TIME_RELEASE) {
SystemClock.sleep(TimeUnit.SECONDS.toMillis(5))
}
}

Expand All @@ -124,12 +127,11 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
val runtime = Runtime.getRuntime()
val usedMemory = runtime.totalMemory() - runtime.freeMemory()
val usedPercentage = usedMemory.toFloat() / (maxHeapSize * 1e6).toFloat()
Log.d("SyncWorker", "usedPercentage: $usedPercentage / maxHeapSize: $maxHeapSize")

return usedPercentage > 0.5F
}

private suspend fun uploadAll(stub: DataOperationsCoroutineGrpc.DataOperationsCoroutineStub) {
private fun uploadAll(stub: DataOperationsGrpc.DataOperationsFutureStub) {
upload<PhysicalActivityTransitionEntity>(stub)
upload<PhysicalActivityEntity>(stub)
upload<AppUsageEventEntity>(stub)
Expand Down Expand Up @@ -343,18 +345,10 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
}?.build()
}

class CancelIntentService : IntentService(CancelIntentService::class.java.name) {
override fun onHandleIntent(intent: Intent?) {
Log.d("CancelIntentService", "onHandleIntent()")
requestStop(this)
}
}

companion object {
private const val N_UPLOADS: Int = 100
private const val N_UPLOADS: Int = 250
private val WAIT_TIME_RELEASE: Long = TimeUnit.MINUTES.toMillis(1)
private val INTERVAL_SYNC = TimeUnit.HOURS.toMillis(1)
private const val REQUEST_CODE_CANCEL_SYNC = 0x12
private const val ACTION_CANCEL_SYNC = "${BuildConfig.APPLICATION_ID}.ACTION_CANCEL_SYNC "
private val WORKER_NAME = SyncWorker::class.java.name

fun requestStart(context: Context, forceStart: Boolean, enableMetered: Boolean, isPeriodic: Boolean) {
Expand Down
Loading

0 comments on commit ec305c9

Please sign in to comment.