Skip to content

Commit

Permalink
added state-to-ratio data-converter. added plotting script.
Browse files Browse the repository at this point in the history
  • Loading branch information
lh70 committed Aug 13, 2024
1 parent 9a923d6 commit bcd41b2
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Modules/DTN/jvm/src/main/scala/dtn/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ commandline options:
creationClientId = "dtn://n2/rdt/app1",
paths = MonitoringPaths("/home/kali/REScala/Modules/DTN/simulation/shared/monitoring")
).run()
@main def run_ratio_converter(): Unit =
MonitoringStateDevelopmentToRatioConverter(
creationClientId = "dtn://n2/rdt/app1",
paths = MonitoringPaths("/home/kali/REScala/Modules/DTN/simulation/shared/monitoring")
).run()

@main def start_monitoring_server_default(): Unit = start_monitoring_server("0.0.0.0", 5000)

Expand Down
75 changes: 75 additions & 0 deletions Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import java.time.ZoneId
import java.time.ZonedDateTime
import scala.util.Using

import io.bullet.borer.Codec
import io.bullet.borer.derivation.MapBasedCodecs.*

class MonitoringPaths(base_dir: String = "/shared/monitoring") {
val monitoring_dir = Paths.get(base_dir)
val received_data_fp = monitoring_dir.resolve("received.data")
val forwarded_data_fp = monitoring_dir.resolve("forwarded.data")
val created_and_delivered_data_fp = monitoring_dir.resolve("created_and_delivered.data")
val ratios_fp = monitoring_dir.resolve("ratios.data")
}

class MonitoringServer(server: TCPReadonlyServer, paths: MonitoringPaths = MonitoringPaths()) {
Expand Down Expand Up @@ -216,3 +220,74 @@ class MonitoringStateDevelopmentPrinter(creationClientId: String, paths: Monitor
()
}
}

case class RatioMessage(clientId: String, timestamps: List[ZonedDateTime], ratios: List[Double]) derives Codec

class MonitoringStateDevelopmentToRatioConverter(creationClientId: String, paths: MonitoringPaths = MonitoringPaths()) {
def run(): Unit = {
var data: Map[String, Tuple2[List[ZonedDateTime], List[Double]]] = Map()

Using(Files.newBufferedReader(paths.created_and_delivered_data_fp, StandardCharsets.UTF_8)) { in =>
var creationState: Dots = Dots.empty
var deliveredStates: Map[String, Dots] = Map()

// these count anomalies that should not happen!?
var bundlesCreatedAtOtherNodesCounter: Long = 0
var bundlesDeliveredAtCreationCounter: Long = 0

var line = in.readLine()
while line != null do {
Json.decode(line.getBytes()).to[MonitoringMessage].value match
case MonitoringMessage.BundleReceivedAtRouter(nodeId, bundleId, time) => ()
case MonitoringMessage.BundleForwardedAtRouter(nodeId, bundleId, time) => ()
case MonitoringMessage.BundleDeliveredAtClient(clientId, bundleId, dots, time) => {
if clientId == creationClientId then {
bundlesDeliveredAtCreationCounter += 1
} else {
deliveredStates = deliveredStates.merge(Map(clientId -> dots))

val ratio = deliveredStates(clientId).size.toDouble / creationState.size.toDouble

data = data.updatedWith(clientId)(option => {
option match
case None => Option((List(time.get), List(ratio)))
case Some(tuple) => Option((tuple._1 :+ time.get, tuple._2 :+ ratio))
})
}
}
case MonitoringMessage.BundleCreatedAtClient(clientId, bundleId, dots, time) => {
if clientId != creationClientId then {
bundlesCreatedAtOtherNodesCounter += 1
} else {
creationState = creationState.merge(dots)

deliveredStates.foreach((cId, d) => {
val ratio = d.size.toDouble / creationState.size.toDouble

data = data.updatedWith(cId)(option => {
option match
case None => Option((List(time.get), List(ratio)))
case Some(tuple) => Option((tuple._1 :+ time.get, tuple._2 :+ ratio))
})
})
}
}

line = in.readLine()
}
}.recover(_.printStackTrace())

Using(BufferedOutputStream(Files.newOutputStream(paths.ratios_fp))) { out =>
{
data.foreach((clientId, tuple) => {
val message = RatioMessage(clientId, tuple._1, tuple._2)

out.write(Json.encode[RatioMessage](message).toByteArray)
out.write("\n".getBytes())
out.flush()
})
}
}.recover(_.printStackTrace())
()
}
}
179 changes: 179 additions & 0 deletions Modules/DTN/simulation/shared/plot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import json

