Skip to content

Commit

Permalink
add basic delta dissemination test
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Nov 24, 2024
1 parent 4adc8fc commit f8d3a07
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ trait Connection[T] {
def close(): Unit
}

/** Provides a specification how to handle messages, given a connection context */
/** Provides a specification how to handle messages, given a connection context. */
trait Handler[T] {

/** The provided connection is not guaranteed to be useable until the first message is received.
* If you want to initiate sending messages on this connection, use the value returned by the prepare call of the latent connection instead.
*/
def getCallbackFor(conn: Connection[T]): Callback[T]
}

Expand All @@ -48,7 +52,7 @@ trait Handler[T] {
trait LatentConnection[T] {

/** The returned async, when run, should establish connections with the given callback atomically.
* That is, no messages should be lost.
* That is, no messages should be lost during setup.
* Similarly, the provider of the callback (the result of `incoming`) of this method should make sure that the other end of the callback is ready to receive callbacks before running the async.
*/
def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class DeltaDissemination[State](
}

def addLatentConnection(latentConnection: LatentConnection[ProtocolMessage[State]]): Unit = {
println(s"activating latent connection in data manager")
latentConnection.prepare(conn => messageBufferCallback(conn)).run(using globalAbort) {
case Success(conn) =>
lock.synchronized {
Expand All @@ -112,9 +111,10 @@ class DeltaDissemination[State](
}

// note that deltas are not guaranteed to be ordered the same in the buffers
val lock: AnyRef = new {}
private var localDeltas: List[Payload[State]] = Nil
private var remoteDeltas: List[Payload[State]] = Nil
private val lock: AnyRef = new {}
private var pastDeltas: List[Payload[State]] = Nil

def allDeltas: List[Payload[State]] = pastDeltas

private var contexts: Map[Uid, Dots] = Map.empty

Expand All @@ -125,14 +125,11 @@ class DeltaDissemination[State](
val nextDot = selfContext.nextDot(replicaId.uid)
val payload = Payload(replicaId.uid, Dots.single(nextDot), delta)
updateContext(replicaId.uid, payload.dots)
pastDeltas = payload :: pastDeltas
payload
}
disseminate(payload)

def allDeltas: List[Payload[State]] = lock.synchronized {
List(remoteDeltas, localDeltas).flatten
}

def updateContext(rr: Uid, dots: Dots): Unit = lock.synchronized {
contexts = contexts.updatedWith(rr)(curr => curr `merge` Some(dots))
}
Expand All @@ -148,7 +145,7 @@ class DeltaDissemination[State](
case Pong(time) =>
println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms")
case Request(uid, knows) =>
val relevant = allDeltas.filterNot { dt => dt.dots <= knows }
val relevant = pastDeltas.filterNot { dt => dt.dots <= knows }
relevant.foreach: msg =>
biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan))
updateContext(uid, selfContext `merge` knows)
Expand All @@ -159,7 +156,7 @@ class DeltaDissemination[State](
updateContext(uid, context)
}
updateContext(replicaId.uid, context)
remoteDeltas = payload :: remoteDeltas
pastDeltas = payload :: pastDeltas
}
receiveCallback(data)
if immediateForward then disseminate(payload, Set(biChan))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package replication.example

import channels.{Abort, Connection, Handler, LatentConnection}
import de.rmgk.delay.{Async, Callback, Sync}
import rdts.base.LocalUid
import replication.{DeltaDissemination, ProtocolMessage}

class SynchronizedConnection[T] {
object client extends LatentConnection[T] {

object connection extends Connection[T] {
def send(msg: T): Async[Any, Unit] = Sync {
server.callback.succeed(msg)
}
override def close(): Unit = ()
}

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Sync {
val callback = incomingHandler.getCallbackFor(connection)

val serverConn = new Connection[T] {
override def close(): Unit = ()
override def send(message: T): Async[Any, Unit] = Sync {
callback.succeed(message)
}
}
server.connectionEstablished.succeed(serverConn)
connection
}
}

object server extends LatentConnection[T] {

var callback: Callback[T] = null

var connectionEstablished: Callback[Connection[T]] = null

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] {
connectionEstablished = Async.handler
}.map { conn =>
callback = incomingHandler.getCallbackFor(conn)
conn
}
}

}

class DeltaDisseminationTest extends munit.FunSuite {
test("basics") {

// I have no clue why this syntax is still not deprecated xD
val dd1, dd2 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => ())

val sync = SynchronizedConnection[ProtocolMessage[Set[String]]]()

dd1.addLatentConnection(sync.server)
dd2.addLatentConnection(sync.client)

dd1.applyDelta(Set("a"))
dd2.applyDelta(Set("b"))

assertEquals(dd1.allDeltas.toSet, dd2.allDeltas.toSet)

}
}

0 comments on commit f8d3a07

Please sign in to comment.