From 82ff0e9eb97278e958439bf6202a13dc416a2bdd Mon Sep 17 00:00:00 2001 From: Dmitriy Krivoruchko Date: Sun, 25 Oct 2020 15:31:24 +0200 Subject: [PATCH] Migrate to Shared/State Flow --- .../ui/activity/AppUpdateActivity.kt | 40 +++-- .../dvkr/screenstream/service/AppService.kt | 34 +--- .../screenstream/service/ServiceMessage.kt | 4 +- .../screenstream/service/TileActionService.kt | 41 ++--- .../service/helper/IntentAction.kt | 2 +- .../ui/activity/ServiceActivity.kt | 23 +-- .../ui/fragment/StreamFragment.kt | 3 +- .../data/httpserver/AppHttpServer.kt | 6 +- .../data/httpserver/ClientStatistic.kt | 168 ++++++++---------- .../data/httpserver/HttpServerRxHandler.kt | 15 +- .../screenstream/data/image/BitmapCapture.kt | 14 +- .../dvkr/screenstream/data/model/AppError.kt | 1 - .../data/state/AppStateMachine.kt | 6 +- .../data/state/AppStateMachineImpl.kt | 88 ++++----- .../data/state/helper/BroadcastHelper.kt | 26 +-- .../kotlin/kotlinx/coroutines/flow/FlowFix.kt | 17 -- 16 files changed, 198 insertions(+), 290 deletions(-) delete mode 100644 data/src/main/kotlin/kotlinx/coroutines/flow/FlowFix.kt diff --git a/app/src/firebase/kotlin/info/dvkr/screenstream/ui/activity/AppUpdateActivity.kt b/app/src/firebase/kotlin/info/dvkr/screenstream/ui/activity/AppUpdateActivity.kt index 20cb06f5..95f37032 100644 --- a/app/src/firebase/kotlin/info/dvkr/screenstream/ui/activity/AppUpdateActivity.kt +++ b/app/src/firebase/kotlin/info/dvkr/screenstream/ui/activity/AppUpdateActivity.kt @@ -18,7 +18,10 @@ import info.dvkr.screenstream.R import info.dvkr.screenstream.data.other.getLog import info.dvkr.screenstream.data.settings.Settings import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.flow.safeCollect +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import org.koin.android.ext.android.inject @@ -45,26 +48,25 @@ abstract class AppUpdateActivity(@LayoutRes contentLayoutId: Int) : BaseActivity XLog.d(getLog("onCreate", "isAppUpdatePending: $isAppUpdatePending")) lifecycleScope.launch(exceptionHandler) { - appUpdateManager.requestUpdateFlow().safeCollect { updateResult -> - try { - if (isAppUpdatePending.not() && isIAURequestTimeoutPassed() && - updateResult is AppUpdateResult.Available && updateResult.updateInfo.isFlexibleUpdateAllowed - ) { - XLog.d(this@AppUpdateActivity.getLog("AppUpdateManager", "startUpdateFlowForResult")) - isAppUpdatePending = true - appUpdateManager.startUpdateFlowForResult( - updateResult.updateInfo, - AppUpdateType.FLEXIBLE, - this@AppUpdateActivity, - APP_UPDATE_FLEXIBLE_REQUEST_CODE - ) - } - - if (updateResult is AppUpdateResult.Downloaded && isIAURequestTimeoutPassed()) showUpdateConfirmationDialog() - } catch (throwable: Throwable) { - XLog.e(getLog("AppUpdateManager.catch: $throwable")) + appUpdateManager.requestUpdateFlow().onEach { updateResult -> + ensureActive() + if (isAppUpdatePending.not() && isIAURequestTimeoutPassed() && + updateResult is AppUpdateResult.Available && updateResult.updateInfo.isFlexibleUpdateAllowed + ) { + XLog.d(this@AppUpdateActivity.getLog("AppUpdateManager", "startUpdateFlowForResult")) + isAppUpdatePending = true + appUpdateManager.startUpdateFlowForResult( + updateResult.updateInfo, + AppUpdateType.FLEXIBLE, + this@AppUpdateActivity, + APP_UPDATE_FLEXIBLE_REQUEST_CODE + ) } + + if (updateResult is AppUpdateResult.Downloaded && isIAURequestTimeoutPassed()) showUpdateConfirmationDialog() } + .catch { cause -> XLog.e(getLog("AppUpdateManager.catch: $cause")) } + .collect() } } diff --git a/app/src/main/kotlin/info/dvkr/screenstream/service/AppService.kt b/app/src/main/kotlin/info/dvkr/screenstream/service/AppService.kt index c82f9c63..f84b6c3e 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/service/AppService.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/service/AppService.kt @@ -24,10 +24,8 @@ import info.dvkr.screenstream.databinding.ToastSlowConnectionBinding import info.dvkr.screenstream.service.helper.IntentAction import info.dvkr.screenstream.service.helper.NotificationHelper import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.safeCollect +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.* import org.koin.android.ext.android.inject import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference @@ -45,29 +43,16 @@ class AppService : Service() { } inner class AppServiceBinder : Binder() { - fun getServiceMessageFlow(): Flow = serviceMessageChannel.asFlow() + fun getServiceMessageFlow(): SharedFlow = _serviceMessageSharedFlow.asSharedFlow() } private val appServiceBinder = AppServiceBinder() - private val serviceMessageChannel = ConflatedBroadcastChannel() + private val _serviceMessageSharedFlow = + MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) private fun sendMessageToActivities(serviceMessage: ServiceMessage) { XLog.v(getLog("sendMessageToActivities", "ServiceMessage: $serviceMessage")) - - if (serviceMessageChannel.isClosedForSend) { - XLog.w(getLog("sendMessageToActivities", "ServiceMessageChannel: isClosedForSend")) - return - } - - try { - serviceMessageChannel.offer(serviceMessage) - } catch (ignore: CancellationException) { - XLog.w(getLog("sendMessageToActivities.ignore", ignore.toString())) - XLog.w(getLog("sendMessageToActivities.ignore"), ignore) - } catch (th: Throwable) { - XLog.e(getLog("sendMessageToActivities", th.toString())) - XLog.e(getLog("sendMessageToActivities"), th) - } + _serviceMessageSharedFlow.tryEmit(serviceMessage) } private val coroutineScope = CoroutineScope( @@ -140,14 +125,14 @@ class AppService : Service() { appStateMachine = AppStateMachineImpl(this, settings as SettingsReadOnly, ::onEffect) - coroutineScope.launch { - appStateMachine!!.statisticFlow.safeCollect { (clients, trafficHistory) -> + coroutineScope.launch(CoroutineName("AppService.statisticFlow")) { + appStateMachine!!.statisticFlow.onEach { (clients, trafficHistory) -> XLog.v(this@AppService.getLog("onStatistic")) if (settings.autoStartStop) checkAutoStartStop(clients) if (settings.notifySlowConnections) checkForSlowClients(clients) sendMessageToActivities(ServiceMessage.Clients(clients)) sendMessageToActivities(ServiceMessage.TrafficHistory(trafficHistory)) - } + }.collect() } isRunning = true @@ -215,7 +200,6 @@ class AppService : Service() { stopForeground(true) XLog.d(getLog("onDestroy", "Done")) super.onDestroy() -// Runtime.getRuntime().exit(0) } private var slowClients: List = emptyList() diff --git a/app/src/main/kotlin/info/dvkr/screenstream/service/ServiceMessage.kt b/app/src/main/kotlin/info/dvkr/screenstream/service/ServiceMessage.kt index ce7d0e1a..c81785ff 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/service/ServiceMessage.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/service/ServiceMessage.kt @@ -16,8 +16,8 @@ sealed class ServiceMessage { data class Clients(val clients: List) : ServiceMessage() data class TrafficHistory(val trafficHistory: List) : ServiceMessage() { - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } \ No newline at end of file diff --git a/app/src/main/kotlin/info/dvkr/screenstream/service/TileActionService.kt b/app/src/main/kotlin/info/dvkr/screenstream/service/TileActionService.kt index e145c6fd..56a00d5c 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/service/TileActionService.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/service/TileActionService.kt @@ -15,7 +15,9 @@ import info.dvkr.screenstream.R import info.dvkr.screenstream.data.other.getLog import info.dvkr.screenstream.service.helper.IntentAction import kotlinx.coroutines.* -import kotlinx.coroutines.flow.safeCollect +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach @TargetApi(Build.VERSION_CODES.N) class TileActionService : TileService() { @@ -28,30 +30,29 @@ class TileActionService : TileService() { override fun onStartListening() { super.onStartListening() XLog.d(getLog("onStartListening", " isRunning:${AppService.isRunning}, isBound:$isBound")) - if (AppService.isRunning && isBound.not()) { - coroutineScope?.cancel() - coroutineScope = CoroutineScope( - Job() + Dispatchers.Main.immediate + CoroutineExceptionHandler { _, throwable -> - XLog.e(getLog("onCoroutineException"), throwable) - } - ) + if (AppService.isRunning && isBound.not()) { serviceConnection = object : ServiceConnection { override fun onServiceConnected(name: ComponentName?, service: IBinder) { XLog.d(this@TileActionService.getLog("onServiceConnected")) - coroutineScope!!.launch { - (service as AppService.AppServiceBinder).getServiceMessageFlow() - .safeCollect { - XLog.v(this@TileActionService.getLog("onServiceMessage", "$it")) - when (it) { - is ServiceMessage.ServiceState -> { - isStreaming = it.isStreaming; updateTile() - } - is ServiceMessage.FinishActivity -> { - isStreaming = false; updateTile() + coroutineScope?.cancel() + coroutineScope = CoroutineScope(Job() + Dispatchers.Main.immediate).apply { + launch(CoroutineName("TileActionService.ServiceMessageFlow")) { + (service as AppService.AppServiceBinder).getServiceMessageFlow() + .onEach { serviceMessage -> + XLog.d(this@TileActionService.getLog("onServiceMessage", "$serviceMessage")) + when (serviceMessage) { + is ServiceMessage.ServiceState -> { + isStreaming = serviceMessage.isStreaming; updateTile() + } + is ServiceMessage.FinishActivity -> { + isStreaming = false; updateTile() + } } } - } + .catch { cause -> XLog.e(this@TileActionService.getLog("onServiceMessage"), cause) } + .collect() + } } isBound = true IntentAction.GetServiceState.sendToAppService(this@TileActionService) @@ -111,7 +112,7 @@ class TileActionService : TileService() { label = getString(R.string.notification_start) contentDescription = getString(R.string.notification_start) state = Tile.STATE_ACTIVE - updateTile() + runCatching { updateTile() } } } else { qsTile?.apply { diff --git a/app/src/main/kotlin/info/dvkr/screenstream/service/helper/IntentAction.kt b/app/src/main/kotlin/info/dvkr/screenstream/service/helper/IntentAction.kt index f267651e..7dc1855a 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/service/helper/IntentAction.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/service/helper/IntentAction.kt @@ -37,5 +37,5 @@ sealed class IntentAction : Parcelable { @Parcelize object StartOnBoot : IntentAction() @Parcelize object RecoverError : IntentAction() - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } \ No newline at end of file diff --git a/app/src/main/kotlin/info/dvkr/screenstream/ui/activity/ServiceActivity.kt b/app/src/main/kotlin/info/dvkr/screenstream/ui/activity/ServiceActivity.kt index 6d083d2c..4ccf9004 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/ui/activity/ServiceActivity.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/ui/activity/ServiceActivity.kt @@ -18,8 +18,11 @@ import info.dvkr.screenstream.data.settings.SettingsReadOnly import info.dvkr.screenstream.service.AppService import info.dvkr.screenstream.service.ServiceMessage import info.dvkr.screenstream.service.helper.IntentAction +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.safeCollect +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch abstract class ServiceActivity(@LayoutRes contentLayoutId: Int) : AppUpdateActivity(contentLayoutId) { @@ -31,16 +34,16 @@ abstract class ServiceActivity(@LayoutRes contentLayoutId: Int) : AppUpdateActiv private val serviceConnection = object : ServiceConnection { override fun onServiceConnected(name: ComponentName?, service: IBinder) { XLog.d(this@ServiceActivity.getLog("onServiceConnected")) - serviceMessageFlowJob = lifecycle.coroutineScope.launch { - (service as AppService.AppServiceBinder).getServiceMessageFlow().safeCollect { - try { - XLog.v(this@ServiceActivity.getLog("onServiceMessage", "$it")) - serviceMessageLiveData.value = it - } catch (th: Throwable) { - XLog.e(this@ServiceActivity.getLog("onServiceMessage"), it) - } + serviceMessageFlowJob = + lifecycle.coroutineScope.launch(CoroutineName("ServiceActivity.ServiceMessageFlow")) { + (service as AppService.AppServiceBinder).getServiceMessageFlow() + .onEach { serviceMessage -> + XLog.v(this@ServiceActivity.getLog("onServiceMessage", "$serviceMessage")) + serviceMessageLiveData.value = serviceMessage + } + .catch { cause -> XLog.e(this@ServiceActivity.getLog("onServiceMessage"), cause) } + .collect() } - } isBound = true IntentAction.GetServiceState.sendToAppService(this@ServiceActivity) diff --git a/app/src/main/kotlin/info/dvkr/screenstream/ui/fragment/StreamFragment.kt b/app/src/main/kotlin/info/dvkr/screenstream/ui/fragment/StreamFragment.kt index d13443fa..8ffba294 100644 --- a/app/src/main/kotlin/info/dvkr/screenstream/ui/fragment/StreamFragment.kt +++ b/app/src/main/kotlin/info/dvkr/screenstream/ui/fragment/StreamFragment.kt @@ -149,7 +149,8 @@ class StreamFragment : Fragment(R.layout.fragment_stream) { private fun onTrafficHistoryMessage(serviceMessage: ServiceMessage.TrafficHistory) { binding.tvFragmentStreamTrafficHeader.text = getString(R.string.stream_fragment_current_traffic).run { - format(serviceMessage.trafficHistory.last().bytes.bytesToMbit()).setColorSpan(colorAccent, indexOf('%')) + val lastTrafficPoint = serviceMessage.trafficHistory.lastOrNull() ?: return@run "0" + format(lastTrafficPoint.bytes.bytesToMbit()).setColorSpan(colorAccent, indexOf('%')) } binding.trafficGraphFragmentStream.setDataPoints( diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/AppHttpServer.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/AppHttpServer.kt index 2519a59a..c6053dc5 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/AppHttpServer.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/AppHttpServer.kt @@ -16,7 +16,7 @@ import io.netty.util.ResourceLeakDetector import io.reactivex.netty.RxNetty import io.reactivex.netty.protocol.http.server.HttpServer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.flow.StateFlow import java.net.BindException import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference @@ -25,7 +25,7 @@ class AppHttpServer( private val settingsReadOnly: SettingsReadOnly, private val httpServerFiles: HttpServerFiles, private val clientStatistic: ClientStatistic, - private val bitmapChannel: BroadcastChannel, + private val bitmapStateFlow: StateFlow, private val onStartStopRequest: () -> Unit, private val onError: (AppError) -> Unit ) { @@ -73,7 +73,7 @@ class AppHttpServer( onStartStopRequest, clientStatistic, settingsReadOnly, - bitmapChannel + bitmapStateFlow ) val serverEventLoop = RxNetty.getRxEventLoopProvider().globalServerEventLoop() diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/ClientStatistic.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/ClientStatistic.kt index ab30c34f..f1c6e35a 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/ClientStatistic.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/ClientStatistic.kt @@ -7,11 +7,7 @@ import info.dvkr.screenstream.data.model.HttpClient import info.dvkr.screenstream.data.model.TrafficPoint import info.dvkr.screenstream.data.other.getLog import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ClosedSendChannelException -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.channels.actor -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.* import java.util.* class ClientStatistic( @@ -26,25 +22,9 @@ class ClientStatistic( data class Backpressure(val id: Long) : StatisticEvent() data class NextBytes(val id: Long, val bytesCount: Int) : StatisticEvent() - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } - companion object { - private const val CLIENT_DISCONNECT_HOLD_TIME_SECONDS = 5 - private const val TRAFFIC_HISTORY_SECONDS = 30 - } - - private val statisticChannel = ConflatedBroadcastChannel, List>>() - - val statisticFlow: Flow, List>> = statisticChannel.asFlow() - - private val coroutineExceptionHandler = CoroutineExceptionHandler { _, throwable -> - XLog.e(getLog("onCoroutineException"), throwable) - onError(FatalError.CoroutineException) - } - - private val statisticScope = CoroutineScope(Job() + Dispatchers.Default + coroutineExceptionHandler) - private data class StatisticClient( val id: Long, val clientAddressAndPort: String, @@ -58,96 +38,88 @@ class ClientStatistic( fun toHttpClient() = HttpClient(id, clientAddressAndPort, isSlowConnection, isDisconnected) } - private val statisticEventChannel = statisticScope.actor(capacity = 64) { - val clientsMap: MutableMap = mutableMapOf() - val trafficHistory = LinkedList() + companion object { + private const val CLIENT_DISCONNECT_HOLD_TIME_SECONDS = 5 + private const val TRAFFIC_HISTORY_SECONDS = 30 + } + + private val statisticScope = CoroutineScope(Job() + Dispatchers.Default) - val past = System.currentTimeMillis() - TRAFFIC_HISTORY_SECONDS * 1000 - (0..TRAFFIC_HISTORY_SECONDS + 1).forEach { i -> - trafficHistory.addLast(TrafficPoint(i * 1000 + past, 0)) - } + private val clientsMap: MutableMap = mutableMapOf() + private val trafficHistory = LinkedList() + private val past = System.currentTimeMillis() - TRAFFIC_HISTORY_SECONDS * 1000 + private val _statisticEventSharedFlow = MutableSharedFlow(extraBufferCapacity = 64) - for (event in this) { - ensureActive() - try { - when (event) { - is StatisticEvent.Connected -> - clientsMap[event.id] = StatisticClient(event.id, event.clientAddressAndPort) - - is StatisticEvent.Disconnected -> - clientsMap[event.id]?.apply { - isDisconnected = true - disconnectedTime = System.currentTimeMillis() - } - - is StatisticEvent.Backpressure -> - clientsMap[event.id]?.isSlowConnection = true - - is StatisticEvent.NextBytes -> - clientsMap[event.id]?.apply { sendBytes = sendBytes.plus(event.bytesCount) } - - is StatisticEvent.SendStatistic -> { - val now = System.currentTimeMillis() - clientsMap.values.removeAll { it.isDisconnected && it.isDisconnectHoldTimePass(now) } - val traffic = clientsMap.values.map { it.sendBytes }.sum() - clientsMap.values.forEach { it.sendBytes = 0 } - trafficHistory.removeFirst() - trafficHistory.addLast(TrafficPoint(now, traffic)) - - val clients = clientsMap.values.map { it.toHttpClient() }.sortedBy { it.clientAddressAndPort } - statisticChannel.offer(Pair(clients, trafficHistory.sortedBy { it.time })) - } - - is StatisticEvent.ClearClients -> clientsMap.clear() - } - } catch (ignore: CancellationException) { - XLog.w(this@ClientStatistic.getLog("actor.ignore", ignore.toString())) - XLog.w(this@ClientStatistic.getLog("actor.ignore"), ignore) - } catch (throwable: Throwable) { - XLog.e(this@ClientStatistic.getLog("actor.catch", throwable.toString())) - XLog.e(this@ClientStatistic.getLog("actor.catch"), throwable) - onError(FatalError.CoroutineException) - } - } + private val _statisticStateFlow = + MutableStateFlow, List>>(Pair(emptyList(), emptyList())) + val statisticFlow: StateFlow, List>> = _statisticStateFlow.asStateFlow() + + internal fun sendEvent(event: StatisticEvent) { + XLog.v(getLog("sendEvent", event.toString())) + + _statisticEventSharedFlow.tryEmit(event) || throw IllegalStateException("_eventSharedFlow IsFull") + } + + fun destroy() { + XLog.d(getLog("destroy")) + statisticScope.cancel() } init { XLog.d(getLog("init")) - statisticScope.launch { - while (true) { - ensureActive() + + (0..TRAFFIC_HISTORY_SECONDS + 1).forEach { i -> trafficHistory.addLast(TrafficPoint(i * 1000 + past, 0)) } + + statisticScope.launch(CoroutineName("ClientStatistic.SendEvent flow")) { + _statisticEventSharedFlow + .onEach { event -> onEvent(event) } + .catch { cause -> + XLog.e(this@ClientStatistic.getLog("_statisticEventSharedFlow.catch"), cause) + onError(FatalError.CoroutineException) + } + .collect() + } + + statisticScope.launch(CoroutineName("ClientStatistic.SendStatistic timer")) { + while (isActive) { sendEvent(StatisticEvent.SendStatistic) delay(1000) } } } - internal fun sendEvent(event: StatisticEvent) { - XLog.v(getLog("sendEvent", event.toString())) + private fun onEvent(event: StatisticEvent) { + XLog.v(getLog("onEvent", event.toString())) - if (statisticEventChannel.isClosedForSend) { - XLog.e(getLog("sendEvent", "ChannelIsClosed")) - return - } + when (event) { + is StatisticEvent.Connected -> + clientsMap[event.id] = StatisticClient(event.id, event.clientAddressAndPort) - try { - statisticEventChannel.offer(event) //|| throw IllegalStateException("ChannelIsFull") - } catch (ignore: CancellationException) { - XLog.w(getLog("sendEvent.ignore", ignore.toString())) - XLog.w(getLog("sendEvent.ignore"), ignore) - } catch (closedChannel: ClosedSendChannelException) { - XLog.w(getLog("sendEvent.closedChannel", closedChannel.toString())) - XLog.w(getLog("sendEvent.closedChannel"), closedChannel) - } catch (th: Throwable) { - XLog.e(getLog("sendEvent", th.toString())) - XLog.e(getLog("sendEvent"), th) - onError(FatalError.ChannelException) - } - } + is StatisticEvent.Disconnected -> + clientsMap[event.id]?.apply { + isDisconnected = true + disconnectedTime = System.currentTimeMillis() + } - fun destroy() { - XLog.d(getLog("destroy")) - statisticEventChannel.close() - statisticScope.cancel() + is StatisticEvent.Backpressure -> + clientsMap[event.id]?.isSlowConnection = true + + is StatisticEvent.NextBytes -> + clientsMap[event.id]?.apply { sendBytes = sendBytes.plus(event.bytesCount) } + + is StatisticEvent.SendStatistic -> { + val now = System.currentTimeMillis() + clientsMap.values.removeAll { it.isDisconnected && it.isDisconnectHoldTimePass(now) } + val traffic = clientsMap.values.map { it.sendBytes }.sum() + clientsMap.values.forEach { it.sendBytes = 0 } + trafficHistory.removeFirst() + trafficHistory.addLast(TrafficPoint(now, traffic)) + + val clients = clientsMap.values.map { it.toHttpClient() }.sortedBy { it.clientAddressAndPort } + _statisticStateFlow.tryEmit(Pair(clients, trafficHistory.sortedBy { it.time })) + } + + is StatisticEvent.ClearClients -> clientsMap.clear() + } } } \ No newline at end of file diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/HttpServerRxHandler.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/HttpServerRxHandler.kt index 8fa9f7f2..0c596834 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/HttpServerRxHandler.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/httpserver/HttpServerRxHandler.kt @@ -18,9 +18,10 @@ import io.reactivex.netty.protocol.http.server.HttpServerResponse import io.reactivex.netty.protocol.http.server.RequestHandler import io.reactivex.netty.threads.RxJavaEventloopScheduler import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import rx.BackpressureOverflow import rx.Observable @@ -36,7 +37,7 @@ internal class HttpServerRxHandler( private val onStartStopRequest: () -> Unit, private val clientStatistic: ClientStatistic, private val settingsReadOnly: SettingsReadOnly, - private val bitmapChannel: BroadcastChannel + private val bitmapStateFlow: StateFlow ) : RequestHandler { private val crlf = "\r\n".toByteArray() @@ -56,18 +57,16 @@ internal class HttpServerRxHandler( val resultJpegStream = ByteArrayOutputStream() coroutineScope.launch { - bitmapChannel.openSubscription().consumeEach { bitmap -> - this.ensureActive() + bitmapStateFlow.onEach { bitmap -> resultJpegStream.reset() bitmap.compress(Bitmap.CompressFormat.JPEG, settingsReadOnly.jpegQuality, resultJpegStream) - this.ensureActive() + ensureActive() val jpegBytes = resultJpegStream.toByteArray().also { jpegStillImg.set(it) } val jpegLength = jpegBytes.size.toString().toByteArray() jpegBytesStream.call( Unpooled.copiedBuffer(jpegBaseHeader, jpegLength, crlf, crlf, jpegBytes, crlf, jpegBoundary).array() ) - this.ensureActive() - } + }.collect() } } diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/image/BitmapCapture.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/image/BitmapCapture.kt index 6d36b51e..c549f57f 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/image/BitmapCapture.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/image/BitmapCapture.kt @@ -1,11 +1,7 @@ package info.dvkr.screenstream.data.image import android.annotation.SuppressLint -import android.graphics.Bitmap -import android.graphics.Matrix -import android.graphics.PixelFormat -import android.graphics.Point -import android.graphics.SurfaceTexture +import android.graphics.* import android.hardware.display.DisplayManager import android.hardware.display.VirtualDisplay import android.media.Image @@ -30,7 +26,7 @@ import info.dvkr.screenstream.data.other.getLog import info.dvkr.screenstream.data.settings.Settings import info.dvkr.screenstream.data.settings.SettingsReadOnly import kotlinx.coroutines.* -import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.MutableStateFlow import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.concurrent.atomic.AtomicInteger @@ -40,7 +36,7 @@ class BitmapCapture( private val display: Display, private val settingsReadOnly: SettingsReadOnly, private val mediaProjection: MediaProjection, - private val outBitmapChannel: SendChannel, + private val bitmapStateFlow: MutableStateFlow, private val onError: (AppError) -> Unit ) { @@ -278,7 +274,7 @@ class BitmapCapture( val croppedBitmap = getCroppedBitmap(cleanBitmap) val upsizedBitmap = getUpsizedAndRotadedBitmap(croppedBitmap) - if (outBitmapChannel.isClosedForSend.not()) outBitmapChannel.offer(upsizedBitmap) + bitmapStateFlow.tryEmit(upsizedBitmap) } } } @@ -305,7 +301,7 @@ class BitmapCapture( val upsizedBitmap = getUpsizedAndRotadedBitmap(croppedBitmap) image.close() - if (outBitmapChannel.isClosedForSend.not()) outBitmapChannel.offer(upsizedBitmap) + bitmapStateFlow.tryEmit(upsizedBitmap) } } catch (ex: UnsupportedOperationException) { XLog.d("unsupported image format, switching to fallback image reader") diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/model/AppError.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/model/AppError.kt index 87f1a40f..c7c3eebf 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/model/AppError.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/model/AppError.kt @@ -5,7 +5,6 @@ sealed class AppError : Throwable() sealed class FatalError : AppError() { object CoroutineException : FatalError() - object ActorException : FatalError() object ChannelException : FatalError() object NettyServerException : FatalError() object BitmapFormatException : FatalError() diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachine.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachine.kt index ec0d5ca6..db4ba6a2 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachine.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachine.kt @@ -5,7 +5,7 @@ import info.dvkr.screenstream.data.model.AppError import info.dvkr.screenstream.data.model.HttpClient import info.dvkr.screenstream.data.model.NetInterface import info.dvkr.screenstream.data.model.TrafficPoint -import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow interface AppStateMachine { @@ -18,7 +18,7 @@ interface AppStateMachine { object RequestPublicState : Event() object RecoverError : Event() - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } sealed class Effect { @@ -32,7 +32,7 @@ interface AppStateMachine { ) : Effect() } - val statisticFlow: Flow, List>> + val statisticFlow: StateFlow, List>> fun sendEvent(event: Event, timeout: Long = 0) diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachineImpl.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachineImpl.kt index 12231c36..5302b757 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachineImpl.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/state/AppStateMachineImpl.kt @@ -20,10 +20,7 @@ import info.dvkr.screenstream.data.state.helper.ConnectivityHelper import info.dvkr.screenstream.data.state.helper.MediaProjectionHelper import info.dvkr.screenstream.data.state.helper.NetworkHelper import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ClosedSendChannelException -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.channels.actor -import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.* class AppStateMachineImpl( context: Context, @@ -31,17 +28,17 @@ class AppStateMachineImpl( private val onEffect: suspend (AppStateMachine.Effect) -> Unit ) : AppStateMachine { - private val applicationContext: Context = context.applicationContext - private val bitmapChannel: ConflatedBroadcastChannel = ConflatedBroadcastChannel() + private val applicationContext = context.applicationContext + private val bitmapStateFlow = MutableStateFlow(Bitmap.createBitmap(1, 1, Bitmap.Config.ARGB_8888)) private val mediaProjectionHelper = MediaProjectionHelper(context) { sendEvent(AppStateMachine.Event.StopStream) } - private val broadcastHelper = BroadcastHelper.getInstance(context, ::onError) + private val broadcastHelper = BroadcastHelper.getInstance(context) private val connectivityHelper: ConnectivityHelper = ConnectivityHelper.getInstance(context) private val networkHelper = NetworkHelper(context) private val notificationBitmap = NotificationBitmap(context) private val clientStatistic: ClientStatistic = ClientStatistic(::onError) private val httpServerFiles = HttpServerFiles(applicationContext, settingsReadOnly) - private val httpServer: AppHttpServer = AppHttpServer( - settingsReadOnly, httpServerFiles, clientStatistic, bitmapChannel, + private val httpServer = AppHttpServer( + settingsReadOnly, httpServerFiles, clientStatistic, bitmapStateFlow.asStateFlow(), { sendEvent(InternalEvent.StartStopFromWebPage) }, ::onError ) @@ -62,7 +59,7 @@ class AppStateMachineImpl( object ScreenOff : InternalEvent() object Destroy : InternalEvent() - override fun toString(): String = this::class.java.simpleName + override fun toString(): String = javaClass.simpleName } internal sealed class RestartReason(private val msg: String) { @@ -70,7 +67,7 @@ class AppStateMachineImpl( class SettingsChanged(msg: String) : RestartReason(msg) class NetworkSettingsChanged(msg: String) : RestartReason(msg) - override fun toString(): String = "${this::class.java.simpleName}[$msg]" + override fun toString(): String = "${javaClass.simpleName}[$msg]" } private val settingsListener = object : SettingsReadOnly.OnSettingsChangeListener { @@ -90,7 +87,7 @@ class AppStateMachineImpl( } } - override val statisticFlow: Flow, List>> = clientStatistic.statisticFlow + override val statisticFlow: StateFlow, List>> = clientStatistic.statisticFlow override fun sendEvent(event: AppStateMachine.Event, timeout: Long) { if (timeout > 0) { @@ -99,44 +96,31 @@ class AppStateMachineImpl( } else { XLog.d(getLog("sendEvent", "Event: $event")) - if (eventChannel.isClosedForSend) { - XLog.e(getLog("sendEvent", "ChannelIsClosed")) - return - } - try { - eventChannel.offer(event) || throw IllegalStateException("ChannelIsFull") - } catch (ignore: CancellationException) { - XLog.w(getLog("sendEvent.ignore", ignore.toString())) - XLog.w(getLog("sendEvent.ignore"), ignore) - } catch (closedChannel: ClosedSendChannelException) { - XLog.w(getLog("sendEvent.closedChannel", closedChannel.toString())) - XLog.w(getLog("sendEvent.closedChannel"), closedChannel) + _eventSharedFlow.tryEmit(event) || throw IllegalStateException("_eventSharedFlow IsFull") } catch (th: Throwable) { - XLog.e(getLog("sendEvent", th.toString())) XLog.e(getLog("sendEvent"), th) coroutineScope.launch(NonCancellable) { onEffect( - AppStateMachine.Effect.PublicState( - false, true, false, emptyList(), FatalError.ChannelException - ) + AppStateMachine.Effect.PublicState(false, true, false, emptyList(), FatalError.ChannelException) ) } } } } - private val eventChannel = coroutineScope.actor(capacity = 32) { - var streamState = StreamState() - var previousStreamState: StreamState + private var streamState = StreamState() + private var previousStreamState = StreamState() + private val _eventSharedFlow = MutableSharedFlow(extraBufferCapacity = 32) - for (event in this) { - ensureActive() - try { - if (StateToEventMatrix.skippEvent(streamState.state, event).not()) { + init { + XLog.d(getLog("init")) + coroutineScope.launch(CoroutineName("AppStateMachineImpl.eventSharedFlow")) { + _eventSharedFlow.onEach { event -> + XLog.d(this@AppStateMachineImpl.getLog("eventSharedFlow.onEach", "$event")) + if (StateToEventMatrix.skippEvent(streamState.state, event).not()) { previousStreamState = streamState - streamState = when (event) { is InternalEvent.DiscoverAddress -> discoverAddress(streamState) is InternalEvent.StartServer -> startServer(streamState) @@ -157,22 +141,17 @@ class AppStateMachineImpl( if (streamState.isPublicStatePublishRequired(previousStreamState)) onEffect(streamState.toPublicState()) - XLog.i(this@AppStateMachineImpl.getLog("actor", "New state:${streamState.state}")) + XLog.i(this@AppStateMachineImpl.getLog("eventSharedFlow.onEach", "New state:${streamState.state}")) } - } catch (ignore: CancellationException) { - XLog.w(this@AppStateMachineImpl.getLog("actor.ignore", ignore.toString())) - XLog.w(this@AppStateMachineImpl.getLog("actor.ignore"), ignore) - } catch (throwable: Throwable) { - XLog.e(this@AppStateMachineImpl.getLog("actor.catch", throwable.toString())) - XLog.e(this@AppStateMachineImpl.getLog("actor.catch"), throwable) - streamState = componentError(streamState, FatalError.CoroutineException) - onEffect(streamState.toPublicState()) } + .catch { cause -> + XLog.e(this@AppStateMachineImpl.getLog("eventSharedFlow.catch"), cause) + streamState = componentError(streamState, FatalError.CoroutineException) + onEffect(streamState.toPublicState()) + } + .collect() } - } - init { - XLog.d(getLog("init")) settingsReadOnly.registerChangeListener(settingsListener) broadcastHelper.startListening( onScreenOff = { sendEvent(InternalEvent.ScreenOff) }, @@ -187,13 +166,12 @@ class AppStateMachineImpl( override suspend fun destroy() { XLog.d(getLog("destroy")) sendEvent(InternalEvent.Destroy) - eventChannel.close() httpServer.stop() clientStatistic.destroy() settingsReadOnly.unregisterChangeListener(settingsListener) broadcastHelper.stopListening() connectivityHelper.stopListening() - coroutineScope.cancel(CancellationException("AppStateMachine.destroy")) + coroutineScope.cancel() } private fun onError(appError: AppError) { @@ -247,7 +225,7 @@ class AppStateMachineImpl( httpServer.stop() httpServer.start(streamState.netInterfaces) notificationBitmap.getNotificationBitmap(NotificationBitmap.Type.START).let { bitmap -> - repeat(3) { bitmapChannel.send(bitmap); delay(150) } + repeat(3) { bitmapStateFlow.tryEmit(bitmap); delay(150) } } return streamState.copy(state = StreamState.State.SERVER_STARTED) @@ -270,7 +248,7 @@ class AppStateMachineImpl( val mediaProjection = mediaProjectionHelper.getMediaProjection(intent) val display = ContextCompat.getSystemService(applicationContext, WindowManager::class.java)!!.defaultDisplay - val bitmapCapture = BitmapCapture(display, settingsReadOnly, mediaProjection, bitmapChannel, ::onError) + val bitmapCapture = BitmapCapture(display, settingsReadOnly, mediaProjection, bitmapStateFlow, ::onError) bitmapCapture.start() return streamState.copy( @@ -286,7 +264,7 @@ class AppStateMachineImpl( val state = stopProjection(streamState) if (settingsReadOnly.checkAndChangeAutoChangePinOnStop().not()) notificationBitmap.getNotificationBitmap(NotificationBitmap.Type.START).let { bitmap -> - repeat(3) { bitmapChannel.send(bitmap); delay(150) } + repeat(3) { bitmapStateFlow.tryEmit(bitmap); delay(150) } } return state.copy(state = StreamState.State.SERVER_STARTED) @@ -327,12 +305,12 @@ class AppStateMachineImpl( is RestartReason.SettingsChanged -> notificationBitmap.getNotificationBitmap(NotificationBitmap.Type.RELOAD_PAGE).let { bitmap -> - repeat(3) { bitmapChannel.send(bitmap); delay(150) } + repeat(3) { bitmapStateFlow.tryEmit(bitmap); delay(150) } } is RestartReason.NetworkSettingsChanged -> notificationBitmap.getNotificationBitmap(NotificationBitmap.Type.NEW_ADDRESS).let { bitmap -> - repeat(3) { bitmapChannel.send(bitmap); delay(150) } + repeat(3) { bitmapStateFlow.tryEmit(bitmap); delay(150) } } } diff --git a/data/src/main/kotlin/info/dvkr/screenstream/data/state/helper/BroadcastHelper.kt b/data/src/main/kotlin/info/dvkr/screenstream/data/state/helper/BroadcastHelper.kt index e5bc7ea8..55fb8257 100644 --- a/data/src/main/kotlin/info/dvkr/screenstream/data/state/helper/BroadcastHelper.kt +++ b/data/src/main/kotlin/info/dvkr/screenstream/data/state/helper/BroadcastHelper.kt @@ -9,29 +9,22 @@ import android.net.ConnectivityManager import android.net.wifi.WifiManager import android.os.Build import com.elvishew.xlog.XLog -import info.dvkr.screenstream.data.model.AppError -import info.dvkr.screenstream.data.model.FatalError import info.dvkr.screenstream.data.other.getLog import kotlinx.coroutines.* -internal sealed class BroadcastHelper(context: Context, private val onError: (AppError) -> Unit) { +internal sealed class BroadcastHelper(context: Context) { companion object { - fun getInstance(context: Context, onError: (AppError) -> Unit): BroadcastHelper { + fun getInstance(context: Context): BroadcastHelper { return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) - NougatBroadcastHelper(context, onError) + NougatBroadcastHelper(context) else - LegacyBroadcastHelper(context, onError) + LegacyBroadcastHelper(context) } } protected val applicationContext: Context = context.applicationContext - private val coroutineScope = CoroutineScope( - Job() + Dispatchers.Main.immediate + CoroutineExceptionHandler { _, throwable -> - XLog.e(getLog("onCoroutineException"), throwable) - onError(FatalError.CoroutineException) - } - ) + private val coroutineScope = CoroutineScope(Job() + Dispatchers.Main.immediate) protected abstract val intentFilter: IntentFilter protected abstract val broadcastReceiver: BroadcastReceiver @@ -46,7 +39,7 @@ internal sealed class BroadcastHelper(context: Context, private val onError: (Ap fun stopListening() { applicationContext.unregisterReceiver(broadcastReceiver) - coroutineScope.cancel(CancellationException("BroadcastHelper.destroy")) + coroutineScope.cancel() } protected fun onScreenIntentAction() { @@ -61,7 +54,6 @@ internal sealed class BroadcastHelper(context: Context, private val onError: (Ap isConnectionEventScheduled = true coroutineScope.launch { delay(1000) - ensureActive() isConnectionEventScheduled = false if (isFirstConnectionEvent) isFirstConnectionEvent = false else if (::onConnectionChanged.isInitialized) onConnectionChanged.invoke() @@ -69,8 +61,7 @@ internal sealed class BroadcastHelper(context: Context, private val onError: (Ap } @TargetApi(Build.VERSION_CODES.N) - private class NougatBroadcastHelper(context: Context, onError: (AppError) -> Unit) : - BroadcastHelper(context, onError) { + private class NougatBroadcastHelper(context: Context) : BroadcastHelper(context) { override val intentFilter: IntentFilter by lazy { IntentFilter().apply { @@ -92,8 +83,7 @@ internal sealed class BroadcastHelper(context: Context, private val onError: (Ap } @Suppress("Deprecation") - private class LegacyBroadcastHelper(context: Context, onError: (AppError) -> Unit) : - BroadcastHelper(context, onError) { + private class LegacyBroadcastHelper(context: Context) : BroadcastHelper(context) { override val intentFilter: IntentFilter by lazy { IntentFilter().apply { diff --git a/data/src/main/kotlin/kotlinx/coroutines/flow/FlowFix.kt b/data/src/main/kotlin/kotlinx/coroutines/flow/FlowFix.kt deleted file mode 100644 index 6d2fcd57..00000000 --- a/data/src/main/kotlin/kotlinx/coroutines/flow/FlowFix.kt +++ /dev/null @@ -1,17 +0,0 @@ -package kotlinx.coroutines.flow - -import kotlinx.coroutines.ensureActive -import kotlin.coroutines.coroutineContext - - -/** - * Only proceed with the given action if the coroutine has not been cancelled. - * Necessary because Flow.collect receives items even after coroutine was cancelled - * https://github.com/Kotlin/kotlinx.coroutines/issues/1265 - */ -suspend inline fun Flow.safeCollect(crossinline action: suspend (T) -> Unit) { - collect { - coroutineContext.ensureActive() - action(it) - } -} \ No newline at end of file