diff --git a/Modules/Channels/js/src/main/scala/channels/JSHttpPseudoChannel.scala b/Modules/Channels/js/src/main/scala/channels/JSHttpPseudoChannel.scala index 8b8dd0d06..d07dbeccf 100644 --- a/Modules/Channels/js/src/main/scala/channels/JSHttpPseudoChannel.scala +++ b/Modules/Channels/js/src/main/scala/channels/JSHttpPseudoChannel.scala @@ -50,10 +50,10 @@ object JSHttpPseudoChannel { } def connect(uri: String, rid: LocalUid): LatentConnection[MessageBuffer] = new LatentConnection[MessageBuffer] { - def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { + def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { val conn = new SSEPseudoConnection(uri, rid) - val cb = incomingHandler.getCallbackFor(conn) + val cb = receiver.messageHandler(conn) val requestInit = new RequestInit {}.tap: ri => ri.method = HttpMethod.GET diff --git a/Modules/Channels/js/src/main/scala/channels/broadcastchannel/BroadcastChannel.scala b/Modules/Channels/js/src/main/scala/channels/broadcastchannel/BroadcastChannel.scala index c5ac970c2..09220cde6 100644 --- a/Modules/Channels/js/src/main/scala/channels/broadcastchannel/BroadcastChannel.scala +++ b/Modules/Channels/js/src/main/scala/channels/broadcastchannel/BroadcastChannel.scala @@ -15,12 +15,12 @@ class BroadcastException(message: String, event: MessageEvent) extends Exception object BroadcastChannelConnector { def named(name: String): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { + override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { val bc = new BroadcastChannel(name) val connection = BroadcastChannelConnection(bc) - val handler = incomingHandler.getCallbackFor(connection) + val handler = incomingHandler.messageHandler(connection) bc.onmessage = (event: dom.MessageEvent) => println(js.typeOf(event.data)) diff --git a/Modules/Channels/js/src/main/scala/channels/webnativewebsockets/WSConnector.scala b/Modules/Channels/js/src/main/scala/channels/webnativewebsockets/WSConnector.scala index f3cd2ab70..240b6df1b 100644 --- a/Modules/Channels/js/src/main/scala/channels/webnativewebsockets/WSConnector.scala +++ b/Modules/Channels/js/src/main/scala/channels/webnativewebsockets/WSConnector.scala @@ -28,7 +28,7 @@ object WebsocketConnect { def connect(url: String): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = + override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { println(s"preparing connection") @@ -40,7 +40,7 @@ object WebsocketConnect { println(s"connection opened") val connect = new WebsocketConnect(socket) - val callback = incomingHandler.getCallbackFor(connect) + val callback = incomingHandler.messageHandler(connect) socket.onmessage = { (event: dom.MessageEvent) => event.data match { diff --git a/Modules/Channels/js/src/main/scala/channels/webrtc/WebRTCConnection.scala b/Modules/Channels/js/src/main/scala/channels/webrtc/WebRTCConnection.scala index e3acacb32..78b051f17 100644 --- a/Modules/Channels/js/src/main/scala/channels/webrtc/WebRTCConnection.scala +++ b/Modules/Channels/js/src/main/scala/channels/webrtc/WebRTCConnection.scala @@ -79,9 +79,9 @@ object WebRTCConnection { def openLatent(channel: dom.RTCDataChannel): LatentConnection[MessageBuffer] = new LatentConnection { - def succeedConnection(incoming: Handler[MessageBuffer]) = { + def succeedConnection(incoming: Receive[MessageBuffer]) = { val connector = new WebRTCConnection(channel) - val handler = incoming.getCallbackFor(connector) + val handler = incoming.messageHandler(connector) { channel.onmessage = { (event: dom.MessageEvent) => @@ -124,7 +124,7 @@ object WebRTCConnection { connector } - override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = + override def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { channel.readyState match { diff --git a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala index 3bfa72505..a59e2911a 100644 --- a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala +++ b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala @@ -17,7 +17,7 @@ object TCP { def handleConnection( socket: Socket, - incoming: Handler[MessageBuffer], + incoming: Receive[MessageBuffer], executionContext: ExecutionContext ): JIOStreamConnection = { println(s"handling new connection") @@ -34,7 +34,7 @@ object TCP { def connect(bindsocket: () => Socket, executionContext: ExecutionContext): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incoming: Handler[MessageBuffer]): Async[Any, Connection[MessageBuffer]] = + override def prepare(incoming: Receive[MessageBuffer]): Async[Any, Connection[MessageBuffer]] = TCP.syncAttempt { println(s"tcp sync attempt") TCP.handleConnection(bindsocket(), incoming, executionContext) @@ -57,7 +57,7 @@ object TCP { def listen(bindsocket: () => ServerSocket, executionContext: ExecutionContext): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incoming: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = + override def prepare(incoming: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { abort ?=> try diff --git a/Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala b/Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala index 6f0bf1474..c12d9f70f 100644 --- a/Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala +++ b/Modules/Channels/jvm/src/main/scala/channels/JavaHttp.scala @@ -26,7 +26,7 @@ object JavaHttp { var connections: Map[Uid, Callback[MessageBuffer]] = Map.empty - def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { + def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { addHandler { (exchange: HttpExchange) => val requestHeaders = exchange.getRequestHeaders @@ -56,7 +56,7 @@ object JavaHttp { SSEServer.this.synchronized { println(s"made connection for $uid") - connections = connections.updated(uid, incomingHandler.getCallbackFor(conn)) + connections = connections.updated(uid, receiver.messageHandler(conn)) } Async.handler.succeed(conn) @@ -97,7 +97,7 @@ object JavaHttp { class SSEClient(client: HttpClient, uri: URI, replicaId: LocalUid, ec: ExecutionContext) extends LatentConnection[MessageBuffer] { - def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { + def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async { val sseRequest = HttpRequest.newBuilder() .GET() @@ -115,7 +115,7 @@ object JavaHttp { val conn = SSEClientConnection(client, uri, replicaId) - ec.execute(() => JioInputStreamAdapter(rec).loopReceive(incomingHandler.getCallbackFor(conn))) + ec.execute(() => JioInputStreamAdapter(rec).loopReceive(receiver.messageHandler(conn))) println(s"succeeding client") diff --git a/Modules/Channels/jvm/src/main/scala/channels/Jetty.scala b/Modules/Channels/jvm/src/main/scala/channels/Jetty.scala index e70932ae1..36e884780 100644 --- a/Modules/Channels/jvm/src/main/scala/channels/Jetty.scala +++ b/Modules/Channels/jvm/src/main/scala/channels/Jetty.scala @@ -1,6 +1,6 @@ package channels.jettywebsockets -import channels.{Abort, ArrayMessageBuffer, Connection, LatentConnection, MessageBuffer, Handler as ChannelHandler} +import channels.{Abort, ArrayMessageBuffer, Connection, LatentConnection, MessageBuffer, Receive as ChannelHandler} import de.rmgk.delay.{Async, syntax, Callback as DelayCallback} import org.eclipse.jetty.http.pathmap.PathSpec import org.eclipse.jetty.server.handler.{ContextHandler, ContextHandlerCollection} @@ -124,7 +124,7 @@ class JettyWsHandler( override def onWebSocketOpen(session: Session): Unit = { super.onWebSocketOpen(session) val sessionWrapper = new JettySessionWrapper(session) - internalCallback = incoming.getCallbackFor(sessionWrapper) + internalCallback = incoming.messageHandler(sessionWrapper) connectionEstablished.succeed(sessionWrapper) session.demand() } diff --git a/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala b/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala index dd30c3f05..a3c8cc31f 100644 --- a/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala +++ b/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala @@ -12,7 +12,7 @@ import scala.util.{Failure, Success} case class AcceptAttachment( callback: Callback[Connection[MessageBuffer]], - incoming: Handler[MessageBuffer], + incoming: Receive[MessageBuffer], ) case class ReceiveAttachment( @@ -103,14 +103,14 @@ class NioTCP { def handleConnection( clientChannel: SocketChannel, - incoming: Handler[MessageBuffer], + incoming: Receive[MessageBuffer], ): NioTCPConnection = { configureChannel(clientChannel) val conn = NioTCPConnection(clientChannel) - val callback = incoming.getCallbackFor(conn) + val callback = incoming.messageHandler(conn) clientChannel.register(selector, SelectionKey.OP_READ, ReceiveAttachment(callback)) selector.wakeup() @@ -135,7 +135,7 @@ class NioTCP { bindsocket: () => SocketChannel, ): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incoming: Handler[MessageBuffer]): Async[Any, Connection[MessageBuffer]] = + override def prepare(incoming: Receive[MessageBuffer]): Async[Any, Connection[MessageBuffer]] = TCP.syncAttempt { handleConnection(bindsocket(), incoming) } @@ -177,7 +177,7 @@ class NioTCP { bindsocket: () => ServerSocketChannel, ): LatentConnection[MessageBuffer] = new LatentConnection { - override def prepare(incoming: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = + override def prepare(incoming: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback { abort ?=> try { val serverChannel: ServerSocketChannel = bindsocket() diff --git a/Modules/Channels/jvm/src/main/scala/channels/UDP.scala b/Modules/Channels/jvm/src/main/scala/channels/UDP.scala index f61f82dc1..da35d39af 100644 --- a/Modules/Channels/jvm/src/main/scala/channels/UDP.scala +++ b/Modules/Channels/jvm/src/main/scala/channels/UDP.scala @@ -26,7 +26,7 @@ class UDPPseudoConnection( executionContext: ExecutionContext, initializeOutbound: Async[Any, SocketAddress], ) extends LatentConnection[MessageBuffer] { - override def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = + override def prepare(receiver: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Async.fromCallback[Connection[MessageBuffer]] { val datagramSocket = socketFactory() @@ -41,7 +41,7 @@ class UDPPseudoConnection( sa, { val dw = UDPDatagramWrapper(sa, datagramSocket) connectionSuccess.succeed(dw) - val cb = incomingHandler.getCallbackFor(dw) + val cb = receiver.messageHandler(dw) (dw, cb) } ) diff --git a/Modules/Channels/shared/src/main/scala/channels/Channels.scala b/Modules/Channels/shared/src/main/scala/channels/Channels.scala index 535b1a704..5f96166d9 100644 --- a/Modules/Channels/shared/src/main/scala/channels/Channels.scala +++ b/Modules/Channels/shared/src/main/scala/channels/Channels.scala @@ -13,7 +13,6 @@ object MessageBuffer { given Conversion[String, MessageBuffer] = str => ArrayMessageBuffer(str.getBytes(StandardCharsets.UTF_8)) given Conversion[MessageBuffer, String] = buf => new String(buf.asArray, StandardCharsets.UTF_8) - type Handler = Connection[MessageBuffer] => Callback[MessageBuffer] } case class ArrayMessageBuffer(inner: Array[Byte]) extends MessageBuffer { @@ -34,13 +33,15 @@ trait Connection[T] { def close(): Unit } -/** Provides a specification how to handle messages, given a connection context. */ -trait Handler[T] { +/** Provides a specification how to handle messages, given a connection context. + * Failure calls on the callback generally indicate connection errors on the receiver side. + */ +trait Receive[T] { /** The provided connection is not guaranteed to be useable until the first message is received. * If you want to initiate sending messages on this connection, use the value returned by the prepare call of the latent connection instead. */ - def getCallbackFor(conn: Connection[T]): Callback[T] + def messageHandler(answers: Connection[T]): Callback[T] } /** Contains all the information required to try and establish a bidirectional connection. @@ -60,7 +61,7 @@ trait LatentConnection[T] { * * The async may produce multiple connections and will run [[incomingHandler]] for each of them. */ - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] + def prepare(receiver: Receive[T]): Async[Abort, Connection[T]] } object LatentConnection { @@ -74,12 +75,12 @@ object LatentConnection { def adapt[A, B](f: A => B, g: B => A)(latentConnection: LatentConnection[A]): LatentConnection[B] = { new LatentConnection[B] { - def prepare(incomingHandler: Handler[B]): Async[Abort, Connection[B]] = + def prepare(receiver: Receive[B]): Async[Abort, Connection[B]] = Async[Abort] { val conn = Async.bind { latentConnection.prepare { conn => val mapped = EncodingConnection(g, conn) - val cb = incomingHandler.getCallbackFor(mapped) + val cb = receiver.messageHandler(mapped) rs => cb.complete(rs.map(f)) } } diff --git a/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala b/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala index 311e0de63..d00e8e1aa 100644 --- a/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala +++ b/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala @@ -60,7 +60,7 @@ class JIOStreamConnection(in: InputStream, out: OutputStream, doClose: () => Uni // frame parsing - def loopHandler(handler: Handler[MessageBuffer]): Unit = - inputStream.loopReceive(handler.getCallbackFor(this)) + def loopHandler(handler: Receive[MessageBuffer]): Unit = + inputStream.loopReceive(handler.messageHandler(this)) } diff --git a/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala index cc2909d67..63f08e157 100644 --- a/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala +++ b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala @@ -17,10 +17,10 @@ class SynchronousLocalConnection[T] { case class Establish(serverSendsOn: Connection[T], clientConnectionSendsTo: Promise[Callback[T]]) val connectionEstablished: Promise[Callback[Establish]] = Promise() - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] { + def prepare(receiver: Receive[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] { connectionEstablished.succeed(Async.handler) }.map { connChan => - connChan.clientConnectionSendsTo.succeed(incomingHandler.getCallbackFor(connChan.serverSendsOn)) + connChan.clientConnectionSendsTo.succeed(receiver.messageHandler(connChan.serverSendsOn)) connChan.serverSendsOn } } @@ -43,8 +43,8 @@ class SynchronousLocalConnection[T] { override def toString: String = s"From[$id]" } - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async { - val callback = incomingHandler.getCallbackFor(toServer) + def prepare(receiver: Receive[T]): Async[Abort, Connection[T]] = Async { + val callback = receiver.messageHandler(toServer) /* This is the connection that is passed to the server, which just calls the callback defined by the handler. */ val toClient = new Connection[T] { diff --git a/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala b/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala index 985c50db0..0f711744f 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala @@ -1,6 +1,6 @@ package dtn.rdt -import channels.{Abort, Connection, Handler, LatentConnection} +import channels.{Abort, Connection, Receive, LatentConnection} import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, readFromArray, writeToArray} import de.rmgk.delay import de.rmgk.delay.syntax.toAsync @@ -57,11 +57,11 @@ class Channel[T: JsonValueCodec]( // If the local dtnd could be stopped and restarted without loosing data, this id should remain the same for performance reasons, but it will be correct even if it changes. val dtnid = Uid.gen() - override def prepare(incomingHandler: Handler[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] = + override def prepare(receiver: Receive[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] = Async { val client: Client = Client(host, port, appName, monitoringClient).toAsync(using ec).bind val conn = ClientContext[T](client, ec, operationMode) - val cb = incomingHandler.getCallbackFor(conn) + val cb = receiver.messageHandler(conn) client.registerOnReceive { (message_type: RdtMessageType, payload: Array[Byte], dots: Dots) => message_type match diff --git a/Modules/Examples/TodoMVC/src/main/scala/todo/WebviewAdapter.scala b/Modules/Examples/TodoMVC/src/main/scala/todo/WebviewAdapter.scala index 0275b05ff..62e264a01 100644 --- a/Modules/Examples/TodoMVC/src/main/scala/todo/WebviewAdapter.scala +++ b/Modules/Examples/TodoMVC/src/main/scala/todo/WebviewAdapter.scala @@ -1,6 +1,6 @@ package todo -import channels.{Abort, ArrayMessageBuffer, Connection, Handler, LatentConnection, MessageBuffer} +import channels.{Abort, ArrayMessageBuffer, Connection, Receive, LatentConnection, MessageBuffer} import de.rmgk.delay.{Async, Sync} import scala.scalajs.js @@ -28,9 +28,9 @@ object WebviewAdapterChannel { } def listen(): LatentConnection[MessageBuffer] = new LatentConnection { - def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync { + def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync { val conn = WebviewConnectionContext - val cb = incomingHandler.getCallbackFor(conn) + val cb = incomingHandler.messageHandler(conn) receiveCallback = { (msg: String) => val bytes = java.util.Base64.getDecoder.decode(msg) cb.succeed(ArrayMessageBuffer(bytes)) diff --git a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala index 54394e69b..007f499e1 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala @@ -98,7 +98,13 @@ class DeltaDissemination[State]( } def addLatentConnection(latentConnection: LatentConnection[ProtocolMessage[State]]): Unit = { - latentConnection.prepare(conn => messageBufferCallback(conn)).run(using globalAbort) { + val preparedConnection = latentConnection.prepare { from => + { + case Success(msg) => handleMessage(msg, from) + case Failure(error) => error.printStackTrace() + } + } + preparedConnection.run(using globalAbort) { case Success(conn) => lock.synchronized { connections = conn :: connections @@ -134,20 +140,16 @@ class DeltaDissemination[State]( contexts = contexts.updatedWith(rr)(curr => curr `merge` Some(dots)) } - private def messageBufferCallback(outChan: ConnectionContext): Callback[ProtocolMessage[State]] = - case Success(msg) => handleMessage(msg, outChan) - case Failure(error) => error.printStackTrace() - - def handleMessage(msg: ProtocolMessage[State], biChan: ConnectionContext): Unit = { + def handleMessage(msg: ProtocolMessage[State], from: ConnectionContext): Unit = { msg match case Ping(time) => - biChan.send(Pong(time)).run(debugCallbackAndRemoveCon(biChan)) + from.send(Pong(time)).run(debugCallbackAndRemoveCon(from)) case Pong(time) => println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms") case Request(uid, knows) => val relevant = pastPayloads.filterNot { dt => dt.dots <= knows } relevant.foreach: msg => - biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan)) + from.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(from)) updateContext(uid, selfContext `merge` knows) case payload @ Payload(uid, context, data) => if context <= selfContext then return @@ -160,7 +162,7 @@ class DeltaDissemination[State]( } receiveCallback(data) if immediateForward then - disseminate(payload, Set(biChan)) + disseminate(payload, Set(from)) } diff --git a/Modules/Webview/src/main/scala/Webview.scala b/Modules/Webview/src/main/scala/Webview.scala index b15244191..e44d11a3f 100644 --- a/Modules/Webview/src/main/scala/Webview.scala +++ b/Modules/Webview/src/main/scala/Webview.scala @@ -1,4 +1,4 @@ -import channels.{Abort, ArrayMessageBuffer, Connection, Handler, LatentConnection, MessageBuffer} +import channels.{Abort, ArrayMessageBuffer, Connection, Receive, LatentConnection, MessageBuffer} import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, readFromString, writeToArray} import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker import de.rmgk.delay.{Async, Sync} @@ -85,9 +85,9 @@ object WebviewNativeChannel { } def listen(w: WebView): LatentConnection[MessageBuffer] = new LatentConnection { - def prepare(incomingHandler: Handler[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync { + def prepare(incomingHandler: Receive[MessageBuffer]): Async[Abort, Connection[MessageBuffer]] = Sync { val conn = WebviewConnectionContext(w) - val cb = incomingHandler.getCallbackFor(conn) + val cb = incomingHandler.messageHandler(conn) w.bind( "webview_channel_send", { msg =>