From dd96014476a758121e08544f9c2f9dcda159acda Mon Sep 17 00:00:00 2001 From: ragnar Date: Wed, 10 Jul 2024 18:10:41 +0200 Subject: [PATCH] give dtnchannel its own Uid --- .../main/scala/replication/DTNChannel.scala | 10 +++++++--- .../main/scala/replication/DataManager.scala | 18 +++--------------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/Modules/Replication/shared/src/main/scala/replication/DTNChannel.scala b/Modules/Replication/shared/src/main/scala/replication/DTNChannel.scala index 39e1aca62..e735cac29 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DTNChannel.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DTNChannel.scala @@ -30,6 +30,11 @@ class DTNRdtClientContext[T: JsonValueCodec](connection: RdtClient, executionCon class DTNChannel[T: JsonValueCodec](host: String, port: Int, appName: String, ec: ExecutionContext) extends AbstractLatentConnection[ProtocolMessage[T]] { + + // We use a local dtnid instead of a remote replica ID to signify that the local DTNd is the one providing information. + // If the local dtnd could be stopped and restarted without loosing data, this id should remain the same for performance reasons, but it will be correct even if it changes. + val dtnid = Uid.gen() + override def prepare(incoming: AbstractIncoming[ProtocolMessage[T]]) : Async[Abort, AbstractConnectionContext[ProtocolMessage[T]]] = Async { val client: RdtClient = RdtClient(host, port, appName, NoDotsConvergenceClient).toAsync(using ec).bind @@ -38,13 +43,12 @@ class DTNChannel[T: JsonValueCodec](host: String, port: Int, appName: String, ec client.registerOnReceive { (payload: Array[Byte], dots: Dots) => val data = readFromArray[T](payload) - // TODO: is Uid zero bad? - cb.succeed(ProtocolMessage.Payload(Uid.zero, dots, data)) + cb.succeed(ProtocolMessage.Payload(dtnid, dots, data)) } // TODO: create custom empty request to signal to the application that it should send us all data it has. // optimally, this should not be an empty set of dots, but those present in the network - cb.succeed(ProtocolMessage.Request(Uid.zero, Dots.empty)) + cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty)) conn } diff --git a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala index a2a7f4360..42b60039b 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala @@ -75,7 +75,9 @@ class DataManager[State]( 10000 ) - def addLatentConnection(latentConnection: AbstractLatentConnection[MessageBuffer])(using JsonValueCodec[CodecState]): Unit = { + def addLatentConnection(latentConnection: AbstractLatentConnection[MessageBuffer])(using + JsonValueCodec[CodecState] + ): Unit = { addLatentConnection(DataManager.jsoniterMessages(latentConnection)) } @@ -155,17 +157,3 @@ class DataManager[State]( } } - -//def encodeDelta(delta: TransferState): MessageBuffer = { -// toEncBuffer: -// writeToArray[ProtocolMessage[TransferState]](Payload(replicaId.uid, delta)) -//} -// -//def missingRequestMessage: MessageBuffer = { -// toEncBuffer: -// writeToArray[ProtocolMessage[TransferState]](Request(replicaId.uid, selfContext)) -//} -// -//def toEncBuffer(bytes: Array[Byte]): MessageBuffer = -// val enc = crypto.map(c => c.encrypt(bytes, Array.empty)).getOrElse(bytes) -// ArrayMessageBuffer(enc)