Skip to content

Commit

Permalink
added new observe-remove type that encodes add-remove relation into t…
Browse files Browse the repository at this point in the history
…he dots
  • Loading branch information
lh70 committed Aug 27, 2024
1 parent 7fa1520 commit 4bdc7a8
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dtn

import _root_.replication.DataManager
import _root_.replication.{DataManager, ProtocolDots}
import rdts.base.LocalUid
import dtn.rdt.Channel
import _root_.replication.JsoniterCodecs.given
Expand All @@ -10,6 +10,10 @@ import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig
import dtn.rdt.ClientOperationMode
import rdts.datatypes.alternatives.ObserveRemoveSet
import scala.util.Random
import rdts.dotted.Obrem
import rdts.datatypes.contextual.ObserveRemoveMap
import rdts.time.Dot
import rdts.base.Lattice

trait CaseStudyRdt {
def connect(
Expand Down Expand Up @@ -75,18 +79,35 @@ class AddWinsSetRDT(number_of_additions: Int, sleep_time_milliseconds: Long) ext
}

class ObserveRemoveSetRDT(number_of_changes: Int, sleep_time_milliseconds: Long) extends CaseStudyRdt {
type RdtType = ObserveRemoveSet[String]
type RdtType = Obrem[ObserveRemoveMap[String, Dot]]

given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

given Lattice[Dot] = Lattice.assertEquals

given replicaId: LocalUid = LocalUid.gen()

val dataManager: DataManager[RdtType] = DataManager[RdtType](
LocalUid.gen(),
replicaId,
state =>
println("replica received new state information"), // we ignore state updates as there will be only one active rdt
_ => ()
)

var state: RdtType = ObserveRemoveSet.empty[String]
var state: RdtType = Obrem(ObserveRemoveMap.empty[String, Dot])

private def addStringGetDelta(s: String): RdtType = {
state.mod { ctx ?=> current =>
val nextDot = ctx.nextDot(dataManager.replicaId.uid)
current.update(s, nextDot)
}
}

private def removeStringGetDelta(s: String): RdtType = {
state.mod { ctx ?=> current =>
current.remove(s)
}
}

def connect(
host: String,
Expand Down Expand Up @@ -119,17 +140,17 @@ class ObserveRemoveSetRDT(number_of_changes: Int, sleep_time_milliseconds: Long)
for i <- 0 to number_of_changes do {
Thread.sleep(sleep_time_milliseconds)

var delta = state.add(s"hello world ${i} from ${dataManager.replicaId}")
var delta = addStringGetDelta(s"hello world ${i} from ${dataManager.replicaId}")
state = state.merge(delta)

if i > 0 && i % 10 == 0 then {
for j <- i - 10 to Random().between(i - 10, i) do {
delta = delta.merge(state.remove(s"hello world ${j} from ${dataManager.replicaId}"))
delta = delta.merge(removeStringGetDelta(s"hello world ${j} from ${dataManager.replicaId}"))
state = state.merge(delta)
}
state = state.merge(delta)
}

dataManager.applyUnrelatedDelta(delta)
dataManager.applyLocalDelta(ProtocolDots(state, state.context))
}

println("finshed adding changes")
Expand Down

0 comments on commit 4bdc7a8

Please sign in to comment.