Skip to content

Commit

Permalink
give dtnchannel its own Uid
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Jul 10, 2024
1 parent daf10b9 commit dd96014
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class DTNRdtClientContext[T: JsonValueCodec](connection: RdtClient, executionCon

class DTNChannel[T: JsonValueCodec](host: String, port: Int, appName: String, ec: ExecutionContext)
extends AbstractLatentConnection[ProtocolMessage[T]] {

// We use a local dtnid instead of a remote replica ID to signify that the local DTNd is the one providing information.
// If the local dtnd could be stopped and restarted without loosing data, this id should remain the same for performance reasons, but it will be correct even if it changes.
val dtnid = Uid.gen()

override def prepare(incoming: AbstractIncoming[ProtocolMessage[T]])
: Async[Abort, AbstractConnectionContext[ProtocolMessage[T]]] = Async {
val client: RdtClient = RdtClient(host, port, appName, NoDotsConvergenceClient).toAsync(using ec).bind
Expand All @@ -38,13 +43,12 @@ class DTNChannel[T: JsonValueCodec](host: String, port: Int, appName: String, ec

client.registerOnReceive { (payload: Array[Byte], dots: Dots) =>
val data = readFromArray[T](payload)
// TODO: is Uid zero bad?
cb.succeed(ProtocolMessage.Payload(Uid.zero, dots, data))
cb.succeed(ProtocolMessage.Payload(dtnid, dots, data))
}

// TODO: create custom empty request to signal to the application that it should send us all data it has.
// optimally, this should not be an empty set of dots, but those present in the network
cb.succeed(ProtocolMessage.Request(Uid.zero, Dots.empty))
cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty))

conn
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class DataManager[State](
10000
)

def addLatentConnection(latentConnection: AbstractLatentConnection[MessageBuffer])(using JsonValueCodec[CodecState]): Unit = {
def addLatentConnection(latentConnection: AbstractLatentConnection[MessageBuffer])(using
JsonValueCodec[CodecState]
): Unit = {
addLatentConnection(DataManager.jsoniterMessages(latentConnection))
}

Expand Down Expand Up @@ -155,17 +157,3 @@ class DataManager[State](
}

}

//def encodeDelta(delta: TransferState): MessageBuffer = {
// toEncBuffer:
// writeToArray[ProtocolMessage[TransferState]](Payload(replicaId.uid, delta))
//}
//
//def missingRequestMessage: MessageBuffer = {
// toEncBuffer:
// writeToArray[ProtocolMessage[TransferState]](Request(replicaId.uid, selfContext))
//}
//
//def toEncBuffer(bytes: Array[Byte]): MessageBuffer =
// val enc = crypto.map(c => c.encrypt(bytes, Array.empty)).getOrElse(bytes)
// ArrayMessageBuffer(enc)

0 comments on commit dd96014

Please sign in to comment.