Skip to content

Commit

Permalink
Added message-type parameter to DTNCannel, RdtClient, Bundle, and rou…
Browse files Browse the repository at this point in the history
…ting, although routing ignores requests for now.
  • Loading branch information
lh70 committed Jul 22, 2024
1 parent 5110221 commit 08606bc
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 75 deletions.
8 changes: 4 additions & 4 deletions Modules/DTN/jvm/src/main/scala/dtn/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def send_one_rdt_package(host: String, port: Int, checkerHost: String, checkerPo
val checkerClient = DotsConvergenceClient(checkerHost, checkerPort)

RdtClient(host, port, "testapp", checkerClient).flatMap(client => {
client.registerOnReceive((payload: Array[Byte], dots: Dots) => {
client.registerOnReceive((message_type: RdtMessageType, payload: Array[Byte], dots: Dots) => {
println(s"received dots: $dots")
})

println(s"sending dots: $dots")
client.send(payload = Array(), dots = dots)
client.send(message_type = RdtMessageType.Payload, payload = Array(), dots = dots)
}).recover(throwable => println(throwable))

while true do {
Expand All @@ -132,7 +132,7 @@ def send_continuous_rdt_packages(host: String, port: Int, checkerHost: String, c
val checkerClient = DotsConvergenceClient(checkerHost, checkerPort)

RdtClient(host, port, "testapp", checkerClient).map(client => {
client.registerOnReceive((payload: Array[Byte], d: Dots) => {
client.registerOnReceive((message_type: RdtMessageType, payload: Array[Byte], d: Dots) => {
dots = dots.merge(d)
println(s"merged rdt-meta data, new dots: $dots")
})
Expand All @@ -143,7 +143,7 @@ def send_continuous_rdt_packages(host: String, port: Int, checkerHost: String, c
// add dots here

println(s"sending new dots: $dots")
client.send(Array(), dots).printError()
client.send(RdtMessageType.Payload, Array(), dots).printError()
}
}).recover(throwable => println(throwable))

Expand Down
14 changes: 11 additions & 3 deletions Modules/DTN/shared/src/main/scala/dtn/Bundle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ deserialization: CBOR -> object && JSON -> object
todo: for object -> JSON serialization the corresponding counter-method to readBytes(reader: Reader): Array[Byte] is missing. we would need info on the desired serialized presentation (JSON or CBOR) for this.
*/

enum RdtMessageType:
case Request, Payload

case class Endpoint(scheme: Int, specific_part: String | Int) {
def full_uri: String = {
scheme match {
Expand Down Expand Up @@ -136,6 +139,8 @@ case class FragmentInfo(fragment_offset: Int, total_application_data_length: Int

case class HopCount(hop_limit: Int, current_count: Int)

case class RdtMetaInfo(dots: Dots, message_type: RdtMessageType)

case class PrimaryBlock(
version: Int,
bundle_processing_control_flags: BundleProcessingControlFlags,
Expand Down Expand Up @@ -245,15 +250,15 @@ case class RdtMetaBlock(
crc_type: Int,
data: Array[Byte]
) extends CanonicalBlock {
def dots: Dots = Cbor.decode(data).to[Dots].value
def info: RdtMetaInfo = Cbor.decode(data).to[RdtMetaInfo].value
}
object RdtMetaBlock {
def createFrom(dots: Dots): RdtMetaBlock = RdtMetaBlock(
def createFrom(info: RdtMetaInfo): RdtMetaBlock = RdtMetaBlock(
CanonicalBlock.RDT_META_BLOCK_TYPE_CPDE,
0,
BlockProcessingControlFlags(),
0,
Cbor.encode(dots).toByteArray
Cbor.encode(info).toByteArray
)
}

Expand Down Expand Up @@ -302,6 +307,9 @@ private def readBytes(reader: Reader): Array[Byte] = {
}
}

given Codec[RdtMessageType] = deriveCodec[RdtMessageType]
given Codec[RdtMetaInfo] = deriveCodec[RdtMetaInfo]

given Encoder[Uid] = Encoder.forString.asInstanceOf[Encoder[Uid]]
given Decoder[Uid] = Decoder.forString.asInstanceOf[Decoder[Uid]]

Expand Down
3 changes: 2 additions & 1 deletion Modules/DTN/shared/src/main/scala/dtn/BundleCreation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object BundleCreation {
}

def createBundleRdt(
message_type: RdtMessageType,
data: Array[Byte],
dots: Dots,
node: Endpoint,
Expand All @@ -61,7 +62,7 @@ object BundleCreation {
lifetime = 1000 * 3600 * 24
)

val rdtmeta_block: RdtMetaBlock = RdtMetaBlock.createFrom(dots).copy(block_number = 4)
val rdtmeta_block: RdtMetaBlock = RdtMetaBlock.createFrom(RdtMetaInfo(dots, message_type)).copy(block_number = 4)

val hop_count_block: HopCountBlock =
HopCountBlock.createFrom(HopCount(hop_limit = 32, current_count = 0)).copy(block_number = 3)
Expand Down
15 changes: 8 additions & 7 deletions Modules/DTN/shared/src/main/scala/dtn/RdtClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class RdtClient(ws: WSEndpointClient, appName: String, checkerClient: ConvergenceClientInterface) {
def send(payload: Array[Byte], dots: Dots): Future[Unit] = {
def send(message_type: RdtMessageType, payload: Array[Byte], dots: Dots): Future[Unit] = {
val bundle: Bundle = BundleCreation.createBundleRdt(
message_type = message_type,
data = payload,
dots = dots,
node = Endpoint.createFrom(ws.nodeId),
Expand All @@ -18,20 +19,20 @@ class RdtClient(ws: WSEndpointClient, appName: String, checkerClient: Convergenc
ws.sendBundle(bundle)
}

def registerOnReceive(callback: (Array[Byte], Dots) => Unit): Unit = {
def registerOnReceive(callback: (RdtMessageType, Array[Byte], Dots) => Unit): Unit = {
// flush receive forever and call callback
def flush_receive(): Future[Bundle] = {
ws.receiveBundle().flatMap(bundle => {
println(s"received bundle: ${bundle.id}")

val payload: Option[Array[Byte]] = bundle.other_blocks.collectFirst({ case x: PayloadBlock => x.data })
val dots: Option[Dots] = bundle.other_blocks.collectFirst({ case x: RdtMetaBlock => x.dots })
val payload: Option[Array[Byte]] = bundle.other_blocks.collectFirst({ case x: PayloadBlock => x.data })
val rdt_meta_info: Option[RdtMetaInfo] = bundle.other_blocks.collectFirst({ case x: RdtMetaBlock => x.info })

if payload.isEmpty || dots.isEmpty then {
if payload.isEmpty || rdt_meta_info.isEmpty then {
println("did not contain dots or payload. bundle is no rdt bundle. ignoring bundle.")
} else {
checkerClient.send(dots.get)
callback(payload.get, dots.get)
checkerClient.send(rdt_meta_info.get.dots)
callback(rdt_meta_info.get.message_type, payload.get, rdt_meta_info.get.dots)
}

flush_receive()
Expand Down
51 changes: 29 additions & 22 deletions Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.math.{addExact, max}
import scala.util.{Random, Try}
import dtn.RdtMetaInfo

// Variables, chosen number, for this routing
val MIN_DELIVERED = 10
Expand Down Expand Up @@ -59,9 +60,9 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
val dotState: DotState = DotState()

// maybe grows indefinitely, but only if we receive a bundle which will not be forwarded (hop count depleted?, local unicast endpoint?)
val tempDotsStore: ConcurrentHashMap[String, Dots] = ConcurrentHashMap()
val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap()
val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap()
val tempRdtMetaInfoStore: ConcurrentHashMap[String, RdtMetaInfo] = ConcurrentHashMap()
val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap()
val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap()

val delivered: ConcurrentHashMap[String, Int] =
ConcurrentHashMap() // shows the number of nodes we have delivered a bundle to
Expand All @@ -70,14 +71,20 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
: Option[Packet.ResponseSenderForBundle] = {
println(s"received sender-request for bundle: ${packet.bp}")

val source_node: Endpoint = packet.bp.source.extract_node_endpoint()
val rdt_id: String = packet.bp.destination.extract_endpoint_id()
val dots: Dots = tempDotsStore.getOrDefault(packet.bp.id, Dots.empty)
// we only route rdt packets rn
if !tempRdtMetaInfoStore.contains(packet.bp.id) then {
println(s"bundle meta information for bundle-id ${packet.bp.id} are not available. not routing rn.")
return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = false))
}

val source_node: Endpoint = packet.bp.source.extract_node_endpoint()
val rdt_id: String = packet.bp.destination.extract_endpoint_id()
val rdt_meta_info: RdtMetaInfo = tempRdtMetaInfoStore.get(packet.bp.id)

// if we already successfully forwarded this package to enough clas we can 'safely' delete it.
if delivered.getOrDefault(packet.bp.id, 0) >= MIN_DELIVERED then {
println("bundle was forwarded to enough unique neighbours. deleting.")
tempDotsStore.remove(packet.bp.id)
tempRdtMetaInfoStore.remove(packet.bp.id)
tempPreviousNodeStore.remove(packet.bp.id)
tempRdtIdStore.remove(packet.bp.id)
return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = true))
Expand All @@ -91,7 +98,7 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {

// get all destination nodes to which we must forward this bundle
val destination_nodes: Set[Endpoint] =
dotState.getDestinationNodeEndpoints(source_node, rdt_id, dots)
dotState.getDestinationNodeEndpoints(source_node, rdt_id, rdt_meta_info.dots)
println(s"destination nodes: $destination_nodes")

// for these destination nodes select the best neighbours to forward this bundle to
Expand All @@ -107,7 +114,7 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
// println(s"best neighbours without previous and source node: $best_neighbours")

// remove neighbours which already know the state
best_neighbours = dotState.filterNeighbourNodes(best_neighbours, rdt_id, dots)
best_neighbours = dotState.filterNeighbourNodes(best_neighbours, rdt_id, rdt_meta_info.dots)
println(s"best neighbours without neighbours that already know the state: $best_neighbours")

// use current-peers-list to try and get peer information for each best neighbour
Expand Down Expand Up @@ -168,12 +175,12 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = {
delivered.merge(packet.bid, 1, (x1, x2) => x1 + x2)

val dots = tempDotsStore.get(packet.bid)
val rdt_id = tempRdtIdStore.get(packet.bid)
val rdt_meta_info = tempRdtMetaInfoStore.get(packet.bid)
val rdt_id = tempRdtIdStore.get(packet.bid)

if dots != null && rdt_id != null then {
if rdt_meta_info != null && rdt_id != null then {
println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. merging dots.")
dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, dots)
dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, rdt_meta_info.dots)
} else {
println(
s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. could not merge dots because dots or rdt_id are no longer available in temp-store."
Expand All @@ -184,10 +191,10 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = {
// we always merge the bundle information, as we might receive it from multiple nodes, which is valuable information to us

val source_node = packet.bndl.primary_block.source.extract_node_endpoint()
val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id()
var previous_node: Option[Endpoint] = None
var dots: Option[Dots] = None
val source_node = packet.bndl.primary_block.source.extract_node_endpoint()
val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id()
var previous_node: Option[Endpoint] = None
var rdt_meta_info: Option[RdtMetaInfo] = None

packet.bndl.other_blocks.collectFirst {
case x: PreviousNodeBlock => x
Expand All @@ -199,18 +206,18 @@ class RdtRouter(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient) {
case x: RdtMetaBlock => x
} match {
case None => println("received incoming bundle without rdt-meta block. ignoring")
case Some(rdt_meta_block) => dots = Option(rdt_meta_block.dots)
case Some(rdt_meta_block) => rdt_meta_info = Option(rdt_meta_block.info)
}

if previous_node.nonEmpty && dots.nonEmpty then {
if previous_node.nonEmpty && rdt_meta_info.nonEmpty then {
println(s"received incoming bundle ${packet.bndl.id} with rdt-meta block and previous-node block. merging.")

likelihoodState.update_score(neighbour_node = previous_node.get, destination_node = source_node)
dotState.mergeDots(source_node, rdt_id, dots.get)
dotState.mergeDots(previous_node.get, rdt_id, dots.get)
dotState.mergeDots(source_node, rdt_id, rdt_meta_info.get.dots)
dotState.mergeDots(previous_node.get, rdt_id, rdt_meta_info.get.dots)

tempPreviousNodeStore.put(packet.bndl.id, previous_node.get)
tempDotsStore.put(packet.bndl.id, dots.get)
tempRdtMetaInfoStore.put(packet.bndl.id, rdt_meta_info.get)
tempRdtIdStore.put(packet.bndl.id, rdt_id)
()
}
Expand Down
41 changes: 21 additions & 20 deletions Modules/DTN/shared/src/main/scala/dtn/routing/RdtRouter2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.math.{addExact, max}
import scala.util.{Random, Try}
import dtn.RdtMetaInfo

/*
This alternative RdtRouter does a simple limited flooding approach.
Expand Down Expand Up @@ -36,27 +37,27 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient)
val dotState: DotState2 = DotState2()

// grows indefinitely
val tempDotsStore: ConcurrentHashMap[String, Dots] = ConcurrentHashMap()
val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap()
val tempRdtMetaInfoStore: ConcurrentHashMap[String, RdtMetaInfo] = ConcurrentHashMap()
val tempRdtIdStore: ConcurrentHashMap[String, String] = ConcurrentHashMap()

override def onRequestSenderForBundle(packet: Packet.RequestSenderForBundle)
: Option[Packet.ResponseSenderForBundle] = {
println(s"received sender-request for bundle: ${packet.bp}")

val rdt_id: String = packet.bp.destination.extract_endpoint_id()

if !tempDotsStore.contains(packet.bp.id) then {
// we only route rdt packets rn
if !tempRdtMetaInfoStore.contains(packet.bp.id) then {
println(s"bundle meta information for bundle-id ${packet.bp.id} are not available. not routing rn.")
return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = false))
}

val dots: Dots = tempDotsStore.getOrDefault(packet.bp.id, Dots.empty)
val rdt_id: String = packet.bp.destination.extract_endpoint_id()
val rdt_meta_info: RdtMetaInfo = tempRdtMetaInfoStore.get(packet.bp.id)

val all_peers = peers.values().asScala
println(s"all peers: ${all_peers.toList}")

// filter peers for peers that still need this bundles dot-state
val filtered_peers = dotState.filterPeers(all_peers, rdt_id, dots)
val filtered_peers = dotState.filterPeers(all_peers, rdt_id, rdt_meta_info.dots)
println(s"filtered peers: ${filtered_peers.toList}")

// use peer-info and available clas' to build a list of cla-connections to forward the bundle over
Expand Down Expand Up @@ -89,12 +90,12 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient)
}

override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = {
val dots = tempDotsStore.get(packet.bid)
val rdt_id = tempRdtIdStore.get(packet.bid)
val rdt_id = tempRdtIdStore.get(packet.bid)
val rdt_meta_info = tempRdtMetaInfoStore.get(packet.bid)

if dots != null && rdt_id != null then {
if rdt_meta_info != null && rdt_id != null then {
println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. merging dots.")
dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, dots)
dotState.mergeDots(Endpoint.createFromName(packet.cla_sender), rdt_id, rdt_meta_info.dots)
} else {
println(
s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}. could not merge dots because dots or rdt_id are no longer available in temp-store."
Expand All @@ -105,10 +106,10 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient)
override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = {
// we always merge the bundle information, as we might receive it from multiple nodes, which is valuable information to us

val source_node = packet.bndl.primary_block.source.extract_node_endpoint()
val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id()
var previous_node: Option[Endpoint] = None
var dots: Option[Dots] = None
val source_node = packet.bndl.primary_block.source.extract_node_endpoint()
val rdt_id = packet.bndl.primary_block.destination.extract_endpoint_id()
var previous_node: Option[Endpoint] = None
var rdt_meta_info: Option[RdtMetaInfo] = None

packet.bndl.other_blocks.collectFirst {
case x: PreviousNodeBlock => x
Expand All @@ -120,15 +121,15 @@ class RdtRouter2(ws: WSEroutingClient) extends BaseRouter(ws: WSEroutingClient)
case x: RdtMetaBlock => x
} match {
case None => println("received incoming bundle without rdt-meta block. ignoring")
case Some(rdt_meta_block) => dots = Option(rdt_meta_block.dots)
case Some(rdt_meta_block) => rdt_meta_info = Option(rdt_meta_block.info)
}

if previous_node.nonEmpty && dots.nonEmpty then {
if previous_node.nonEmpty && rdt_meta_info.nonEmpty then {
println(s"received incoming bundle ${packet.bndl.id} with rdt-meta block and previous-node block. merging.")
dotState.mergeDots(source_node, rdt_id, dots.get)
dotState.mergeDots(previous_node.get, rdt_id, dots.get)
dotState.mergeDots(source_node, rdt_id, rdt_meta_info.get.dots)
dotState.mergeDots(previous_node.get, rdt_id, rdt_meta_info.get.dots)

tempDotsStore.put(packet.bndl.id, dots.get)
tempRdtMetaInfoStore.put(packet.bndl.id, rdt_meta_info.get)
tempRdtIdStore.put(packet.bndl.id, rdt_id)
()
}
Expand Down
Loading

0 comments on commit 08606bc

Please sign in to comment.