diff --git a/sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt b/sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt index d8db8f4f..8645bb0c 100644 --- a/sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt +++ b/sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt @@ -7,14 +7,17 @@ import io.ktor.server.engine.* import io.ktor.server.netty.* import io.ktor.server.response.* import io.ktor.server.routing.* +import io.ktor.util.cio.* +import io.ktor.utils.io.* import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.time.Duration.Companion.seconds /** * An SSE (Server-Sent Events) sample application. * This is the main entrypoint of the application. */ -@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class) +@OptIn(ExperimentalCoroutinesApi::class) fun main() { /** * Here we create and start a Netty embedded server listening to the port 8080 @@ -29,25 +32,25 @@ fun main() { data class SseEvent(val data: String, val event: String? = null, val id: String? = null) /** - * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] + * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [eventFlow] [Flow] * and serializing them in a way that is compatible with the Server-Sent Events specification. * * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ */ -suspend fun ApplicationCall.respondSse(events: ReceiveChannel) { +suspend fun ApplicationCall.respondSse(eventFlow: Flow) { response.cacheControl(CacheControl.NoCache(null)) - respondTextWriter(contentType = ContentType.Text.EventStream) { - for (event in events) { + respondBytesWriter(contentType = ContentType.Text.EventStream) { + eventFlow.collect { event -> if (event.id != null) { - write("id: ${event.id}\n") + writeStringUtf8("id: ${event.id}\n") } if (event.event != null) { - write("event: ${event.event}\n") + writeStringUtf8("event: ${event.event}\n") } for (dataLine in event.data.lines()) { - write("data: $dataLine\n") + writeStringUtf8("data: $dataLine\n") } - write("\n") + writeStringUtf8("\n") flush() } } @@ -55,17 +58,17 @@ suspend fun ApplicationCall.respondSse(events: ReceiveChannel) { fun Application.module() { /** - * We produce a [BroadcastChannel] from a suspending function - * that send a [SseEvent] instance each second. + * We produce a [SharedFlow] from a function + * that sends an [SseEvent] instance each second. */ - val channel = produce { // this: ProducerScope -> + val sseFlow = flow { var n = 0 while (true) { - send(SseEvent("demo$n")) - delay(1000) + emit(SseEvent("demo$n")) + delay(1.seconds) n++ } - }.broadcast() + }.shareIn(GlobalScope, SharingStarted.Eagerly) /** * We use the [Routing] plugin to declare [Route] that will be @@ -75,15 +78,10 @@ fun Application.module() { /** * Route to be executed when the client perform a GET `/sse` request. * It will respond using the [respondSse] extension method defined in this same file - * that uses the [BroadcastChannel] channel we created earlier to emit those events. + * that uses the [SharedFlow] to collect sse events. */ get("/sse") { - val events = channel.openSubscription() - try { - call.respondSse(events) - } finally { - events.cancel() - } + call.respondSse(sseFlow) } /** * Route to be executed when the client perform a GET `/` request.