Skip to content

Commit

Permalink
added observe-remove set rdt to case study
Browse files Browse the repository at this point in the history
  • Loading branch information
lh70 committed Aug 27, 2024
1 parent 386b9f9 commit b4f2087
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
18 changes: 16 additions & 2 deletions Modules/DTN/jvm/src/main/scala/dtn/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,22 @@ commandline options:
mode,
AddWinsSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds)
)
case "observeremove.listen" => throw Exception("observeremove.listen not implemented yet")
case "observeremove.active" => throw Exception("observeremove.active not implemented yet")
case "observeremove.listen" =>
case_study_listen(
host_address,
host_port,
MonitoringClient(monitoring_address, monitoring_port),
mode,
ObserveRemoveSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds)
)
case "observeremove.active" =>
case_study_active(
host_address,
host_port,
MonitoringClient(monitoring_address, monitoring_port),
mode,
ObserveRemoveSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds)
)
case "lastwriterwins.listen" => throw Exception("lastwriterwins.listen not implemented yet")
case "lastwriterwins.active" => throw Exception("lastwriterwins.active not implemented yet")
case s => throw Exception(s"unknown client rdt: $s")
Expand Down
114 changes: 114 additions & 0 deletions Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig
import dtn.rdt.ClientOperationMode
import rdts.datatypes.alternatives.ObserveRemoveSet
import scala.util.Random

trait CaseStudyRdt {
def connect(
Expand Down Expand Up @@ -69,3 +71,115 @@ class AddWinsSetRDT(number_of_additions: Int, sleep_time_milliseconds: Long) ext
}
}
}

class ObserveRemoveSetRDT(number_of_changes: Int, sleep_time_milliseconds: Long) extends CaseStudyRdt {
type RdtType = ObserveRemoveSet[String]

given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

val dataManager: DataManager[RdtType] = DataManager[RdtType](
LocalUid.gen(),
state =>
println("replica received new state information"), // we ignore state updates as there will be only one active rdt
_ => ()
)

val data: RdtType = ObserveRemoveSet.empty[String]

def connect(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
operationMode: ClientOperationMode
): Unit = {
dataManager.addLatentConnection(Channel[RdtType](
host,
port,
"app1",
scala.concurrent.ExecutionContext.global,
monitoringClient,
operationMode
))
}

def caseStudyListen(): Unit = {
while true do {
Thread.sleep(1000)
}
}

def caseStudyActive(): Unit = {
println("started active observe-remove-set rdt.")
println(s"\nnumber of changes: ${number_of_changes}\nsleep-time milliseconds: ${sleep_time_milliseconds}")

Thread.sleep(10 * 1000)

for i <- 0 to number_of_changes do {
Thread.sleep(sleep_time_milliseconds)

dataManager.applyUnrelatedDelta(data.add(s"hello world ${i} from ${dataManager.replicaId}"))

if i > 0 && i % 10 == 0 then {
for j <- i - 10 to Random().between(i - 10, i) do {
dataManager.applyUnrelatedDelta(data.remove(s"hello world ${j} from ${dataManager.replicaId}"))
}
}
}

while true do {
Thread.sleep(1000)
}
}
}

/*
class LastWriterWinsRDT(number_of_changes: Int, sleep_time_milliseconds: Long) extends CaseStudyRdt {
type RdtType = Set[String] // this is not right
given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))
val dataManager: DataManager[RdtType] = DataManager[RdtType](
LocalUid.gen(),
state => println("replica received new state information"),
_ => ()
)
def connect(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
operationMode: ClientOperationMode
): Unit = {
dataManager.addLatentConnection(Channel[RdtType](
host,
port,
"app1",
scala.concurrent.ExecutionContext.global,
monitoringClient,
operationMode
))
}
def caseStudyListen(): Unit = {
while true do {
Thread.sleep(1000)
}
}
def caseStudyActive(): Unit = {
println("started active last-writer-wins rdt.")
println(s"\nnumber of changes: ${number_of_changes}\nsleep-time milliseconds: ${sleep_time_milliseconds}")
Thread.sleep(10 * 1000)
for i <- 0 to number_of_changes do {
Thread.sleep(sleep_time_milliseconds)
dataManager.applyUnrelatedDelta(s"hello world ${i} from ${dataManager.replicaId}")
}
while true do {
Thread.sleep(1000)
}
}
}
*/

0 comments on commit b4f2087

Please sign in to comment.