From e06714494fa9378587fd30733b8c70251f4fef8f Mon Sep 17 00:00:00 2001 From: ragnar Date: Sun, 24 Nov 2024 22:56:49 +0100 Subject: [PATCH] make synchronous local connection a proper non test implementation I am unreasonably satisfied that it just works like this --- .../channels/SynchronousLocalConnection.scala | 47 +++++++++++++++++ .../example/DeltaDisseminationTest.scala | 50 +++---------------- 2 files changed, 53 insertions(+), 44 deletions(-) create mode 100644 Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala diff --git a/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala new file mode 100644 index 000000000..bbd98b304 --- /dev/null +++ b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala @@ -0,0 +1,47 @@ +package channels + +import de.rmgk.delay.{Async, Callback, Promise, Sync} + +/** Allows establishing a single direct synchronous connection. */ +class SynchronousLocalConnection[T] { + object client extends LatentConnection[T] { + + object connection extends Connection[T] { + def send(msg: T): Async[Any, Unit] = Async { + val cb = server.incomingMessageCallback.async.bind + cb.succeed(msg) + } + override def close(): Unit = () + } + + def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async { + val callback = incomingHandler.getCallbackFor(connection) + + val serverConn = new Connection[T] { + override def close(): Unit = () + override def send(message: T): Async[Any, Unit] = Sync { + callback.succeed(message) + } + } + + val established = server.connectionEstablished.async.bind + established.succeed(serverConn) + connection + } + } + + object server extends LatentConnection[T] { + + val incomingMessageCallback: Promise[Callback[T]] = Promise() + + val connectionEstablished: Promise[Callback[Connection[T]]] = Promise() + + def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] { + connectionEstablished.succeed(Async.handler) + }.map { conn => + incomingMessageCallback.succeed(incomingHandler.getCallbackFor(conn)) + conn + } + } + +} diff --git a/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala b/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala index 76a98e1eb..f6a45a12d 100644 --- a/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala +++ b/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala @@ -1,60 +1,22 @@ package replication.example -import channels.{Abort, Connection, Handler, LatentConnection} -import de.rmgk.delay.{Async, Callback, Sync} +import channels.SynchronousLocalConnection import rdts.base.LocalUid import replication.{DeltaDissemination, ProtocolMessage} -class SynchronizedConnection[T] { - object client extends LatentConnection[T] { - - object connection extends Connection[T] { - def send(msg: T): Async[Any, Unit] = Sync { - server.callback.succeed(msg) - } - override def close(): Unit = () - } - - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Sync { - val callback = incomingHandler.getCallbackFor(connection) - - val serverConn = new Connection[T] { - override def close(): Unit = () - override def send(message: T): Async[Any, Unit] = Sync { - callback.succeed(message) - } - } - server.connectionEstablished.succeed(serverConn) - connection - } - } - - object server extends LatentConnection[T] { - - var callback: Callback[T] = null - - var connectionEstablished: Callback[Connection[T]] = null - - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] { - connectionEstablished = Async.handler - }.map { conn => - callback = incomingHandler.getCallbackFor(conn) - conn - } - } - -} - class DeltaDisseminationTest extends munit.FunSuite { test("basics") { // I have no clue why this syntax is still not deprecated xD val dd1, dd2 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => ()) - val sync = SynchronizedConnection[ProtocolMessage[Set[String]]]() + val sync = SynchronousLocalConnection[ProtocolMessage[Set[String]]]() - dd1.addLatentConnection(sync.server) dd2.addLatentConnection(sync.client) + dd1.addLatentConnection(sync.server) + + dd1.pingAll() + dd2.pingAll() dd1.applyDelta(Set("a")) dd2.applyDelta(Set("b"))