Skip to content

Commit

Permalink
Updated SSE Example to utilize SharedFlow rather than Deprecated Broa…
Browse files Browse the repository at this point in the history
…dcastChannels (#172)

* Updated Sse example application to replaced deprecated Coroutines api with Shared Flow.

* Changed responseTextWriter to respondBytesWriter. Updated write method uses to writeStringUtf8 to avoid blocking in Java applications.

---------

Co-authored-by: johnflynn <[email protected]>
  • Loading branch information
Neuman968 and jf9327 authored May 25, 2023
1 parent e8f8b15 commit a4444e0
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.cio.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds

/**
* An SSE (Server-Sent Events) sample application.
* This is the main entrypoint of the application.
*/
@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
/**
* Here we create and start a Netty embedded server listening to the port 8080
Expand All @@ -29,43 +32,43 @@ fun main() {
data class SseEvent(val data: String, val event: String? = null, val id: String? = null)

/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [eventFlow] [Flow]
* and serializing them in a way that is compatible with the Server-Sent Events specification.
*
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/
suspend fun ApplicationCall.respondSse(events: ReceiveChannel<SseEvent>) {
suspend fun ApplicationCall.respondSse(eventFlow: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondTextWriter(contentType = ContentType.Text.EventStream) {
for (event in events) {
respondBytesWriter(contentType = ContentType.Text.EventStream) {
eventFlow.collect { event ->
if (event.id != null) {
write("id: ${event.id}\n")
writeStringUtf8("id: ${event.id}\n")
}
if (event.event != null) {
write("event: ${event.event}\n")
writeStringUtf8("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
write("data: $dataLine\n")
writeStringUtf8("data: $dataLine\n")
}
write("\n")
writeStringUtf8("\n")
flush()
}
}
}

fun Application.module() {
/**
* We produce a [BroadcastChannel] from a suspending function
* that send a [SseEvent] instance each second.
* We produce a [SharedFlow] from a function
* that sends an [SseEvent] instance each second.
*/
val channel = produce { // this: ProducerScope<SseEvent> ->
val sseFlow = flow {
var n = 0
while (true) {
send(SseEvent("demo$n"))
delay(1000)
emit(SseEvent("demo$n"))
delay(1.seconds)
n++
}
}.broadcast()
}.shareIn(GlobalScope, SharingStarted.Eagerly)

/**
* We use the [Routing] plugin to declare [Route] that will be
Expand All @@ -75,15 +78,10 @@ fun Application.module() {
/**
* Route to be executed when the client perform a GET `/sse` request.
* It will respond using the [respondSse] extension method defined in this same file
* that uses the [BroadcastChannel] channel we created earlier to emit those events.
* that uses the [SharedFlow] to collect sse events.
*/
get("/sse") {
val events = channel.openSubscription()
try {
call.respondSse(events)
} finally {
events.cancel()
}
call.respondSse(sseFlow)
}
/**
* Route to be executed when the client perform a GET `/` request.
Expand Down

0 comments on commit a4444e0

Please sign in to comment.