diff --git a/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala b/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala index 94683638d..bfa6875df 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala @@ -1,6 +1,6 @@ package dtn -import _root_.replication.DataManager +import _root_.replication.{DataManager, ProtocolDots} import rdts.base.LocalUid import dtn.rdt.Channel import _root_.replication.JsoniterCodecs.given @@ -10,6 +10,10 @@ import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig import dtn.rdt.ClientOperationMode import rdts.datatypes.alternatives.ObserveRemoveSet import scala.util.Random +import rdts.dotted.Obrem +import rdts.datatypes.contextual.ObserveRemoveMap +import rdts.time.Dot +import rdts.base.Lattice trait CaseStudyRdt { def connect( @@ -75,18 +79,35 @@ class AddWinsSetRDT(number_of_additions: Int, sleep_time_milliseconds: Long) ext } class ObserveRemoveSetRDT(number_of_changes: Int, sleep_time_milliseconds: Long) extends CaseStudyRdt { - type RdtType = ObserveRemoveSet[String] + type RdtType = Obrem[ObserveRemoveMap[String, Dot]] given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true)) + given Lattice[Dot] = Lattice.assertEquals + + given replicaId: LocalUid = LocalUid.gen() + val dataManager: DataManager[RdtType] = DataManager[RdtType]( - LocalUid.gen(), + replicaId, state => println("replica received new state information"), // we ignore state updates as there will be only one active rdt _ => () ) - var state: RdtType = ObserveRemoveSet.empty[String] + var state: RdtType = Obrem(ObserveRemoveMap.empty[String, Dot]) + + private def addStringGetDelta(s: String): RdtType = { + state.mod { ctx ?=> current => + val nextDot = ctx.nextDot(dataManager.replicaId.uid) + current.update(s, nextDot) + } + } + + private def removeStringGetDelta(s: String): RdtType = { + state.mod { ctx ?=> current => + current.remove(s) + } + } def connect( host: String, @@ -119,17 +140,17 @@ class ObserveRemoveSetRDT(number_of_changes: Int, sleep_time_milliseconds: Long) for i <- 0 to number_of_changes do { Thread.sleep(sleep_time_milliseconds) - var delta = state.add(s"hello world ${i} from ${dataManager.replicaId}") + var delta = addStringGetDelta(s"hello world ${i} from ${dataManager.replicaId}") state = state.merge(delta) if i > 0 && i % 10 == 0 then { for j <- i - 10 to Random().between(i - 10, i) do { - delta = delta.merge(state.remove(s"hello world ${j} from ${dataManager.replicaId}")) + delta = delta.merge(removeStringGetDelta(s"hello world ${j} from ${dataManager.replicaId}")) + state = state.merge(delta) } - state = state.merge(delta) } - dataManager.applyUnrelatedDelta(delta) + dataManager.applyLocalDelta(ProtocolDots(state, state.context)) } println("finshed adding changes")