From f8d3a07683471ff96c4871ee4b00d2b3912cdbda Mon Sep 17 00:00:00 2001 From: ragnar Date: Sun, 24 Nov 2024 22:40:12 +0100 Subject: [PATCH] add basic delta dissemination test --- .../src/main/scala/channels/Channels.scala | 8 ++- .../replication/DeltaDissemination.scala | 17 ++--- .../example/DeltaDisseminationTest.scala | 65 +++++++++++++++++++ 3 files changed, 78 insertions(+), 12 deletions(-) create mode 100644 Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala diff --git a/Modules/Channels/shared/src/main/scala/channels/Channels.scala b/Modules/Channels/shared/src/main/scala/channels/Channels.scala index 70eb7f2a3..fe6b64fd1 100644 --- a/Modules/Channels/shared/src/main/scala/channels/Channels.scala +++ b/Modules/Channels/shared/src/main/scala/channels/Channels.scala @@ -34,8 +34,12 @@ trait Connection[T] { def close(): Unit } -/** Provides a specification how to handle messages, given a connection context */ +/** Provides a specification how to handle messages, given a connection context. */ trait Handler[T] { + + /** The provided connection is not guaranteed to be useable until the first message is received. + * If you want to initiate sending messages on this connection, use the value returned by the prepare call of the latent connection instead. + */ def getCallbackFor(conn: Connection[T]): Callback[T] } @@ -48,7 +52,7 @@ trait Handler[T] { trait LatentConnection[T] { /** The returned async, when run, should establish connections with the given callback atomically. - * That is, no messages should be lost. + * That is, no messages should be lost during setup. * Similarly, the provider of the callback (the result of `incoming`) of this method should make sure that the other end of the callback is ready to receive callbacks before running the async. */ def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] diff --git a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala index 1346129ef..cc5f7618e 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala @@ -98,7 +98,6 @@ class DeltaDissemination[State]( } def addLatentConnection(latentConnection: LatentConnection[ProtocolMessage[State]]): Unit = { - println(s"activating latent connection in data manager") latentConnection.prepare(conn => messageBufferCallback(conn)).run(using globalAbort) { case Success(conn) => lock.synchronized { @@ -112,9 +111,10 @@ class DeltaDissemination[State]( } // note that deltas are not guaranteed to be ordered the same in the buffers - val lock: AnyRef = new {} - private var localDeltas: List[Payload[State]] = Nil - private var remoteDeltas: List[Payload[State]] = Nil + private val lock: AnyRef = new {} + private var pastDeltas: List[Payload[State]] = Nil + + def allDeltas: List[Payload[State]] = pastDeltas private var contexts: Map[Uid, Dots] = Map.empty @@ -125,14 +125,11 @@ class DeltaDissemination[State]( val nextDot = selfContext.nextDot(replicaId.uid) val payload = Payload(replicaId.uid, Dots.single(nextDot), delta) updateContext(replicaId.uid, payload.dots) + pastDeltas = payload :: pastDeltas payload } disseminate(payload) - def allDeltas: List[Payload[State]] = lock.synchronized { - List(remoteDeltas, localDeltas).flatten - } - def updateContext(rr: Uid, dots: Dots): Unit = lock.synchronized { contexts = contexts.updatedWith(rr)(curr => curr `merge` Some(dots)) } @@ -148,7 +145,7 @@ class DeltaDissemination[State]( case Pong(time) => println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms") case Request(uid, knows) => - val relevant = allDeltas.filterNot { dt => dt.dots <= knows } + val relevant = pastDeltas.filterNot { dt => dt.dots <= knows } relevant.foreach: msg => biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan)) updateContext(uid, selfContext `merge` knows) @@ -159,7 +156,7 @@ class DeltaDissemination[State]( updateContext(uid, context) } updateContext(replicaId.uid, context) - remoteDeltas = payload :: remoteDeltas + pastDeltas = payload :: pastDeltas } receiveCallback(data) if immediateForward then disseminate(payload, Set(biChan)) diff --git a/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala b/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala new file mode 100644 index 000000000..76a98e1eb --- /dev/null +++ b/Modules/Replication/shared/src/test/scala/replication/example/DeltaDisseminationTest.scala @@ -0,0 +1,65 @@ +package replication.example + +import channels.{Abort, Connection, Handler, LatentConnection} +import de.rmgk.delay.{Async, Callback, Sync} +import rdts.base.LocalUid +import replication.{DeltaDissemination, ProtocolMessage} + +class SynchronizedConnection[T] { + object client extends LatentConnection[T] { + + object connection extends Connection[T] { + def send(msg: T): Async[Any, Unit] = Sync { + server.callback.succeed(msg) + } + override def close(): Unit = () + } + + def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Sync { + val callback = incomingHandler.getCallbackFor(connection) + + val serverConn = new Connection[T] { + override def close(): Unit = () + override def send(message: T): Async[Any, Unit] = Sync { + callback.succeed(message) + } + } + server.connectionEstablished.succeed(serverConn) + connection + } + } + + object server extends LatentConnection[T] { + + var callback: Callback[T] = null + + var connectionEstablished: Callback[Connection[T]] = null + + def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] { + connectionEstablished = Async.handler + }.map { conn => + callback = incomingHandler.getCallbackFor(conn) + conn + } + } + +} + +class DeltaDisseminationTest extends munit.FunSuite { + test("basics") { + + // I have no clue why this syntax is still not deprecated xD + val dd1, dd2 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => ()) + + val sync = SynchronizedConnection[ProtocolMessage[Set[String]]]() + + dd1.addLatentConnection(sync.server) + dd2.addLatentConnection(sync.client) + + dd1.applyDelta(Set("a")) + dd2.applyDelta(Set("b")) + + assertEquals(dd1.allDeltas.toSet, dd2.allDeltas.toSet) + + } +}