Skip to content

Commit

Permalink
I did not expect this to work first try …
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Jul 30, 2024
1 parent cea8d6c commit bdb20be
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package channels

import channels.MesageBufferExtensions.asArrayBuffer
import de.rmgk.delay.syntax.toAsync
import de.rmgk.delay.{Async, Callback}
import org.scalajs.dom.{EventSource, Headers, HttpMethod, MessageEvent, ReadableStream, ReadableStreamReader, RequestInit, fetch}
import rdts.base.LocalUid

import java.net.URI
import java.nio.ByteBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.scalajs.js.typedarray.{Int8Array, Uint8Array}
import scala.util.chaining.scalaUtilChainingOps

object JSHttpPseudoChannel {

class SSEPseudoConnection(uri: String, rid: LocalUid) extends Connection[MessageBuffer] {
override def send(message: MessageBuffer): Async[Any, Unit] = Async {

val requestInit = new RequestInit {}.tap: ri =>
ri.method = HttpMethod.POST
ri.body = message.asArrayBuffer
ri.headers = Headers().tap: hi =>
hi.set("x-replica-id", rid.uid.delegate)

val res = fetch(uri, requestInit).toFuture.toAsync.bind
}
override def close(): Unit = ()
}

class StreamConsumer(reader: ReadableStreamReader[Uint8Array], cb: Callback[MessageBuffer]) {
var buffer: Array[Byte] = Array.empty

def loop(): Async[Any, Unit] = Async {
val chunk = reader.read().toFuture.toAsync.bind
val input = chunk.value
buffer = buffer.appendedAll(new Int8Array(input.buffer, input.byteOffset, input.length).toArray)

if buffer.size >= 4 then
val len = ByteBuffer.wrap(buffer.slice(0, 4)).getInt()
if buffer.size >= len + 4 then
val mb = ArrayMessageBuffer(buffer.slice(4, len + 4))
buffer = buffer.slice(len + 4, buffer.length)
cb.succeed(mb)

loop().bind

}

}

def connect(uri: String, rid: LocalUid): LatentConnection[MessageBuffer] = new LatentConnection[MessageBuffer] {
def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {

val conn = new SSEPseudoConnection(uri, rid)
val cb = incomingHandler.getCallbackFor(conn)

val requestInit = new RequestInit {}.tap: ri =>
ri.method = HttpMethod.GET
ri.headers = Headers().tap: hi =>
hi.set("x-replica-id", rid.uid.delegate)
hi.set("Accept", "text/event-stream")

val res = fetch(uri, requestInit).toFuture.toAsync.bind

val reader = res.body.getReader()

StreamConsumer(reader, cb).loop().run(using ())(_ => ())

conn

}

}

}
18 changes: 18 additions & 0 deletions Modules/Channels/js/src/test/scala/FetchConnectTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import channels.{Abort, JSHttpPseudoChannel}
import rdts.base.LocalUid

import scala.util.{Failure, Success}

object FetchConnectTest {

def main(args: Array[String]): Unit = {
JSHttpPseudoChannel.connect(s"http://localhost:58080/channel", LocalUid.gen()).prepare { conn => {
case Success(msg) => println(msg.convert: String)
case Failure(ex) => ex.printStackTrace
}}.run(using Abort()) {
case Success(conn) =>
case Failure(ex) => ex.printStackTrace()
}
}

}
9 changes: 8 additions & 1 deletion Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object JavaHttp {

addHandler { (exchange: HttpExchange) =>
val requestHeaders = exchange.getRequestHeaders
exchange.getResponseHeaders.add("Access-Control-Allow-Origin", "*")
exchange.getResponseHeaders.add("Access-Control-Allow-Methods", "POST, GET")
exchange.getResponseHeaders.add("Access-Control-Allow-Headers", "x-replica-id")
val uid = Option(requestHeaders.get(replicaIdHeader)).flatMap(_.asScala.headOption) match
case None =>
println(s"no replica ID on request?")
Expand All @@ -54,7 +57,8 @@ object JavaHttp {
}

Async.handler.succeed(conn)
else
else if exchange.getRequestMethod == "POST"
then
SSEServer.this.synchronized {
connections.get(uid)
} match
Expand All @@ -65,6 +69,9 @@ object JavaHttp {
cb.succeed(ArrayMessageBuffer(exchange.getRequestBody.readAllBytes()))
exchange.sendResponseHeaders(200, 0)
exchange.close()
else
exchange.sendResponseHeaders(200, 0)
exchange.close()
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions Modules/Channels/jvm/src/test/scala/channels/JavaHttpServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package channels

import channels.JavaHttp.SSEServer
import channels.MessageBuffer.given_Conversion_String_MessageBuffer
import com.sun.net.httpserver.HttpServer

import java.net.InetSocketAddress
import scala.util.{Failure, Success}

object JavaHttpServerTest {

def main(args: Array[String]): Unit = {

val server = HttpServer.create(InetSocketAddress(58080), 0)

val conn = SSEServer(handler => server.createContext("/channel", handler))

conn.prepare(inc => msg => println(msg)).run(using Abort()):
case Failure(ex) => ex.printStackTrace()
case Success(conn) =>
println(s"received connection, replying")
conn.send("yay!".convert).run(using Abort())(res => println(res))

server.start()

}
}

0 comments on commit bdb20be

Please sign in to comment.