Skip to content

Commit

Permalink
rename handler apis for hopefully more clarity …
Browse files Browse the repository at this point in the history
rmgk committed Nov 26, 2024
1 parent 2fc9dff commit f1960a6
Showing 16 changed files with 59 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -50,10 +50,10 @@ object JSHttpPseudoChannel {
}

def connect(uri: String, rid: LocalUid): LatentConnection[MessageBuffer] = new LatentConnection[MessageBuffer] {
def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {
def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {

val conn = new SSEPseudoConnection(uri, rid)
val cb = incomingHandler.getCallbackFor(conn)
val cb = receiver.messageHandler(conn)

val requestInit = new RequestInit {}.tap: ri =>
ri.method = HttpMethod.GET
Original file line number Diff line number Diff line change
@@ -15,12 +15,12 @@ class BroadcastException(message: String, event: MessageEvent) extends Exception

object BroadcastChannelConnector {
def named(name: String): LatentConnection[MessageBuffer] = new LatentConnection {
override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {
override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {

val bc = new BroadcastChannel(name)
val connection = BroadcastChannelConnection(bc)

val handler = incomingHandler.getCallbackFor(connection)
val handler = incomingHandler.messageHandler(connection)

bc.onmessage = (event: dom.MessageEvent) =>
println(js.typeOf(event.data))
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ object WebsocketConnect {

def connect(url: String): LatentConnection[MessageBuffer] = new LatentConnection {

override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
Async.fromCallback {

println(s"preparing connection")
@@ -40,7 +40,7 @@ object WebsocketConnect {
println(s"connection opened")

val connect = new WebsocketConnect(socket)
val callback = incomingHandler.getCallbackFor(connect)
val callback = incomingHandler.messageHandler(connect)

socket.onmessage = { (event: dom.MessageEvent) =>
event.data match {
Original file line number Diff line number Diff line change
@@ -79,9 +79,9 @@ object WebRTCConnection {

def openLatent(channel: dom.RTCDataChannel): LatentConnection[MessageBuffer] = new LatentConnection {

def succeedConnection(incoming: Handler[MessageBuffer]) = {
def succeedConnection(incoming: Receive[MessageBuffer]) = {
val connector = new WebRTCConnection(channel)
val handler = incoming.getCallbackFor(connector)
val handler = incoming.messageHandler(connector)

{
channel.onmessage = { (event: dom.MessageEvent) =>
@@ -124,7 +124,7 @@ object WebRTCConnection {
connector
}

override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
Async.fromCallback {

channel.readyState match {
6 changes: 3 additions & 3 deletions Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ object TCP {

def handleConnection(
socket: Socket,
incoming: Handler[MessageBuffer],
incoming: Receive[MessageBuffer],
executionContext: ExecutionContext
): JIOStreamConnection = {
println(s"handling new connection")
@@ -34,7 +34,7 @@ object TCP {

def connect(bindsocket: () => Socket, executionContext: ExecutionContext): LatentConnection[MessageBuffer] =
new LatentConnection {
override def prepare(incoming: Handler[MessageBuffer]): Async[Any, Connection[MessageBuffer]] =
override def prepare(incoming: Receive[MessageBuffer]): Async[Any, Connection[MessageBuffer]] =
TCP.syncAttempt {
println(s"tcp sync attempt")
TCP.handleConnection(bindsocket(), incoming, executionContext)
@@ -57,7 +57,7 @@ object TCP {

def listen(bindsocket: () => ServerSocket, executionContext: ExecutionContext): LatentConnection[MessageBuffer] =
new LatentConnection {
override def prepare(incoming: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
override def prepare(incoming: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
Async.fromCallback { abort ?=>
try

8 changes: 4 additions & 4 deletions Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ object JavaHttp {

var connections: Map[Uid, Callback[MessageBuffer]] = Map.empty

def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback {
def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback {

addHandler { (exchange: HttpExchange) =>
val requestHeaders = exchange.getRequestHeaders
@@ -56,7 +56,7 @@ object JavaHttp {

SSEServer.this.synchronized {
println(s"made connection for $uid")
connections = connections.updated(uid, incomingHandler.getCallbackFor(conn))
connections = connections.updated(uid, receiver.messageHandler(conn))
}

Async.handler.succeed(conn)
@@ -97,7 +97,7 @@ object JavaHttp {
class SSEClient(client: HttpClient, uri: URI, replicaId: LocalUid, ec: ExecutionContext)
extends LatentConnection[MessageBuffer] {

def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {
def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async {

val sseRequest = HttpRequest.newBuilder()
.GET()
@@ -115,7 +115,7 @@ object JavaHttp {

val conn = SSEClientConnection(client, uri, replicaId)

ec.execute(() => JioInputStreamAdapter(rec).loopReceive(incomingHandler.getCallbackFor(conn)))
ec.execute(() => JioInputStreamAdapter(rec).loopReceive(receiver.messageHandler(conn)))

println(s"succeeding client")

4 changes: 2 additions & 2 deletions Modules/Channels/jvm/src/main/scala/channels/Jetty.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package channels.jettywebsockets

import channels.{Abort, ArrayMessageBuffer, Connection, LatentConnection, MessageBuffer, Handler as ChannelHandler}
import channels.{Abort, ArrayMessageBuffer, Connection, LatentConnection, MessageBuffer, Receive as ChannelHandler}
import de.rmgk.delay.{Async, syntax, Callback as DelayCallback}
import org.eclipse.jetty.http.pathmap.PathSpec
import org.eclipse.jetty.server.handler.{ContextHandler, ContextHandlerCollection}
@@ -124,7 +124,7 @@ class JettyWsHandler(
override def onWebSocketOpen(session: Session): Unit = {
super.onWebSocketOpen(session)
val sessionWrapper = new JettySessionWrapper(session)
internalCallback = incoming.getCallbackFor(sessionWrapper)
internalCallback = incoming.messageHandler(sessionWrapper)
connectionEstablished.succeed(sessionWrapper)
session.demand()
}
10 changes: 5 additions & 5 deletions Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import scala.util.{Failure, Success}

case class AcceptAttachment(
callback: Callback[Connection[MessageBuffer]],
incoming: Handler[MessageBuffer],
incoming: Receive[MessageBuffer],
)

case class ReceiveAttachment(
@@ -103,14 +103,14 @@ class NioTCP {

def handleConnection(
clientChannel: SocketChannel,
incoming: Handler[MessageBuffer],
incoming: Receive[MessageBuffer],
): NioTCPConnection = {

configureChannel(clientChannel)

val conn = NioTCPConnection(clientChannel)

val callback = incoming.getCallbackFor(conn)
val callback = incoming.messageHandler(conn)
clientChannel.register(selector, SelectionKey.OP_READ, ReceiveAttachment(callback))
selector.wakeup()

@@ -135,7 +135,7 @@ class NioTCP {
bindsocket: () => SocketChannel,
): LatentConnection[MessageBuffer] =
new LatentConnection {
override def prepare(incoming: Handler[MessageBuffer]): Async[Any, Connection[MessageBuffer]] =
override def prepare(incoming: Receive[MessageBuffer]): Async[Any, Connection[MessageBuffer]] =
TCP.syncAttempt {
handleConnection(bindsocket(), incoming)
}
@@ -177,7 +177,7 @@ class NioTCP {
bindsocket: () => ServerSocketChannel,
): LatentConnection[MessageBuffer] =
new LatentConnection {
override def prepare(incoming: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
override def prepare(incoming: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
Async.fromCallback { abort ?=>
try {
val serverChannel: ServerSocketChannel = bindsocket()
4 changes: 2 additions & 2 deletions Modules/Channels/jvm/src/main/scala/channels/UDP.scala
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ class UDPPseudoConnection(
executionContext: ExecutionContext,
initializeOutbound: Async[Any, SocketAddress],
) extends LatentConnection[MessageBuffer] {
override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
override def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] =
Async.fromCallback[Connection[MessageBuffer]] {

val datagramSocket = socketFactory()
@@ -41,7 +41,7 @@ class UDPPseudoConnection(
sa, {
val dw = UDPDatagramWrapper(sa, datagramSocket)
connectionSuccess.succeed(dw)
val cb = incomingHandler.getCallbackFor(dw)
val cb = receiver.messageHandler(dw)
(dw, cb)
}
)
15 changes: 8 additions & 7 deletions Modules/Channels/shared/src/main/scala/channels/Channels.scala
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ object MessageBuffer {
given Conversion[String, MessageBuffer] = str => ArrayMessageBuffer(str.getBytes(StandardCharsets.UTF_8))
given Conversion[MessageBuffer, String] = buf => new String(buf.asArray, StandardCharsets.UTF_8)

type Handler = Connection[MessageBuffer] => Callback[MessageBuffer]
}

case class ArrayMessageBuffer(inner: Array[Byte]) extends MessageBuffer {
@@ -34,13 +33,15 @@ trait Connection[T] {
def close(): Unit
}

/** Provides a specification how to handle messages, given a connection context. */
trait Handler[T] {
/** Provides a specification how to handle messages, given a connection context.
* Failure calls on the callback generally indicate connection errors on the receiver side.
*/
trait Receive[T] {

/** The provided connection is not guaranteed to be useable until the first message is received.
* If you want to initiate sending messages on this connection, use the value returned by the prepare call of the latent connection instead.
*/
def getCallbackFor(conn: Connection[T]): Callback[T]
def messageHandler(answers: Connection[T]): Callback[T]
}

/** Contains all the information required to try and establish a bidirectional connection.
@@ -60,7 +61,7 @@ trait LatentConnection[T] {
*
* The async may produce multiple connections and will run [[incomingHandler]] for each of them.
*/
def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]]
def prepare(receiver: Receive[T]): Async[Abort, Connection[T]]
}

object LatentConnection {
@@ -74,12 +75,12 @@ object LatentConnection {

def adapt[A, B](f: A => B, g: B => A)(latentConnection: LatentConnection[A]): LatentConnection[B] = {
new LatentConnection[B] {
def prepare(incomingHandler: Handler[B]): Async[Abort, Connection[B]] =
def prepare(receiver: Receive[B]): Async[Abort, Connection[B]] =
Async[Abort] {
val conn = Async.bind {
latentConnection.prepare { conn =>
val mapped = EncodingConnection(g, conn)
val cb = incomingHandler.getCallbackFor(mapped)
val cb = receiver.messageHandler(mapped)
rs => cb.complete(rs.map(f))
}
}
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ class JIOStreamConnection(in: InputStream, out: OutputStream, doClose: () => Uni

// frame parsing

def loopHandler(handler: Handler[MessageBuffer]): Unit =
inputStream.loopReceive(handler.getCallbackFor(this))
def loopHandler(handler: Receive[MessageBuffer]): Unit =
inputStream.loopReceive(handler.messageHandler(this))

}
Original file line number Diff line number Diff line change
@@ -17,10 +17,10 @@ class SynchronousLocalConnection[T] {
case class Establish(serverSendsOn: Connection[T], clientConnectionSendsTo: Promise[Callback[T]])
val connectionEstablished: Promise[Callback[Establish]] = Promise()

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] {
def prepare(receiver: Receive[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] {
connectionEstablished.succeed(Async.handler)
}.map { connChan =>
connChan.clientConnectionSendsTo.succeed(incomingHandler.getCallbackFor(connChan.serverSendsOn))
connChan.clientConnectionSendsTo.succeed(receiver.messageHandler(connChan.serverSendsOn))
connChan.serverSendsOn
}
}
@@ -43,8 +43,8 @@ class SynchronousLocalConnection[T] {
override def toString: String = s"From[$id]"
}

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async {
val callback = incomingHandler.getCallbackFor(toServer)
def prepare(receiver: Receive[T]): Async[Abort, Connection[T]] = Async {
val callback = receiver.messageHandler(toServer)

/* This is the connection that is passed to the server, which just calls the callback defined by the handler. */
val toClient = new Connection[T] {
6 changes: 3 additions & 3 deletions Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dtn.rdt

import channels.{Abort, Connection, Handler, LatentConnection}
import channels.{Abort, Connection, Receive, LatentConnection}
import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, readFromArray, writeToArray}
import de.rmgk.delay
import de.rmgk.delay.syntax.toAsync
@@ -57,11 +57,11 @@ class Channel[T: JsonValueCodec](
// If the local dtnd could be stopped and restarted without loosing data, this id should remain the same for performance reasons, but it will be correct even if it changes.
val dtnid = Uid.gen()

override def prepare(incomingHandler: Handler[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] =
override def prepare(receiver: Receive[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] =
Async {
val client: Client = Client(host, port, appName, monitoringClient).toAsync(using ec).bind
val conn = ClientContext[T](client, ec, operationMode)
val cb = incomingHandler.getCallbackFor(conn)
val cb = receiver.messageHandler(conn)

client.registerOnReceive { (message_type: RdtMessageType, payload: Array[Byte], dots: Dots) =>
message_type match
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package todo

import channels.{Abort, ArrayMessageBuffer, Connection, Handler, LatentConnection, MessageBuffer}
import channels.{Abort, ArrayMessageBuffer, Connection, Receive, LatentConnection, MessageBuffer}
import de.rmgk.delay.{Async, Sync}

import scala.scalajs.js
@@ -28,9 +28,9 @@ object WebviewAdapterChannel {
}

def listen(): LatentConnection[MessageBuffer] = new LatentConnection {
def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync {
def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync {
val conn = WebviewConnectionContext
val cb = incomingHandler.getCallbackFor(conn)
val cb = incomingHandler.messageHandler(conn)
receiveCallback = { (msg: String) =>
val bytes = java.util.Base64.getDecoder.decode(msg)
cb.succeed(ArrayMessageBuffer(bytes))
Original file line number Diff line number Diff line change
@@ -98,7 +98,13 @@ class DeltaDissemination[State](
}

def addLatentConnection(latentConnection: LatentConnection[ProtocolMessage[State]]): Unit = {
latentConnection.prepare(conn => messageBufferCallback(conn)).run(using globalAbort) {
val preparedConnection = latentConnection.prepare { from =>
{
case Success(msg) => handleMessage(msg, from)
case Failure(error) => error.printStackTrace()
}
}
preparedConnection.run(using globalAbort) {
case Success(conn) =>
lock.synchronized {
connections = conn :: connections
@@ -134,20 +140,16 @@ class DeltaDissemination[State](
contexts = contexts.updatedWith(rr)(curr => curr `merge` Some(dots))
}

private def messageBufferCallback(outChan: ConnectionContext): Callback[ProtocolMessage[State]] =
case Success(msg) => handleMessage(msg, outChan)
case Failure(error) => error.printStackTrace()

def handleMessage(msg: ProtocolMessage[State], biChan: ConnectionContext): Unit = {
def handleMessage(msg: ProtocolMessage[State], from: ConnectionContext): Unit = {
msg match
case Ping(time) =>
biChan.send(Pong(time)).run(debugCallbackAndRemoveCon(biChan))
from.send(Pong(time)).run(debugCallbackAndRemoveCon(from))
case Pong(time) =>
println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms")
case Request(uid, knows) =>
val relevant = pastPayloads.filterNot { dt => dt.dots <= knows }
relevant.foreach: msg =>
biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan))
from.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(from))
updateContext(uid, selfContext `merge` knows)
case payload @ Payload(uid, context, data) =>
if context <= selfContext then return
@@ -160,7 +162,7 @@ class DeltaDissemination[State](
}
receiveCallback(data)
if immediateForward then
disseminate(payload, Set(biChan))
disseminate(payload, Set(from))

}

Loading

0 comments on commit f1960a6

Please sign in to comment.