Skip to content

Commit

Permalink
Migrate to Shared/State Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
dkrivoruchko committed Oct 25, 2020
1 parent addab23 commit 82ff0e9
Show file tree
Hide file tree
Showing 16 changed files with 198 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import info.dvkr.screenstream.R
import info.dvkr.screenstream.data.other.getLog
import info.dvkr.screenstream.data.settings.Settings
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.flow.safeCollect
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.koin.android.ext.android.inject

Expand All @@ -45,26 +48,25 @@ abstract class AppUpdateActivity(@LayoutRes contentLayoutId: Int) : BaseActivity
XLog.d(getLog("onCreate", "isAppUpdatePending: $isAppUpdatePending"))

lifecycleScope.launch(exceptionHandler) {
appUpdateManager.requestUpdateFlow().safeCollect { updateResult ->
try {
if (isAppUpdatePending.not() && isIAURequestTimeoutPassed() &&
updateResult is AppUpdateResult.Available && updateResult.updateInfo.isFlexibleUpdateAllowed
) {
XLog.d(this@AppUpdateActivity.getLog("AppUpdateManager", "startUpdateFlowForResult"))
isAppUpdatePending = true
appUpdateManager.startUpdateFlowForResult(
updateResult.updateInfo,
AppUpdateType.FLEXIBLE,
this@AppUpdateActivity,
APP_UPDATE_FLEXIBLE_REQUEST_CODE
)
}

if (updateResult is AppUpdateResult.Downloaded && isIAURequestTimeoutPassed()) showUpdateConfirmationDialog()
} catch (throwable: Throwable) {
XLog.e(getLog("AppUpdateManager.catch: $throwable"))
appUpdateManager.requestUpdateFlow().onEach { updateResult ->
ensureActive()
if (isAppUpdatePending.not() && isIAURequestTimeoutPassed() &&
updateResult is AppUpdateResult.Available && updateResult.updateInfo.isFlexibleUpdateAllowed
) {
XLog.d(this@AppUpdateActivity.getLog("AppUpdateManager", "startUpdateFlowForResult"))
isAppUpdatePending = true
appUpdateManager.startUpdateFlowForResult(
updateResult.updateInfo,
AppUpdateType.FLEXIBLE,
this@AppUpdateActivity,
APP_UPDATE_FLEXIBLE_REQUEST_CODE
)
}

if (updateResult is AppUpdateResult.Downloaded && isIAURequestTimeoutPassed()) showUpdateConfirmationDialog()
}
.catch { cause -> XLog.e(getLog("AppUpdateManager.catch: $cause")) }
.collect()
}
}

Expand Down
34 changes: 9 additions & 25 deletions app/src/main/kotlin/info/dvkr/screenstream/service/AppService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import info.dvkr.screenstream.databinding.ToastSlowConnectionBinding
import info.dvkr.screenstream.service.helper.IntentAction
import info.dvkr.screenstream.service.helper.NotificationHelper
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.safeCollect
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import org.koin.android.ext.android.inject
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -45,29 +43,16 @@ class AppService : Service() {
}

inner class AppServiceBinder : Binder() {
fun getServiceMessageFlow(): Flow<ServiceMessage> = serviceMessageChannel.asFlow()
fun getServiceMessageFlow(): SharedFlow<ServiceMessage> = _serviceMessageSharedFlow.asSharedFlow()
}

private val appServiceBinder = AppServiceBinder()
private val serviceMessageChannel = ConflatedBroadcastChannel<ServiceMessage>()
private val _serviceMessageSharedFlow =
MutableSharedFlow<ServiceMessage>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

private fun sendMessageToActivities(serviceMessage: ServiceMessage) {
XLog.v(getLog("sendMessageToActivities", "ServiceMessage: $serviceMessage"))

if (serviceMessageChannel.isClosedForSend) {
XLog.w(getLog("sendMessageToActivities", "ServiceMessageChannel: isClosedForSend"))
return
}

try {
serviceMessageChannel.offer(serviceMessage)
} catch (ignore: CancellationException) {
XLog.w(getLog("sendMessageToActivities.ignore", ignore.toString()))
XLog.w(getLog("sendMessageToActivities.ignore"), ignore)
} catch (th: Throwable) {
XLog.e(getLog("sendMessageToActivities", th.toString()))
XLog.e(getLog("sendMessageToActivities"), th)
}
_serviceMessageSharedFlow.tryEmit(serviceMessage)
}

private val coroutineScope = CoroutineScope(
Expand Down Expand Up @@ -140,14 +125,14 @@ class AppService : Service() {

appStateMachine = AppStateMachineImpl(this, settings as SettingsReadOnly, ::onEffect)

coroutineScope.launch {
appStateMachine!!.statisticFlow.safeCollect { (clients, trafficHistory) ->
coroutineScope.launch(CoroutineName("AppService.statisticFlow")) {
appStateMachine!!.statisticFlow.onEach { (clients, trafficHistory) ->
XLog.v(this@AppService.getLog("onStatistic"))
if (settings.autoStartStop) checkAutoStartStop(clients)
if (settings.notifySlowConnections) checkForSlowClients(clients)
sendMessageToActivities(ServiceMessage.Clients(clients))
sendMessageToActivities(ServiceMessage.TrafficHistory(trafficHistory))
}
}.collect()
}

isRunning = true
Expand Down Expand Up @@ -215,7 +200,6 @@ class AppService : Service() {
stopForeground(true)
XLog.d(getLog("onDestroy", "Done"))
super.onDestroy()
// Runtime.getRuntime().exit(0)
}

private var slowClients: List<HttpClient> = emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ sealed class ServiceMessage {

data class Clients(val clients: List<HttpClient>) : ServiceMessage()
data class TrafficHistory(val trafficHistory: List<TrafficPoint>) : ServiceMessage() {
override fun toString(): String = this::class.java.simpleName
override fun toString(): String = javaClass.simpleName
}

override fun toString(): String = this::class.java.simpleName
override fun toString(): String = javaClass.simpleName
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import info.dvkr.screenstream.R
import info.dvkr.screenstream.data.other.getLog
import info.dvkr.screenstream.service.helper.IntentAction
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.safeCollect
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach

@TargetApi(Build.VERSION_CODES.N)
class TileActionService : TileService() {
Expand All @@ -28,30 +30,29 @@ class TileActionService : TileService() {
override fun onStartListening() {
super.onStartListening()
XLog.d(getLog("onStartListening", " isRunning:${AppService.isRunning}, isBound:$isBound"))
if (AppService.isRunning && isBound.not()) {
coroutineScope?.cancel()
coroutineScope = CoroutineScope(
Job() + Dispatchers.Main.immediate + CoroutineExceptionHandler { _, throwable ->
XLog.e(getLog("onCoroutineException"), throwable)
}
)

if (AppService.isRunning && isBound.not()) {
serviceConnection = object : ServiceConnection {
override fun onServiceConnected(name: ComponentName?, service: IBinder) {
XLog.d(this@TileActionService.getLog("onServiceConnected"))
coroutineScope!!.launch {
(service as AppService.AppServiceBinder).getServiceMessageFlow()
.safeCollect {
XLog.v(this@TileActionService.getLog("onServiceMessage", "$it"))
when (it) {
is ServiceMessage.ServiceState -> {
isStreaming = it.isStreaming; updateTile()
}
is ServiceMessage.FinishActivity -> {
isStreaming = false; updateTile()
coroutineScope?.cancel()
coroutineScope = CoroutineScope(Job() + Dispatchers.Main.immediate).apply {
launch(CoroutineName("TileActionService.ServiceMessageFlow")) {
(service as AppService.AppServiceBinder).getServiceMessageFlow()
.onEach { serviceMessage ->
XLog.d(this@TileActionService.getLog("onServiceMessage", "$serviceMessage"))
when (serviceMessage) {
is ServiceMessage.ServiceState -> {
isStreaming = serviceMessage.isStreaming; updateTile()
}
is ServiceMessage.FinishActivity -> {
isStreaming = false; updateTile()
}
}
}
}
.catch { cause -> XLog.e(this@TileActionService.getLog("onServiceMessage"), cause) }
.collect()
}
}
isBound = true
IntentAction.GetServiceState.sendToAppService(this@TileActionService)
Expand Down Expand Up @@ -111,7 +112,7 @@ class TileActionService : TileService() {
label = getString(R.string.notification_start)
contentDescription = getString(R.string.notification_start)
state = Tile.STATE_ACTIVE
updateTile()
runCatching { updateTile() }
}
} else {
qsTile?.apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ sealed class IntentAction : Parcelable {
@Parcelize object StartOnBoot : IntentAction()
@Parcelize object RecoverError : IntentAction()

override fun toString(): String = this::class.java.simpleName
override fun toString(): String = javaClass.simpleName
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import info.dvkr.screenstream.data.settings.SettingsReadOnly
import info.dvkr.screenstream.service.AppService
import info.dvkr.screenstream.service.ServiceMessage
import info.dvkr.screenstream.service.helper.IntentAction
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.safeCollect
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch

abstract class ServiceActivity(@LayoutRes contentLayoutId: Int) : AppUpdateActivity(contentLayoutId) {
Expand All @@ -31,16 +34,16 @@ abstract class ServiceActivity(@LayoutRes contentLayoutId: Int) : AppUpdateActiv
private val serviceConnection = object : ServiceConnection {
override fun onServiceConnected(name: ComponentName?, service: IBinder) {
XLog.d(this@ServiceActivity.getLog("onServiceConnected"))
serviceMessageFlowJob = lifecycle.coroutineScope.launch {
(service as AppService.AppServiceBinder).getServiceMessageFlow().safeCollect {
try {
XLog.v(this@ServiceActivity.getLog("onServiceMessage", "$it"))
serviceMessageLiveData.value = it
} catch (th: Throwable) {
XLog.e(this@ServiceActivity.getLog("onServiceMessage"), it)
}
serviceMessageFlowJob =
lifecycle.coroutineScope.launch(CoroutineName("ServiceActivity.ServiceMessageFlow")) {
(service as AppService.AppServiceBinder).getServiceMessageFlow()
.onEach { serviceMessage ->
XLog.v(this@ServiceActivity.getLog("onServiceMessage", "$serviceMessage"))
serviceMessageLiveData.value = serviceMessage
}
.catch { cause -> XLog.e(this@ServiceActivity.getLog("onServiceMessage"), cause) }
.collect()
}
}

isBound = true
IntentAction.GetServiceState.sendToAppService(this@ServiceActivity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class StreamFragment : Fragment(R.layout.fragment_stream) {

private fun onTrafficHistoryMessage(serviceMessage: ServiceMessage.TrafficHistory) {
binding.tvFragmentStreamTrafficHeader.text = getString(R.string.stream_fragment_current_traffic).run {
format(serviceMessage.trafficHistory.last().bytes.bytesToMbit()).setColorSpan(colorAccent, indexOf('%'))
val lastTrafficPoint = serviceMessage.trafficHistory.lastOrNull() ?: return@run "0"
format(lastTrafficPoint.bytes.bytesToMbit()).setColorSpan(colorAccent, indexOf('%'))
}

binding.trafficGraphFragmentStream.setDataPoints(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import io.netty.util.ResourceLeakDetector
import io.reactivex.netty.RxNetty
import io.reactivex.netty.protocol.http.server.HttpServer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.flow.StateFlow
import java.net.BindException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -25,7 +25,7 @@ class AppHttpServer(
private val settingsReadOnly: SettingsReadOnly,
private val httpServerFiles: HttpServerFiles,
private val clientStatistic: ClientStatistic,
private val bitmapChannel: BroadcastChannel<Bitmap>,
private val bitmapStateFlow: StateFlow<Bitmap>,
private val onStartStopRequest: () -> Unit,
private val onError: (AppError) -> Unit
) {
Expand Down Expand Up @@ -73,7 +73,7 @@ class AppHttpServer(
onStartStopRequest,
clientStatistic,
settingsReadOnly,
bitmapChannel
bitmapStateFlow
)

val serverEventLoop = RxNetty.getRxEventLoopProvider().globalServerEventLoop()
Expand Down
Loading

0 comments on commit 82ff0e9

Please sign in to comment.