from datetime import datetime, timedelta
from pathlib import Path



import matplotlib.pyplot as plt
import matplotlib.dates as mdates


monitoring_path = (Path(__file__).parent / 'monitoring').resolve()
forwarded_fp = monitoring_path / 'forwarded.data'
received_fp = monitoring_path / 'received.data'
created_and_delivered_fp = monitoring_path / 'created_and_delivered.data'
ratios_fp = monitoring_path / 'ratios.data'


def util_get_time_from_stamp(timestamp):
try:
return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ[%Z]') # example timestamp: "2024-08-12T16:41:44.797120Z[UTC]"
except:
return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ[%Z]') # example timestamp: "2024-08-12T16:41:44Z[UTC]"




"""
first plot - bundles forwarded per node over time
"""

### accumulating number of bundles per second ###

begin_time = None

data = {} # structure: {node-id: {second: num-bundles}}

with open(forwarded_fp, 'r', encoding='utf8') as f:
for line in f.readlines():
message = json.loads(line)

message_time = util_get_time_from_stamp(message['time'])

if begin_time == None:
begin_time = message_time

node_name = message['nodeId'].split('/')[-2]

total_second = int((message_time - begin_time).total_seconds())

if node_name not in data:
data[node_name] = {}

if total_second not in data[node_name]:
data[node_name][total_second] = 1
else:
data[node_name][total_second] += 1


### converting structure ####

plot_data = {}

for node_id, d in data.items():
plot_data[node_id] = []

for second, number_of_bundles in sorted(d.items(), key=lambda item: item[0]):
while len(plot_data[node_id]) < second:
plot_data[node_id].append(0)

plot_data[node_id].append(number_of_bundles)


### plot data ###

for node_id, l in plot_data.items():
plt.plot(l, label=node_id)

plt.title('bundles forwarded per second')
plt.ylabel('number of bundles')
plt.xlabel('second')
plt.legend()
plt.show()



"""
second plot - bundles received per node over time:
"""

### accumulating number of bundles per second ###

begin_time = None

data = {} # structure: {node-id: {second: num-bundles}}

with open(received_fp, 'r', encoding='utf8') as f:
for line in f.readlines():
message = json.loads(line)

message_time = util_get_time_from_stamp(message['time'])

if begin_time == None:
begin_time = message_time

node_name = message['nodeId'].split('/')[-2]

total_second = int((message_time - begin_time).total_seconds())

if node_name not in data:
data[node_name] = {}

if total_second not in data[node_name]:
data[node_name][total_second] = 1
else:
data[node_name][total_second] += 1


### converting structure ####

plot_data = {}

for node_id, d in data.items():
plot_data[node_id] = []

for second, number_of_bundles in sorted(d.items(), key=lambda item: item[0]):
while len(plot_data[node_id]) < second:
plot_data[node_id].append(0)

plot_data[node_id].append(number_of_bundles)


### plot data ###

for node_id, l in plot_data.items():
plt.plot(l, label=node_id)

plt.title('bundles received per second')
plt.ylabel('number of bundles')
plt.xlabel('second')
plt.legend()
plt.show()



"""
third plot - state convergence of clients over time
"""

### import ratio converted data ###

plot_data = {}

with open(ratios_fp, 'r', encoding='utf8') as f:
for line in f.readlines():
message = json.loads(line)

converted_timestamps = [util_get_time_from_stamp(timestamp) for timestamp in message['timestamps']]

plot_data[message['clientId']] = (converted_timestamps, message['ratios'])

### plot data ###

locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
ax = plt.gca()
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)

for node_id, (l1, l2) in plot_data.items():
plt.plot(l1, l2, label=node_id)

plt.title('state convergence over time')
plt.ylabel('convergence ratio')
plt.xlabel('time')
plt.legend()
plt.show()


0 comments on commit bcd41b2

Please sign in to comment.