From 08606bc5da3dc5bae6064483e651227d6cbaf853 Mon Sep 17 00:00:00 2001 From: Lukas Holst Date: Mon, 22 Jul 2024 08:26:50 -0400 Subject: [PATCH] Added message-type parameter to DTNCannel, RdtClient, Bundle, and routing, although routing ignores requests for now. --- Modules/DTN/jvm/src/main/scala/dtn/Main.scala | 8 +-- .../shared/src/main/scala/dtn/Bundle.scala | 14 +++-- .../src/main/scala/dtn/BundleCreation.scala | 3 +- .../shared/src/main/scala/dtn/RdtClient.scala | 15 +++--- .../main/scala/dtn/routing/RdtRouter.scala | 51 +++++++++++-------- .../main/scala/dtn/routing/RdtRouter2.scala | 41 +++++++-------- .../main/scala/replication/DTNChannel.scala | 36 ++++++------- 7 files changed, 93 insertions(+), 75 deletions(-) diff --git a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala index c9b981ffb..f5ba5d423 100644 --- a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala +++ b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala @@ -114,12 +114,12 @@ def send_one_rdt_package(host: String, port: Int, checkerHost: String, checkerPo val checkerClient = DotsConvergenceClient(checkerHost, checkerPort) RdtClient(host, port, "testapp", checkerClient).flatMap(client => { - client.registerOnReceive((payload: Array[Byte], dots: Dots) => { + client.registerOnReceive((message_type: RdtMessageType, payload: Array[Byte], dots: Dots) => { println(s"received dots: $dots") }) println(s"sending dots: $dots") - client.send(payload = Array(), dots = dots) + client.send(message_type = RdtMessageType.Payload, payload = Array(), dots = dots) }).recover(throwable => println(throwable)) while true do { @@ -132,7 +132,7 @@ def send_continuous_rdt_packages(host: String, port: Int, checkerHost: String, c val checkerClient = DotsConvergenceClient(checkerHost, checkerPort) RdtClient(host, port, "testapp", checkerClient).map(client => { - client.registerOnReceive((payload: Array[Byte], d: Dots) => { + client.registerOnReceive((message_type: RdtMessageType, payload: Array[Byte], d: Dots) => { dots = dots.merge(d) println(s"merged rdt-meta data, new dots: $dots") }) @@ -143,7 +143,7 @@ def send_continuous_rdt_packages(host: String, port: Int, checkerHost: String, c // add dots here println(s"sending new dots: $dots") - client.send(Array(), dots).printError() + client.send(RdtMessageType.Payload, Array(), dots).printError() } }).recover(throwable => println(throwable)) diff --git a/Modules/DTN/shared/src/main/scala/dtn/Bundle.scala b/Modules/DTN/shared/src/main/scala/dtn/Bundle.scala index a24480c9c..93dbe2098 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/Bundle.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/Bundle.scala @@ -18,6 +18,9 @@ deserialization: CBOR -> object && JSON -> object todo: for object -> JSON serialization the corresponding counter-method to readBytes(reader: Reader): Array[Byte] is missing. we would need info on the desired serialized presentation (JSON or CBOR) for this. */ +enum RdtMessageType: + case Request, Payload + case class Endpoint(scheme: Int, specific_part: String | Int) { def full_uri: String = { scheme match { @@ -136,6 +139,8 @@ case class FragmentInfo(fragment_offset: Int, total_application_data_length: Int case class HopCount(hop_limit: Int, current_count: Int) +case class RdtMetaInfo(dots: Dots, message_type: RdtMessageType) + case class PrimaryBlock( version: Int, bundle_processing_control_flags: BundleProcessingControlFlags, @@ -245,15 +250,15 @@ case class RdtMetaBlock( crc_type: Int, data: Array[Byte] ) extends CanonicalBlock { - def dots: Dots = Cbor.decode(data).to[Dots].value + def info: RdtMetaInfo = Cbor.decode(data).to[RdtMetaInfo].value } object RdtMetaBlock { - def createFrom(dots: Dots): RdtMetaBlock = RdtMetaBlock( + def createFrom(info: RdtMetaInfo): RdtMetaBlock = RdtMetaBlock( CanonicalBlock.RDT_META_BLOCK_TYPE_CPDE, 0, BlockProcessingControlFlags(), 0, - Cbor.encode(dots).toByteArray + Cbor.encode(info).toByteArray ) } @@ -302,6 +307,9 @@ private def readBytes(reader: Reader): Array[Byte] = { } } +given Codec[RdtMessageType] = deriveCodec[RdtMessageType] +given Codec[RdtMetaInfo] = deriveCodec[RdtMetaInfo] + given Encoder[Uid] = Encoder.forString.asInstanceOf[Encoder[Uid]] given Decoder[Uid] = Decoder.forString.asInstanceOf[Decoder[Uid]] diff --git a/Modules/DTN/shared/src/main/scala/dtn/BundleCreation.scala b/Modules/DTN/shared/src/main/scala/dtn/BundleCreation.scala index f1446cf32..be568dbd0 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/BundleCreation.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/BundleCreation.scala @@ -44,6 +44,7 @@ object BundleCreation { } def createBundleRdt( + message_type: RdtMessageType, data: Array[Byte], dots: Dots, node: Endpoint, @@ -61,7 +62,7 @@ object BundleCreation { lifetime = 1000 * 3600 * 24 ) - val rdtmeta_block: RdtMetaBlock = RdtMetaBlock.createFrom(dots).copy(block_number = 4) + val rdtmeta_block: RdtMetaBlock = RdtMetaBlock.createFrom(RdtMetaInfo(dots, message_type)).copy(block_number = 4) val hop_count_block: HopCountBlock = HopCountBlock.createFrom(HopCount(hop_limit = 32, current_count = 0)).copy(block_number = 3) diff --git a/Modules/DTN/shared/src/main/scala/dtn/RdtClient.scala b/Modules/DTN/shared/src/main/scala/dtn/RdtClient.scala index 53e9c05ac..9b7bf39b0 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/RdtClient.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/RdtClient.scala @@ -6,8 +6,9 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future class RdtClient(ws: WSEndpointClient, appName: String, checkerClient: ConvergenceClientInterface) { - def send(payload: Array[Byte], dots: Dots): Future[Unit] = { + def send(message_type: RdtMessageType, payload: Array[Byte], dots: Dots): Future[Unit] = { val bundle: Bundle = BundleCreation.createBundleRdt( + message_type = message_type, data = payload, dots = dots, node = Endpoint.createFrom(ws.nodeId), @@ -18,20 +19,20 @@ class RdtClient(ws: WSEndpointClient, appName: String, checkerClient: Convergenc ws.sendBundle(bundle) } - def registerOnReceive(callback: (Array[Byte], Dots) => Unit): Unit = { + def registerOnReceive(callback: (RdtMessageType, Array[Byte], Dots) => Unit): Unit = { // flush receive forever and call callback def flush_receive(): Future[Bundle] = { ws.receiveBundle().flatMap(bundle => { println(s"received bundle: ${bundle.id}") - val payload: Option[Array[Byte]] = bundle.other_blocks.collectFirst({ case x: PayloadBlock => x.data }) - val dots: Option[Dots] = bundle.other_blocks.collectFirst({ case x: RdtMetaBlock => x.dots }) + val payload: Option[Array[Byte]] = bundle.other_blocks.collectFirst({ case x: PayloadBlock => x.data }) + val rdt_meta_info: Option[RdtMetaInfo] = bundle.other_blocks.collectFirst({ case x: RdtMetaBlock => x.info }) - if payload.isEmpty || dots.isEmpty then { + if payload.isEmpty || rdt_meta_info.isEmpty then { println("did not contain dots or payload. bundle is no rdt bundle. ignoring bundle.") } else { - checkerClient.send(dots.get) - callback(payload.get, dots.get) + checkerClient.send(rdt_meta_info.get.dots) + callback(rdt_meta_info.get.message_type, payload.get, rdt_meta_info.get.dots) } flush_receive() diff --git a/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter.scala b/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter.scala index d5733bd74..c6b946576 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter.scala @@ -9,6 +9,7 @@ import scala.concurrent.Future import scala.jdk.CollectionConverters.* import scala.math.{addExact, max} import scala.util.{Random, Try} +import dtn.RdtMetaInfo // Variables, chosen number, for this routing val MIN_DELIVERED = 10 @@ -59,9 +60,9 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { val dotState: DotState = DotState() // maybe grows indefinitely, but only if we receive a bundle which will not be forwarded (hop count depleted?, local unicast endpoint?) - val tempDotsStore: ConcurrentHashMap[String, Dots] = ConcurrentHashMap() - val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap() - val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap() + val tempRdtMetaInfoStore: ConcurrentHashMap[String, RdtMetaInfo] = ConcurrentHashMap() + val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap() + val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap() val delivered: ConcurrentHashMap[String, Int] = ConcurrentHashMap() // shows the number of nodes we have delivered a bundle to @@ -70,14 +71,20 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { : Option[Packet.ResponseSenderForBundle] = { println(s"received sender-request for bundle: ${packet.bp}") - val source_node: Endpoint = packet.bp.source.extract_node_endpoint() - val rdt_id: String = packet.bp.destination.extract_endpoint_id() - val dots: Dots = tempDotsStore.getOrDefault(packet.bp.id, Dots.empty) + // we only route rdt packets rn + if !tempRdtMetaInfoStore.contains(packet.bp.id) then { + println(s"bundle meta information for bundle-id ${packet.bp.id} are not available. not routing rn.") + return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = false)) + } + + val source_node: Endpoint = packet.bp.source.extract_node_endpoint() + val rdt_id: String = packet.bp.destination.extract_endpoint_id() + val rdt_meta_info: RdtMetaInfo = tempRdtMetaInfoStore.get(packet.bp.id) // if we already successfully forwarded this package to enough clas we can 'safely' delete it. if delivered.getOrDefault(packet.bp.id, 0) >= MIN_DELIVERED then { println("bundle was forwarded to enough unique neighbours. deleting.") - tempDotsStore.remove(packet.bp.id) + tempRdtMetaInfoStore.remove(packet.bp.id) tempPreviousNodeStore.remove(packet.bp.id) tempRdtIdStore.remove(packet.bp.id) return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = true)) @@ -91,7 +98,7 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { // get all destination nodes to which we must forward this bundle val destination_nodes: Set[Endpoint] = - dotState.getDestinationNodeEndpoints(source_node, rdt_id, dots) + dotState.getDestinationNodeEndpoints(source_node, rdt_id, rdt_meta_info.dots) println(s"destination nodes: $destination_nodes") // for these destination nodes select the best neighbours to forward this bundle to @@ -107,7 +114,7 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { // println(s"best neighbours without previous and source node: $best_neighbours") // remove neighbours which already know the state - best_neighbours = dotState.filterNeighbourNodes(best_neighbours, rdt_id, dots) + best_neighbours = dotState.filterNeighbourNodes(best_neighbours, rdt_id, rdt_meta_info.dots) println(s"best neighbours without neighbours that already know the state: $best_neighbours") // use current-peers-list to try and get peer information for each best neighbour @@ -168,12 +175,12 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = { delivered.merge(packet.bid, 1, (x1, x2) => x1 + x2) - val dots = tempDotsStore.get(packet.bid) - val rdt_id = tempRdtIdStore.get(packet.bid) + val rdt_meta_info = tempRdtMetaInfoStore.get(packet.bid) + val rdt_id = tempRdtIdStore.get(packet.bid) - if dots != null && rdt_id != null then { + if rdt_meta_info != null && rdt_id != null then { println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. merging dots.") - dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, dots) + dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, rdt_meta_info.dots) } else { println( s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. could not merge dots because dots or rdt_id are no longer available in temp-store." @@ -184,10 +191,10 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = { // we always merge the bundle information, as we might receive it from multiple nodes, which is valuable information to us - val source_node = packet.bndl.primary_block.source.extract_node_endpoint() - val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id() - var previous_node: Option[Endpoint] = None - var dots: Option[Dots] = None + val source_node = packet.bndl.primary_block.source.extract_node_endpoint() + val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id() + var previous_node: Option[Endpoint] = None + var rdt_meta_info: Option[RdtMetaInfo] = None packet.bndl.other_blocks.collectFirst { case x: PreviousNodeBlock => x @@ -199,18 +206,18 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) { case x: RdtMetaBlock => x } match { case None => println("received incoming bundle without rdt-meta block. ignoring") - case Some(rdt_meta_block) => dots = Option(rdt_meta_block.dots) + case Some(rdt_meta_block) => rdt_meta_info = Option(rdt_meta_block.info) } - if previous_node.nonEmpty && dots.nonEmpty then { + if previous_node.nonEmpty && rdt_meta_info.nonEmpty then { println(s"received incoming bundle ${packet.bndl.id} with rdt-meta block and previous-node block. merging.") likelihoodState.update_score(neighbour_node = previous_node.get, destination_node = source_node) - dotState.mergeDots(source_node, rdt_id, dots.get) - dotState.mergeDots(previous_node.get, rdt_id, dots.get) + dotState.mergeDots(source_node, rdt_id, rdt_meta_info.get.dots) + dotState.mergeDots(previous_node.get, rdt_id, rdt_meta_info.get.dots) tempPreviousNodeStore.put(packet.bndl.id, previous_node.get) - tempDotsStore.put(packet.bndl.id, dots.get) + tempRdtMetaInfoStore.put(packet.bndl.id, rdt_meta_info.get) tempRdtIdStore.put(packet.bndl.id, rdt_id) () } diff --git a/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter2.scala b/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter2.scala index e6bcc53fa..35c28a295 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter2.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter2.scala @@ -9,6 +9,7 @@ import scala.concurrent.Future import scala.jdk.CollectionConverters.* import scala.math.{addExact, max} import scala.util.{Random, Try} +import dtn.RdtMetaInfo /* This alternative RdtRouter does a simple limited flooding approach. @@ -36,27 +37,27 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) val dotState: DotState2 = DotState2() // grows indefinitely - val tempDotsStore: ConcurrentHashMap[String, Dots] = ConcurrentHashMap() - val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap() + val tempRdtMetaInfoStore: ConcurrentHashMap[String, RdtMetaInfo] = ConcurrentHashMap() + val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap() override def onRequestSenderForBundle(packet: Packet.RequestSenderForBundle) : Option[Packet.ResponseSenderForBundle] = { println(s"received sender-request for bundle: ${packet.bp}") - val rdt_id: String = packet.bp.destination.extract_endpoint_id() - - if !tempDotsStore.contains(packet.bp.id) then { + // we only route rdt packets rn + if !tempRdtMetaInfoStore.contains(packet.bp.id) then { println(s"bundle meta information for bundle-id ${packet.bp.id} are not available. not routing rn.") return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = false)) } - val dots: Dots = tempDotsStore.getOrDefault(packet.bp.id, Dots.empty) + val rdt_id: String = packet.bp.destination.extract_endpoint_id() + val rdt_meta_info: RdtMetaInfo = tempRdtMetaInfoStore.get(packet.bp.id) val all_peers = peers.values().asScala println(s"all peers: ${all_peers.toList}") // filter peers for peers that still need this bundles dot-state - val filtered_peers = dotState.filterPeers(all_peers, rdt_id, dots) + val filtered_peers = dotState.filterPeers(all_peers, rdt_id, rdt_meta_info.dots) println(s"filtered peers: ${filtered_peers.toList}") // use peer-info and available clas' to build a list of cla-connections to forward the bundle over @@ -89,12 +90,12 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) } override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = { - val dots = tempDotsStore.get(packet.bid) - val rdt_id = tempRdtIdStore.get(packet.bid) + val rdt_id = tempRdtIdStore.get(packet.bid) + val rdt_meta_info = tempRdtMetaInfoStore.get(packet.bid) - if dots != null && rdt_id != null then { + if rdt_meta_info != null && rdt_id != null then { println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. merging dots.") - dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, dots) + dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, rdt_meta_info.dots) } else { println( s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. could not merge dots because dots or rdt_id are no longer available in temp-store." @@ -105,10 +106,10 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = { // we always merge the bundle information, as we might receive it from multiple nodes, which is valuable information to us - val source_node = packet.bndl.primary_block.source.extract_node_endpoint() - val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id() - var previous_node: Option[Endpoint] = None - var dots: Option[Dots] = None + val source_node = packet.bndl.primary_block.source.extract_node_endpoint() + val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id() + var previous_node: Option[Endpoint] = None + var rdt_meta_info: Option[RdtMetaInfo] = None packet.bndl.other_blocks.collectFirst { case x: PreviousNodeBlock => x @@ -120,15 +121,15 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) case x: RdtMetaBlock => x } match { case None => println("received incoming bundle without rdt-meta block. ignoring") - case Some(rdt_meta_block) => dots = Option(rdt_meta_block.dots) + case Some(rdt_meta_block) => rdt_meta_info = Option(rdt_meta_block.info) } - if previous_node.nonEmpty && dots.nonEmpty then { + if previous_node.nonEmpty && rdt_meta_info.nonEmpty then { println(s"received incoming bundle ${packet.bndl.id} with rdt-meta block and previous-node block. merging.") - dotState.mergeDots(source_node, rdt_id, dots.get) - dotState.mergeDots(previous_node.get, rdt_id, dots.get) + dotState.mergeDots(source_node, rdt_id, rdt_meta_info.get.dots) + dotState.mergeDots(previous_node.get, rdt_id, rdt_meta_info.get.dots) - tempDotsStore.put(packet.bndl.id, dots.get) + tempRdtMetaInfoStore.put(packet.bndl.id, rdt_meta_info.get) tempRdtIdStore.put(packet.bndl.id, rdt_id) () } diff --git a/Modules/Replication/js-jvm/src/main/scala/replication/DTNChannel.scala b/Modules/Replication/js-jvm/src/main/scala/replication/DTNChannel.scala index 5d8f7bfb9..10f6e218e 100644 --- a/Modules/Replication/js-jvm/src/main/scala/replication/DTNChannel.scala +++ b/Modules/Replication/js-jvm/src/main/scala/replication/DTNChannel.scala @@ -1,6 +1,6 @@ package replication -import _root_.dtn.{NoDotsConvergenceClient, RdtClient} +import _root_.dtn.{NoDotsConvergenceClient, RdtClient, RdtMessageType} import channels.{Abort, Connection, Handler, LatentConnection, MessageBuffer} import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, readFromArray, writeToArray} import de.rmgk.delay @@ -17,10 +17,9 @@ class DTNRdtClientContext[T: JsonValueCodec](connection: RdtClient, executionCon override def send(message: ProtocolMessage[T]): Async[Any, Unit] = message match case ProtocolMessage.Request(sender, dots) => - // TODO: how to handle this? Optimally, this should be answered by sending all known payloads that are larger than the provided set of dots - Sync { () } + connection.send(RdtMessageType.Request, Array(), dots).toAsync(using executionContext) case ProtocolMessage.Payload(sender, dots, data) => - connection.send(writeToArray[T](data), dots).toAsync(using executionContext) + connection.send(RdtMessageType.Payload, writeToArray[T](data), dots).toAsync(using executionContext) override def close(): Unit = connection.close().onComplete { case Failure(f) => f.printStackTrace() @@ -35,21 +34,22 @@ class DTNChannel[T: JsonValueCodec](host: String, port: Int, appName: String, ec // If the local dtnd could be stopped and restarted without loosing data, this id should remain the same for performance reasons, but it will be correct even if it changes. val dtnid = Uid.gen() - override def prepare(incomingHandler: Handler[ProtocolMessage[T]]) - : Async[Abort, Connection[ProtocolMessage[T]]] = Async { - val client: RdtClient = RdtClient(host, port, appName, NoDotsConvergenceClient).toAsync(using ec).bind - val conn = DTNRdtClientContext[T](client, ec) - val cb = incomingHandler.getCallbackFor(conn) + override def prepare(incomingHandler: Handler[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] = + Async { + val client: RdtClient = RdtClient(host, port, appName, NoDotsConvergenceClient).toAsync(using ec).bind + val conn = DTNRdtClientContext[T](client, ec) + val cb = incomingHandler.getCallbackFor(conn) - client.registerOnReceive { (payload: Array[Byte], dots: Dots) => - val data = readFromArray[T](payload) - cb.succeed(ProtocolMessage.Payload(dtnid, dots, data)) - } + client.registerOnReceive { (message_type: RdtMessageType, payload: Array[Byte], dots: Dots) => + message_type match + case RdtMessageType.Request => cb.succeed(ProtocolMessage.Request(dtnid, dots)) + case RdtMessageType.Payload => cb.succeed(ProtocolMessage.Payload(dtnid, dots, readFromArray[T](payload))) + } - // TODO: create custom empty request to signal to the application that it should send us all data it has. - // optimally, this should not be an empty set of dots, but those present in the network - cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty)) + // TODO: create custom empty request to signal to the application that it should send us all data it has. + // optimally, this should not be an empty set of dots, but those present in the network + cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty)) - conn - } + conn + } }