From e6bc2e5a71acd044d6debf0300d422715d895f27 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:31:17 -0300 Subject: [PATCH 01/21] Base MS and subscriptions --- build.gradle | 1 + .../cst/memorystorage/MemoryStorage.java | 138 ++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java diff --git a/build.gradle b/build.gradle index c0c25090..c5d81970 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.1' testRuntimeOnly("org.junit.platform:junit-platform-launcher") + api 'io.lettuce:lettuce-core:6.3.2.RELEASE' } diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java new file mode 100644 index 00000000..d729672c --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java @@ -0,0 +1,138 @@ +package br.unicamp.cst.memorystorage; + +import br.unicamp.cst.core.entities.Codelet; +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.Mind; + +import java.util.HashMap; +import java.util.function.Consumer; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; + +public class MemoryStorage extends Codelet { + private RedisClient redisClient; + private StatefulRedisConnection connection; + private RedisAsyncCommands commands; + private StatefulRedisPubSubConnection pubsubConnection; + + private Mind mind; + private String nodeName; + private String mindName; + private float requestTimeout; + + HashMap> listeners; + + public static void main(String args[]) { + MemoryStorage ms = new MemoryStorage(); + + while (true) { + + } + } + + public MemoryStorage() { + listeners = new HashMap>(); + + redisClient = RedisClient.create("redis://localhost"); + connection = redisClient.connect(); + commands = connection.async(); + pubsubConnection = redisClient.connectPubSub(); + + mindName = "default_mind"; + nodeName = "node"; + + + + RedisPubSubListener listener = new RedisPubSubAdapter() { + @Override + public void message(String received_channel, String message) { + Consumer listener = MemoryStorage.this.listeners.get(received_channel); + + System.out.println(received_channel); + System.out.println(listener); + + if(listener != null) + { + listener.accept(message); + } + } + }; + + pubsubConnection.addListener(listener); + + Consumer handlerTransferMemory = message -> this.handlerTransferMemory(message); + subscribe(String.format("%s:nodes:%s:transfer_memory", mindName, nodeName), handlerTransferMemory); + Consumer handlerNotifyTransfer = message -> this.handlerNotifyTransfer(message); + subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); + } + + public MemoryStorage(Mind mind, String nodeName, String mindName, float requestTimeout) { + this.mind = mind; + this.nodeName = nodeName; + this.mindName = mindName; + this.requestTimeout = requestTimeout; + + redisClient = RedisClient.create(); + connection = redisClient.connect(); + commands = connection.async(); + + commands.sadd(String.format("%s:nodes", mindName), nodeName); + + Consumer handlerTransferMemory = message -> this.handlerTransferMemory(message); + subscribe(String.format("%s:nodes:%s:transfer_memory", mindName, nodeName), handlerTransferMemory); + + Consumer handlerNotifyTransfer = message -> this.handlerNotifyTransfer(message); + subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); + } + + @Override + public void accessMemoryObjects() { + } + + @Override + public void calculateActivation() { + } + + @Override + public void proc() { + + } + + + private void sendMemory(Memory memory) { + String memoryName = memory.getName(); + + } + + private void handlerTransferMemory(String message) { + + } + + private void handlerNotifyTransfer(String message) { + + } + + + private void subscribe(String channel, Consumer handler) + { + listeners.put(channel, handler); + pubsubConnection.async().subscribe(channel); + } + + private void unsubscribe(String channel) + { + pubsubConnection.async().unsubscribe(channel); + listeners.remove(channel); + } + + @Override + protected void finalize() { + redisClient.shutdown(); + } + +} From d27a6bff982622b3a932d64863d64740493a0994 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:07:10 -0300 Subject: [PATCH 02/21] MS working Java <-> Java communication and Java <-> Python communication works --- .../cst/memorystorage/MemoryStorage.java | 377 ++++++++++++++++-- 1 file changed, 337 insertions(+), 40 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java index d729672c..6f939aa7 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java @@ -2,12 +2,25 @@ import br.unicamp.cst.core.entities.Codelet; import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.MemoryObject; import br.unicamp.cst.core.entities.Mind; +import java.lang.ref.WeakReference; import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.function.Consumer; - +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.slf4j.LoggerFactory; +import com.google.gson.Gson; +import ch.qos.logback.classic.LoggerContext; import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.pubsub.RedisPubSubAdapter; @@ -19,45 +32,140 @@ public class MemoryStorage extends Codelet { private StatefulRedisConnection connection; private RedisAsyncCommands commands; private StatefulRedisPubSubConnection pubsubConnection; + HashMap> listeners; private Mind mind; private String nodeName; private String mindName; - private float requestTimeout; + private double requestTimeout; - HashMap> listeners; + private HashMap> memories; + private HashMap lastUpdate; + private HashSet waitingRetrieval; + private HashMap> waitingRequestEvents; + private Executor retrieveExecutor; + + private Gson gson; public static void main(String args[]) { - MemoryStorage ms = new MemoryStorage(); + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + for (ch.qos.logback.classic.Logger logger : loggerContext.getLoggerList()) { + logger.setLevel(ch.qos.logback.classic.Level.ERROR); + } + + RedisClient redisClient = RedisClient.create("redis://localhost"); + StatefulRedisConnection connection = redisClient.connect(); + RedisAsyncCommands commands = connection.async(); + try { + commands.flushall().get(); + } catch (Exception e) { + // TODO: handle exception + e.printStackTrace(); + } + + String mindName = "default_mind"; + + try { + Mind mind = new Mind(); + MemoryObject memory1 = mind.createMemoryObject("Memory1", ""); + String nodeName = "node0"; + + MemoryStorage ms = new MemoryStorage(mind, nodeName, mindName, 500.0e-3); + ms.timeStep = 100; + + mind.insertCodelet(ms); + mind.start(); + + Thread.sleep(1000); + + Mind mind2 = new Mind(); + MemoryObject mind2_memory1 = mind2.createMemoryObject("Memory1", ""); + MemoryStorage mind2_ms = new MemoryStorage(mind2, nodeName, mindName, 500.0e-3); + mind2_ms.timeStep = 100; + + mind2.insertCodelet(mind2_ms); + mind2.start(); - while (true) { + System.out.println(memory1); + System.out.println(mind2_memory1); + Thread.sleep(1000); + + memory1.setI("INFO"); + + System.out.println(); + System.out.println(memory1); + System.out.println(mind2_memory1); + + Thread.sleep(1000); + + System.out.println(); + System.out.println(memory1); + System.out.println(mind2_memory1); + + long lastTimestamp = memory1.getTimestamp(); + + while (true) { + if (lastTimestamp != memory1.getTimestamp()) { + System.out.println(); + System.out.println(memory1); + + lastTimestamp = memory1.getTimestamp(); + } + } + + // commands.flushall().get(); + + } catch (Exception e) { + // TODO: handle exception + e.printStackTrace(); } } - public MemoryStorage() { + public MemoryStorage(Mind mind, String nodeName, String mindName, double requestTimeout) { listeners = new HashMap>(); + memories = new HashMap>(); + lastUpdate = new HashMap(); + waitingRetrieval = new HashSet(); + waitingRequestEvents = new HashMap>(); + retrieveExecutor = Executors.newFixedThreadPool(3); + gson = new Gson(); + + this.mind = mind; + this.mindName = mindName; + this.requestTimeout = requestTimeout; redisClient = RedisClient.create("redis://localhost"); connection = redisClient.connect(); commands = connection.async(); pubsubConnection = redisClient.connectPubSub(); - mindName = "default_mind"; - nodeName = "node"; + String baseName = nodeName; + + String mindNodesPath = String.format("%s:nodes", mindName); + RedisFuture isMemberFuture = commands.sismember(mindNodesPath, nodeName); + int nodeNumber = 2; + try { + while (isMemberFuture.get()) { + nodeName = baseName + Integer.toString(nodeNumber); + nodeNumber += 1; + isMemberFuture = commands.sismember(mindNodesPath, nodeName); + } + } catch (Exception e) { + // TODO: handle exception + e.printStackTrace(); + } + + this.nodeName = nodeName; + + commands.sadd(mindNodesPath, nodeName); - - RedisPubSubListener listener = new RedisPubSubAdapter() { @Override public void message(String received_channel, String message) { Consumer listener = MemoryStorage.this.listeners.get(received_channel); - System.out.println(received_channel); - System.out.println(listener); - - if(listener != null) - { + if (listener != null) { listener.accept(message); } } @@ -71,25 +179,6 @@ public void message(String received_channel, String message) { subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); } - public MemoryStorage(Mind mind, String nodeName, String mindName, float requestTimeout) { - this.mind = mind; - this.nodeName = nodeName; - this.mindName = mindName; - this.requestTimeout = requestTimeout; - - redisClient = RedisClient.create(); - connection = redisClient.connect(); - commands = connection.async(); - - commands.sadd(String.format("%s:nodes", mindName), nodeName); - - Consumer handlerTransferMemory = message -> this.handlerTransferMemory(message); - subscribe(String.format("%s:nodes:%s:transfer_memory", mindName, nodeName), handlerTransferMemory); - - Consumer handlerNotifyTransfer = message -> this.handlerNotifyTransfer(message); - subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); - } - @Override public void accessMemoryObjects() { } @@ -100,32 +189,240 @@ public void calculateActivation() { @Override public void proc() { + HashMap mindMemories = new HashMap(); + HashSet mindMemoriesNames = new HashSet(); + for (Memory memory : mind.getRawMemory().getAllMemoryObjects()) { + String memoryName = memory.getName(); + if (memoryName != "") { + mindMemories.put(memoryName, memory); + mindMemoriesNames.add(memoryName); + } + } + Set memoriesNames = this.memories.keySet(); + + mindMemoriesNames.removeAll(memoriesNames); + Set difference = mindMemoriesNames; + for (String memoryName : difference) { + Memory memory = mindMemories.get(memoryName); + memories.put(memoryName, new WeakReference(memory)); + + String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + RedisFuture existFuture = commands.exists(memoryPath); + try { + if (existFuture.get() > 0) { + retrieveExecutor.execute(() -> { + retrieveMemory(memory); + }); + } else { + Map impostor = new HashMap(); + impostor.put("name", memoryName); + impostor.put("evaluation", "0.0"); + impostor.put("I", ""); + impostor.put("id", "0"); + impostor.put("owner", nodeName); + + commands.hset(memoryPath, impostor); + } + } catch (Exception e) { + // TODO: handle exception + e.printStackTrace(); + } + + String subscribeUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); + Consumer handlerUpdate = message -> this.updateMemory(memoryName); + subscribe(subscribeUpdatePath, handlerUpdate); + + } + + Set toUpdate = lastUpdate.keySet(); + for (String memoryName : toUpdate) { + if (!memories.containsKey(memoryName)) { + lastUpdate.remove(memoryName); + continue; + } + Memory memory = memories.get(memoryName).get(); + if (memory == null) { + lastUpdate.remove(memoryName); + memories.remove(memoryName); + continue; + } + + if (memory.getTimestamp() > lastUpdate.get(memoryName)) { + updateMemory(memoryName); + } + } + } + + private void updateMemory(String memoryName) { + System.out.println(mindName + ":" + nodeName + " Update memory: " + memoryName); + + String memoryUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); + + if (!memories.containsKey(memoryName)) { + unsubscribe(memoryUpdatePath); + return; + } + Memory memory = memories.get(memoryName).get(); + if (memory == null) { + unsubscribe(memoryUpdatePath); + memories.remove(memoryName); + return; + } + + try { + String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + Long timestamp = Long.parseLong(commands.hget(memoryPath, "timestamp").get()); + + Long memoryTimestamp = memory.getTimestamp(); + + if (memoryTimestamp < timestamp) { + retrieveExecutor.execute(() -> { + retrieveMemory(memory); + }); + } else if (memoryTimestamp > timestamp) { + sendMemory(memory); + } + + lastUpdate.put(memoryName, memoryTimestamp); + } catch (Exception e) { + // TODO: handle exception + System.err.println("Memory update error!"); + e.printStackTrace(); + } } - private void sendMemory(Memory memory) { + System.out.println(mindName + ":" + nodeName + " Send memory: " + memory.getName()); + String memoryName = memory.getName(); + Long memoryTimestamp = memory.getTimestamp(); + + HashMap memoryDict = new HashMap(); + + memoryDict.put("timestamp", memoryTimestamp.toString()); + memoryDict.put("evaluation", memory.getEvaluation().toString()); + memoryDict.put("name", memoryName); + memoryDict.put("id", memory.getId().toString()); + memoryDict.put("owner", ""); + + Object info = memory.getI(); + memoryDict.put("I", gson.toJson(info)); + String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + commands.hset(memoryPath, memoryDict); + + String memoryUpdatePath = memoryPath + ":update"; + commands.publish(memoryUpdatePath, ""); + + lastUpdate.put(memoryName, memoryTimestamp); } - private void handlerTransferMemory(String message) { + private void retrieveMemory(Memory memory) { + System.out.println(mindName + ":" + nodeName + " Retrieve memory: " + memory.getName()); + String memoryName = memory.getName(); + + if (waitingRetrieval.contains(memoryName)) { + return; + } + waitingRetrieval.add(memoryName); + + String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + Map memoryDict; + try { + memoryDict = commands.hgetall(memoryPath).get(); + String owner = memoryDict.get("owner"); + + if (owner != "") { + CompletableFuture event = new CompletableFuture(); + waitingRequestEvents.put(memoryName, event); + requestMemory(memoryName, owner); + + try { + if (!event.get((long) (requestTimeout * 1000), TimeUnit.MILLISECONDS)) { + sendMemory(memory); + } + } catch (Exception e) { + // sendMemory(memory); + System.out.println(mindName + ":" + nodeName + " Memory request error"); + } + + memoryDict = commands.hgetall(memoryPath).get(); + } + + memory.setEvaluation(Double.parseDouble(memoryDict.get("evaluation"))); + memory.setId(Long.parseLong(memoryDict.get("id"))); + + String infoJSON = memoryDict.get("I"); + Object info = gson.fromJson(infoJSON, Object.class); + memory.setI(info); + + Long timestamp = memory.getTimestamp(); + lastUpdate.put(memoryName, timestamp); + + waitingRetrieval.remove(memoryName); + } catch (Exception e) { + // TODO handle exception + e.printStackTrace(); + } + } + + private void requestMemory(String memoryName, String ownerName) { + System.out.println(mindName + ":" + nodeName + " Request memory: " + memoryName); + + String requestAddr = String.format("%s:nodes:%s:transfer_memory", mindName, ownerName); + + HashMap requestDict = new HashMap(); + requestDict.put("memory_name", memoryName); + requestDict.put("node", nodeName); + String request = gson.toJson(requestDict); + + commands.publish(requestAddr, request); } private void handlerNotifyTransfer(String message) { + System.out.println(mindName + ":" + nodeName + " Transfer done: " + message); + + String memoryName = message; + + CompletableFuture event = waitingRequestEvents.get(memoryName); + if (event != null) { + event.complete(true); + waitingRequestEvents.remove(memoryName); + } } + private void handlerTransferMemory(String message) { + System.out.println(mindName + ":" + nodeName + " Transfer memory: " + message); + + Map request = gson.fromJson(message, HashMap.class); + + String memoryName = request.get("memory_name"); + String requestingNode = request.get("node"); + + Memory memory; + WeakReference memoryReference = memories.get(memoryName); + if (memoryReference == null || memoryReference.get() == null) { + memory = new MemoryObject(); + memory.setName(memoryName); + } else { + memory = memoryReference.get(); + } + + sendMemory(memory); + + String responseAddr = String.format("%s:nodes:%s:transfer_done", mindName, requestingNode); + commands.publish(responseAddr, memoryName); + } - private void subscribe(String channel, Consumer handler) - { + private void subscribe(String channel, Consumer handler) { listeners.put(channel, handler); pubsubConnection.async().subscribe(channel); } - private void unsubscribe(String channel) - { + private void unsubscribe(String channel) { pubsubConnection.async().unsubscribe(channel); listeners.remove(channel); } From e8d0412696fb46570c0c2381fa6aaeeeea5dda61 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 28 Nov 2024 13:28:41 -0300 Subject: [PATCH 03/21] Fix MS problems - Node name is equal to Python version - Fix request type --- .../cst/memorystorage/MemoryStorage.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java index 6f939aa7..2414cb7a 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java @@ -6,6 +6,7 @@ import br.unicamp.cst.core.entities.Mind; import java.lang.ref.WeakReference; +import java.lang.reflect.Type; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -18,6 +19,7 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import ch.qos.logback.classic.LoggerContext; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; @@ -143,13 +145,15 @@ public MemoryStorage(Mind mind, String nodeName, String mindName, double request String baseName = nodeName; String mindNodesPath = String.format("%s:nodes", mindName); - RedisFuture isMemberFuture = commands.sismember(mindNodesPath, nodeName); - int nodeNumber = 2; try { - while (isMemberFuture.get()) { - nodeName = baseName + Integer.toString(nodeNumber); - nodeNumber += 1; - isMemberFuture = commands.sismember(mindNodesPath, nodeName); + if(commands.sismember(mindNodesPath, nodeName).get()) + { + Long nodeNumber = commands.scard(mindNodesPath).get(); + nodeName = baseName + Long.toString(nodeNumber); + while (commands.sismember(mindNodesPath, nodeName).get()) { + nodeNumber += 1; + nodeName = baseName+Long.toString(nodeNumber); + } } } catch (Exception e) { // TODO: handle exception @@ -397,7 +401,8 @@ private void handlerNotifyTransfer(String message) { private void handlerTransferMemory(String message) { System.out.println(mindName + ":" + nodeName + " Transfer memory: " + message); - Map request = gson.fromJson(message, HashMap.class); + Type type = new TypeToken>(){}.getType(); + Map request = gson.fromJson(message, type); String memoryName = request.get("memory_name"); String requestingNode = request.get("node"); From 125c40ccdc7588a7c3333bc10bd5a3e5caa17c74 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:00:14 -0300 Subject: [PATCH 04/21] LamportTime class --- .../cst/memorystorage/LamportTime.java | 74 +++++++++++++++++++ .../cst/memorystorage/LogicalTime.java | 21 ++++++ .../cst/memorystorage/LamportTimeTest.java | 57 ++++++++++++++ 3 files changed, 152 insertions(+) create mode 100644 src/main/java/br/unicamp/cst/memorystorage/LamportTime.java create mode 100644 src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java create mode 100644 src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java diff --git a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java new file mode 100644 index 00000000..d71dec77 --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java @@ -0,0 +1,74 @@ +package br.unicamp.cst.memorystorage; + +public class LamportTime implements LogicalTime { + + private int time; + + public LamportTime(int initialTime) + { + this.time = initialTime; + } + + public LamportTime() + { + this(0); + } + + public int getTime() + { + return this.time; + } + + @Override + public LamportTime increment() { + return new LamportTime(this.time+1); + } + + public static LamportTime fromString(String string) + { + return new LamportTime(Integer.parseInt(string)); + } + + public String toString() + { + return Integer.toString(time); + } + + + public static LamportTime synchronize(LogicalTime time0, LogicalTime time1) { + if(!(LogicalTime.class.isInstance(time0) && LogicalTime.class.isInstance(time1))){ + throw new IllegalArgumentException("LamportTime can only synchonize LamportTime instances"); + } + + LamportTime lamportTime0 = (LamportTime) time0; + LamportTime lamportTime1 = (LamportTime) time1; + + int newTime = 0; + if(time0.lessThan(time1)){ + newTime = lamportTime1.getTime(); + } + else{ + newTime = lamportTime0.getTime(); + } + + newTime += 1; + + return new LamportTime(newTime); + + } + + @Override + public boolean lessThan(Object o) { + LamportTime otherTime = (LamportTime) o; + + return this.time < otherTime.getTime(); + } + + @Override + public boolean equals(Object o) + { + LamportTime otherTime = (LamportTime) o; + return this.time == otherTime.getTime(); + } + +} diff --git a/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java new file mode 100644 index 00000000..644fb0a6 --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java @@ -0,0 +1,21 @@ +package br.unicamp.cst.memorystorage; +public interface LogicalTime { + public abstract LogicalTime increment(); + + public static LogicalTime fromString(String string) + { + throw new IllegalStateException("fromString not implemented in the subclass"); + } + + @Override + public abstract String toString(); + + public static LogicalTime synchronize(LogicalTime time0, LogicalTime time1) + { + throw new IllegalStateException("synchronize not implemented in the subclass"); + } + + @Override + public abstract boolean equals(Object o); + public abstract boolean lessThan(Object o); +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java new file mode 100644 index 00000000..e832882c --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java @@ -0,0 +1,57 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +public class LamportTimeTest { + + @Test + public void initialTimeTest() + { + LamportTime time0 = new LamportTime(123); + + assertEquals(123, time0.getTime()); + } + + @Test + public void stringTest() + { + LamportTime time0 = new LamportTime(456); + + assertEquals("456", time0.toString()); + } + + @Test + public void fromStringTest() + { + LamportTime time0 = new LamportTime(987); + + assertTrue(time0.equals(LamportTime.fromString(time0.toString()))); + } + + @Test + public void incrementTest() + { + LamportTime time0 = new LamportTime(); + int time0Time = time0.getTime(); + + LamportTime time1 = time0.increment(); + + assertEquals(time0Time, time0.getTime()); + assertEquals(time0Time+1, time1.getTime()); + } + + @Test + public void synchronizeTest() + { + LamportTime time0 = new LamportTime(-10); + LamportTime time1 = new LamportTime(55); + + LamportTime timeS = LamportTime.synchronize(time0, time1); + + assertTrue(time0.lessThan(timeS)); + assertTrue(time1.lessThan(timeS)); + assertEquals(56, timeS.getTime()); + } +} From 3018144eac7fabc01c2d0a59f8ade36334b6e1a2 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:28:34 -0300 Subject: [PATCH 05/21] MemoryEncoder class --- .../cst/memorystorage/MemoryEncoder.java | 39 +++++++++++++ .../cst/memorystorage/MemoryEncoderTest.java | 57 +++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java create mode 100644 src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java new file mode 100644 index 00000000..142869ba --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java @@ -0,0 +1,39 @@ +package br.unicamp.cst.memorystorage; + +import java.util.HashMap; +import java.util.Map; + +import com.google.gson.Gson; + +import br.unicamp.cst.core.entities.Memory; + +public class MemoryEncoder { + static Gson gson = new Gson(); + + private MemoryEncoder() { + + } + + public static Map toDict(Memory memory) { + Map data = new HashMap<>(); + + data.put("name", memory.getName()); + data.put("evaluation", memory.getEvaluation().toString()); + data.put("id", memory.getId().toString()); + data.put("I", gson.toJson(memory.getI())); + data.put("timestamp", memory.getTimestamp().toString()); + + return data; + } + + + public static void loadMemory(Memory memory, Map memoryDict) { + memory.setEvaluation(Double.parseDouble(memoryDict.get("evaluation"))); + memory.setId(Long.parseLong(memoryDict.get("id"))); + + String infoJSON = memoryDict.get("I"); + Object info = gson.fromJson(infoJSON, Object.class); + memory.setI(info); + } + +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java new file mode 100644 index 00000000..c7a5a1ad --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java @@ -0,0 +1,57 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.MemoryObject; + +public class MemoryEncoderTest { + + @Test + public void toDictTest() + { + MemoryObject memory = new MemoryObject(); + memory.setName("MemoryName"); + memory.setI(123.456); + memory.setId(123l); + memory.setEvaluation(0.5); + + Map memoryDict = MemoryEncoder.toDict(memory); + + assertEquals("123.456", memoryDict.get("I")); + assertEquals(memory.getTimestamp().toString(), memoryDict.get("timestamp")); + assertEquals(Float.toString(0.5f), memoryDict.get("evaluation")); + assertEquals("MemoryName", memoryDict.get("name")); + assertEquals(memory.getId().toString(), memoryDict.get("id")); + } + + @Test + public void loadMemoryTest() + { + Memory memory = new MemoryObject(); + Map memoryDict = new HashMap(); + memoryDict.put("evaluation", "0.5"); + memoryDict.put("id", "123"); + memoryDict.put("I", "[5, 3, 4]"); + + MemoryEncoder.loadMemory(memory, memoryDict); + + assertEquals(0.5, memory.getEvaluation()); + assertEquals(123, memory.getId()); + + List info = (List) memory.getI(); + + assertEquals(3, info.size()); + assertEquals(5, info.get(0)); + assertEquals(3, info.get(1)); + assertEquals(4, info.get(2)); + + } +} From 00e39326c3b3613a943cfc1ac2cbbe836bdd6f80 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 4 Dec 2024 20:05:36 -0300 Subject: [PATCH 06/21] MS - Com Logical time e testes --- .../cst/memorystorage/MemoryEncoder.java | 17 +- ...Storage.java => MemoryStorageCodelet.java} | 227 ++++++++---------- .../cst/memorystorage/MemoryEncoderTest.java | 1 - .../cst/memorystorage/MemoryStorageTest.java | 227 ++++++++++++++++++ 4 files changed, 338 insertions(+), 134 deletions(-) rename src/main/java/br/unicamp/cst/memorystorage/{MemoryStorage.java => MemoryStorageCodelet.java} (64%) create mode 100644 src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java index 142869ba..77181282 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java @@ -4,11 +4,13 @@ import java.util.Map; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.ToNumberPolicy; import br.unicamp.cst.core.entities.Memory; public class MemoryEncoder { - static Gson gson = new Gson(); + static Gson gson = new GsonBuilder().setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE).create(); private MemoryEncoder() { @@ -20,8 +22,17 @@ public static Map toDict(Memory memory) { data.put("name", memory.getName()); data.put("evaluation", memory.getEvaluation().toString()); data.put("id", memory.getId().toString()); - data.put("I", gson.toJson(memory.getI())); - data.put("timestamp", memory.getTimestamp().toString()); + data.put("timestamp", memory.getTimestamp().toString()); + + Object info = memory.getI(); + if(String.class.isInstance(info)) + { + data.put("I", (String) info); + } + else + { + data.put("I", gson.toJson(info)); + } return data; } diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java similarity index 64% rename from src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java rename to src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java index 2414cb7a..860f71ef 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorage.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -13,7 +13,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -29,7 +29,7 @@ import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; -public class MemoryStorage extends Codelet { +public class MemoryStorageCodelet extends Codelet { private RedisClient redisClient; private StatefulRedisConnection connection; private RedisAsyncCommands commands; @@ -38,99 +38,38 @@ public class MemoryStorage extends Codelet { private Mind mind; private String nodeName; + + public String getNodeName() { + return nodeName; + } + private String mindName; private double requestTimeout; private HashMap> memories; private HashMap lastUpdate; + private HashMap memoryLogicalTime; private HashSet waitingRetrieval; private HashMap> waitingRequestEvents; - private Executor retrieveExecutor; + private ExecutorService retrieveExecutor; + private LamportTime currentTime; private Gson gson; - public static void main(String args[]) { - LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); - for (ch.qos.logback.classic.Logger logger : loggerContext.getLoggerList()) { - logger.setLevel(ch.qos.logback.classic.Level.ERROR); - } - - RedisClient redisClient = RedisClient.create("redis://localhost"); - StatefulRedisConnection connection = redisClient.connect(); - RedisAsyncCommands commands = connection.async(); - try { - commands.flushall().get(); - } catch (Exception e) { - // TODO: handle exception - e.printStackTrace(); - } - - String mindName = "default_mind"; - - try { - Mind mind = new Mind(); - MemoryObject memory1 = mind.createMemoryObject("Memory1", ""); - String nodeName = "node0"; - - MemoryStorage ms = new MemoryStorage(mind, nodeName, mindName, 500.0e-3); - ms.timeStep = 100; - - mind.insertCodelet(ms); - mind.start(); - - Thread.sleep(1000); - - Mind mind2 = new Mind(); - MemoryObject mind2_memory1 = mind2.createMemoryObject("Memory1", ""); - MemoryStorage mind2_ms = new MemoryStorage(mind2, nodeName, mindName, 500.0e-3); - mind2_ms.timeStep = 100; - - mind2.insertCodelet(mind2_ms); - mind2.start(); - - System.out.println(memory1); - System.out.println(mind2_memory1); - - Thread.sleep(1000); - - memory1.setI("INFO"); - - System.out.println(); - System.out.println(memory1); - System.out.println(mind2_memory1); - - Thread.sleep(1000); - - System.out.println(); - System.out.println(memory1); - System.out.println(mind2_memory1); - - long lastTimestamp = memory1.getTimestamp(); - - while (true) { - if (lastTimestamp != memory1.getTimestamp()) { - System.out.println(); - System.out.println(memory1); - - lastTimestamp = memory1.getTimestamp(); - } - } - - // commands.flushall().get(); - - } catch (Exception e) { - // TODO: handle exception - e.printStackTrace(); - } + public MemoryStorageCodelet(Mind mind) + { + this(mind, "node", "default_mind", 500.0e-3); } - public MemoryStorage(Mind mind, String nodeName, String mindName, double requestTimeout) { - listeners = new HashMap>(); - memories = new HashMap>(); - lastUpdate = new HashMap(); - waitingRetrieval = new HashSet(); - waitingRequestEvents = new HashMap>(); + public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double requestTimeout) { + listeners = new HashMap<>(); + memories = new HashMap<>(); + lastUpdate = new HashMap<>(); + memoryLogicalTime = new HashMap<>(); + waitingRetrieval = new HashSet<>(); + waitingRequestEvents = new HashMap<>(); retrieveExecutor = Executors.newFixedThreadPool(3); + currentTime = new LamportTime(); gson = new Gson(); this.mind = mind; @@ -146,11 +85,14 @@ public MemoryStorage(Mind mind, String nodeName, String mindName, double request String mindNodesPath = String.format("%s:nodes", mindName); try { - if(commands.sismember(mindNodesPath, nodeName).get()) + boolean isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + if(isMemberResult) { Long nodeNumber = commands.scard(mindNodesPath).get(); nodeName = baseName + Long.toString(nodeNumber); - while (commands.sismember(mindNodesPath, nodeName).get()) { + + isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + while (isMemberResult) { nodeNumber += 1; nodeName = baseName+Long.toString(nodeNumber); } @@ -166,8 +108,8 @@ public MemoryStorage(Mind mind, String nodeName, String mindName, double request RedisPubSubListener listener = new RedisPubSubAdapter() { @Override - public void message(String received_channel, String message) { - Consumer listener = MemoryStorage.this.listeners.get(received_channel); + public void message(String receivedChannel, String message) { + Consumer listener = MemoryStorageCodelet.this.listeners.get(receivedChannel); if (listener != null) { listener.accept(message); @@ -177,9 +119,9 @@ public void message(String received_channel, String message) { pubsubConnection.addListener(listener); - Consumer handlerTransferMemory = message -> this.handlerTransferMemory(message); + Consumer handlerTransferMemory = this::handlerTransferMemory; subscribe(String.format("%s:nodes:%s:transfer_memory", mindName, nodeName), handlerTransferMemory); - Consumer handlerNotifyTransfer = message -> this.handlerNotifyTransfer(message); + Consumer handlerNotifyTransfer = this::handlerNotifyTransfer; subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); } @@ -193,11 +135,11 @@ public void calculateActivation() { @Override public void proc() { - HashMap mindMemories = new HashMap(); - HashSet mindMemoriesNames = new HashSet(); + HashMap mindMemories = new HashMap<>(); + HashSet mindMemoriesNames = new HashSet<>(); for (Memory memory : mind.getRawMemory().getAllMemoryObjects()) { String memoryName = memory.getName(); - if (memoryName != "") { + if (!memoryName.equals("")) { mindMemories.put(memoryName, memory); mindMemoriesNames.add(memoryName); } @@ -219,14 +161,16 @@ public void proc() { retrieveMemory(memory); }); } else { - Map impostor = new HashMap(); + Map impostor = new HashMap<>(); impostor.put("name", memoryName); impostor.put("evaluation", "0.0"); impostor.put("I", ""); impostor.put("id", "0"); impostor.put("owner", nodeName); + impostor.put("logical_time", currentTime.toString()); commands.hset(memoryPath, impostor); + currentTime = currentTime.increment(); } } catch (Exception e) { // TODO: handle exception @@ -243,6 +187,7 @@ public void proc() { for (String memoryName : toUpdate) { if (!memories.containsKey(memoryName)) { lastUpdate.remove(memoryName); + memoryLogicalTime.remove(memoryName); continue; } Memory memory = memories.get(memoryName).get(); @@ -253,14 +198,13 @@ public void proc() { } if (memory.getTimestamp() > lastUpdate.get(memoryName)) { + memoryLogicalTime.put(memoryName, currentTime); updateMemory(memoryName); } } } private void updateMemory(String memoryName) { - System.out.println(mindName + ":" + nodeName + " Update memory: " + memoryName); - String memoryUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); if (!memories.containsKey(memoryName)) { @@ -276,19 +220,21 @@ private void updateMemory(String memoryName) { try { String memoryPath = String.format("%s:memories:%s", mindName, memoryName); - Long timestamp = Long.parseLong(commands.hget(memoryPath, "timestamp").get()); + String messageTimeStr = commands.hget(memoryPath, "logical_time").get(); + LamportTime messageTime = LamportTime.fromString(messageTimeStr); + + LamportTime memoryTime = memoryLogicalTime.get(memoryName); - Long memoryTimestamp = memory.getTimestamp(); - if (memoryTimestamp < timestamp) { + if (memoryTime.lessThan(messageTime)) { retrieveExecutor.execute(() -> { retrieveMemory(memory); }); - } else if (memoryTimestamp > timestamp) { + } else if (messageTime.lessThan(memoryTime)) { sendMemory(memory); } - lastUpdate.put(memoryName, memoryTimestamp); + lastUpdate.put(memoryName, memory.getTimestamp()); } catch (Exception e) { // TODO: handle exception System.err.println("Memory update error!"); @@ -297,21 +243,11 @@ private void updateMemory(String memoryName) { } private void sendMemory(Memory memory) { - System.out.println(mindName + ":" + nodeName + " Send memory: " + memory.getName()); - String memoryName = memory.getName(); - Long memoryTimestamp = memory.getTimestamp(); - - HashMap memoryDict = new HashMap(); - memoryDict.put("timestamp", memoryTimestamp.toString()); - memoryDict.put("evaluation", memory.getEvaluation().toString()); - memoryDict.put("name", memoryName); - memoryDict.put("id", memory.getId().toString()); + Map memoryDict = MemoryEncoder.toDict(memory); memoryDict.put("owner", ""); - - Object info = memory.getI(); - memoryDict.put("I", gson.toJson(info)); + memoryDict.put("logical_time", memoryLogicalTime.get(memoryName).toString()); String memoryPath = String.format("%s:memories:%s", mindName, memoryName); commands.hset(memoryPath, memoryDict); @@ -319,12 +255,10 @@ private void sendMemory(Memory memory) { String memoryUpdatePath = memoryPath + ":update"; commands.publish(memoryUpdatePath, ""); - lastUpdate.put(memoryName, memoryTimestamp); + currentTime = currentTime.increment(); } private void retrieveMemory(Memory memory) { - System.out.println(mindName + ":" + nodeName + " Retrieve memory: " + memory.getName()); - String memoryName = memory.getName(); if (waitingRetrieval.contains(memoryName)) { @@ -349,21 +283,19 @@ private void retrieveMemory(Memory memory) { } } catch (Exception e) { // sendMemory(memory); - System.out.println(mindName + ":" + nodeName + " Memory request error"); } memoryDict = commands.hgetall(memoryPath).get(); } - memory.setEvaluation(Double.parseDouble(memoryDict.get("evaluation"))); - memory.setId(Long.parseLong(memoryDict.get("id"))); - - String infoJSON = memoryDict.get("I"); - Object info = gson.fromJson(infoJSON, Object.class); - memory.setI(info); + MemoryEncoder.loadMemory(memory, memoryDict); + LamportTime messageTime = LamportTime.fromString(memoryDict.get("logical_time")); + currentTime = LamportTime.synchronize(messageTime, currentTime); + Long timestamp = memory.getTimestamp(); lastUpdate.put(memoryName, timestamp); + memoryLogicalTime.put(memoryName, messageTime); waitingRetrieval.remove(memoryName); } catch (Exception e) { @@ -373,22 +305,32 @@ private void retrieveMemory(Memory memory) { } private void requestMemory(String memoryName, String ownerName) { - System.out.println(mindName + ":" + nodeName + " Request memory: " + memoryName); - String requestAddr = String.format("%s:nodes:%s:transfer_memory", mindName, ownerName); HashMap requestDict = new HashMap(); requestDict.put("memory_name", memoryName); requestDict.put("node", nodeName); - String request = gson.toJson(requestDict); + + Map fullRequestDict = new HashMap<>(); + fullRequestDict.put("request", requestDict); + fullRequestDict.put("logical_time", currentTime.toString()); + + String request = gson.toJson(fullRequestDict); commands.publish(requestAddr, request); } private void handlerNotifyTransfer(String message) { - System.out.println(mindName + ":" + nodeName + " Transfer done: " + message); + Type type = new TypeToken>(){}.getType(); + Map data = gson.fromJson(message, type); + + if(data.containsKey("logical_time")) + { + LamportTime messageTime = LamportTime.fromString(data.get("logical_time")); + currentTime = LamportTime.synchronize(messageTime, currentTime); + } - String memoryName = message; + String memoryName = data.get("memory_name"); CompletableFuture event = waitingRequestEvents.get(memoryName); if (event != null) { @@ -399,10 +341,16 @@ private void handlerNotifyTransfer(String message) { } private void handlerTransferMemory(String message) { - System.out.println(mindName + ":" + nodeName + " Transfer memory: " + message); + Type type = new TypeToken>(){}.getType(); + Map data = gson.fromJson(message, type); - Type type = new TypeToken>(){}.getType(); - Map request = gson.fromJson(message, type); + if(data.containsKey("logical_time")) + { + LamportTime messageTime = LamportTime.fromString((String) data.get("logical_time")); + currentTime = LamportTime.synchronize(messageTime, currentTime); + } + + Map request = (Map) data.get("request"); String memoryName = request.get("memory_name"); String requestingNode = request.get("node"); @@ -416,10 +364,17 @@ private void handlerTransferMemory(String message) { memory = memoryReference.get(); } + memoryLogicalTime.put(memoryName, currentTime); + sendMemory(memory); + Map response = new HashMap<>(); + response.put("memory_name", memoryName); + response.put("logical_time", currentTime.toString()); + String responseString = gson.toJson(response); + String responseAddr = String.format("%s:nodes:%s:transfer_done", mindName, requestingNode); - commands.publish(responseAddr, memoryName); + commands.publish(responseAddr, responseString); } private void subscribe(String channel, Consumer handler) { @@ -432,8 +387,20 @@ private void unsubscribe(String channel) { listeners.remove(channel); } + @Override + public void stop() + { + pubsubConnection.close(); + retrieveExecutor.shutdownNow(); + redisClient.shutdown(); + + super.stop(); + } + @Override protected void finalize() { + pubsubConnection.close(); + retrieveExecutor.shutdownNow(); redisClient.shutdown(); } diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java index c7a5a1ad..34cc7839 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java @@ -1,7 +1,6 @@ package br.unicamp.cst.memorystorage; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Map; import java.util.HashMap; diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java new file mode 100644 index 00000000..5a9a756f --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -0,0 +1,227 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.platform.commons.function.Try; + +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.Mind; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; + +public class MemoryStorageTest { + + private static RedisClient client; + private static StatefulRedisConnection connection; + private static RedisAsyncCommands commands; + + private Mind mind; + private Mind mind2; + + private List startTimes; + private long sleepTime; + + @BeforeAll + public static void initAll() throws Exception { + client = RedisClient.create("redis://localhost"); + + try { + connection = client.connect(); + } catch (RedisConnectionException e) { + assumeTrue(false); + } + + commands = connection.async(); + } + + @BeforeEach + public void init() throws Exception { + commands.flushall().get(); + + startTimes = new ArrayList<>(); + startTimes.add(0d); + startTimes.add(1e3); + + sleepTime = (long) (0.75 * 1000); + + mind = new Mind(); + mind2 = new Mind(); + } + + @AfterEach + public void tearDown() throws Exception { + mind.shutDown(); + mind2.shutDown(); + + commands.flushall().get(); + } + + @Test + public void nodeEnterTest() throws Exception { + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + assertEquals("node", msCodelet.getNodeName()); + + Set members = commands.smembers("default_mind:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + msCodelet.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + Thread.sleep(sleepTime); + + assertEquals("node1", msCodelet2.getNodeName()); + + members = commands.smembers("default_mind:nodes").get(); + assertEquals(2, members.size()); + assertTrue(members.contains("node")); + assertTrue(members.contains("node1")); + } + + public void redisArgsTest() { + // TODO + } + + @Test + public void memoryTransferTest() throws Exception { + Memory memory1 = mind.createMemoryObject("Memory1", "INFO"); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + assertTrue(commands.exists("default_mind:memories:Memory1").get() >= 1); + Map result = commands.hgetall("default_mind:memories:Memory1").get(); + + Map expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", ""); + expectedResult.put("id", "0"); + expectedResult.put("owner", "node"); + expectedResult.put("logical_time", "0"); + + assertEquals(expectedResult, result); + + String request = "{'request':{'memory_name':'Memory1', 'node':'node1'}, 'logical_time':'0'}"; + commands.publish("default_mind:nodes:node:transfer_memory", request); + + Thread.sleep(sleepTime); + + result = commands.hgetall("default_mind:memories:Memory1").get(); + + expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", "INFO"); + expectedResult.put("id", "0"); + expectedResult.put("owner", ""); + + result.remove("logical_time"); + result.remove("timestamp"); + + assertEquals(expectedResult, result); + } + + @Test + public void msTest() throws Exception + { + Memory memory1 = mind.createMemoryObject("Memory1", ""); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + assertEquals("", memory1.getI()); + + int[] info = {1, 1, 1}; + memory1.setI(info); + + Thread.sleep(sleepTime); + + Memory mind2Memory1 = mind2.createMemoryObject("Memory1", ""); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + msCodelet.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + assertEquals("", mind2Memory1.getI()); + + Thread.sleep(sleepTime); + + for(int i = 0; i<3; i++) + { + assertEquals(info[i], ((int[]) memory1.getI())[i]); + assertEquals(info[i], Math.toIntExact((long)((ArrayList) mind2Memory1.getI()).get(i))); + } + + + Map result = commands.hgetall("default_mind:memories:Memory1").get(); + + Map expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", "[1,1,1]"); + expectedResult.put("id", "0"); + expectedResult.put("owner", ""); + + assertTrue(result.containsKey("logical_time")); + assertTrue(result.containsKey("timestamp")); + result.remove("logical_time"); + result.remove("timestamp"); + assertEquals(result, expectedResult); + + memory1.setI("INFO"); + Thread.sleep(sleepTime); + + assertEquals("INFO", memory1.getI()); + assertEquals("INFO", mind2Memory1.getI()); + + mind2Memory1.setI("INFO2"); + Thread.sleep(sleepTime); + + assertEquals("INFO2", memory1.getI()); + assertEquals("INFO2", mind2Memory1.getI()); + + memory1.setI(1); + Thread.sleep(sleepTime); + + assertEquals(1, memory1.getI()); + assertEquals(1, Math.toIntExact((long) mind2Memory1.getI())); + + memory1.setI(true); + Thread.sleep(sleepTime); + + assertEquals(true, memory1.getI()); + assertEquals(true, mind2Memory1.getI()); + } + +} From 129a2717bd9745825a17ef12019d423adc3b44fa Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:25:06 -0300 Subject: [PATCH 07/21] Redis args --- .../memorystorage/MemoryStorageCodelet.java | 21 ++++++++++++++++--- .../cst/memorystorage/MemoryStorageTest.java | 19 +++++++++++++++-- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java index 860f71ef..be11adca 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -58,10 +58,25 @@ public String getNodeName() { public MemoryStorageCodelet(Mind mind) { - this(mind, "node", "default_mind", 500.0e-3); + this(mind, "node", "default_mind", 500.0e-3, RedisClient.create("redis://localhost")); } - public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double requestTimeout) { + public MemoryStorageCodelet(Mind mind, RedisClient redisClient) + { + this(mind, "node", "default_mind", 500.0e-3, redisClient); + } + + public MemoryStorageCodelet(Mind mind, String mindName) + { + this(mind, "node", mindName, 500.0e-3, RedisClient.create("redis://localhost")); + } + + public MemoryStorageCodelet(Mind mind, String mindName, RedisClient redisClient) + { + this(mind, "node", mindName, 500.0e-3, redisClient); + } + + public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double requestTimeout, RedisClient redisClient) { listeners = new HashMap<>(); memories = new HashMap<>(); lastUpdate = new HashMap<>(); @@ -76,7 +91,7 @@ public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double this.mindName = mindName; this.requestTimeout = requestTimeout; - redisClient = RedisClient.create("redis://localhost"); + this.redisClient = redisClient; connection = redisClient.connect(); commands = connection.async(); pubsubConnection = redisClient.connectPubSub(); diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 5a9a756f..18b7fe75 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -21,6 +21,7 @@ import br.unicamp.cst.core.entities.Mind; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; @@ -101,8 +102,22 @@ public void nodeEnterTest() throws Exception { assertTrue(members.contains("node1")); } - public void redisArgsTest() { - // TODO + @Test + public void redisArgsTest() throws Exception { + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind, client); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + Set members = commands.smembers("default_mind:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); } @Test From 3db3b3651773736f7a0c5a57067992cc4c221a9f Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:25:18 -0300 Subject: [PATCH 08/21] Fix tests - RawMemory id --- .../br/unicamp/cst/memorystorage/MemoryStorageTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 18b7fe75..05a9ac09 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -19,6 +20,7 @@ import br.unicamp.cst.core.entities.Memory; import br.unicamp.cst.core.entities.Mind; +import br.unicamp.cst.core.entities.RawMemory; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisConnectionException; import io.lettuce.core.RedisURI; @@ -62,6 +64,10 @@ public void init() throws Exception { mind = new Mind(); mind2 = new Mind(); + + Field field = RawMemory.class.getDeclaredField("lastid"); + field.setAccessible(true); + field.setLong(null, 0); } @AfterEach From bb3ff1dda0068a46ea25cb6b570d570bd5ae6e54 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:29:40 -0300 Subject: [PATCH 09/21] Tests CI with Redis server --- .github/workflows/java-ci.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml index 4e0d1401..2e6a63a2 100644 --- a/.github/workflows/java-ci.yml +++ b/.github/workflows/java-ci.yml @@ -9,6 +9,7 @@ on: pull_request: branches: - master + workflow_dispatch: jobs: build: @@ -17,6 +18,22 @@ jobs: matrix: java: [ '8', '11', '17'] name: JDK ${{ matrix.Java }} build + + services: + # Label used to access the service container + redis: + # Docker Hub image + image: redis + # Set health checks to wait until redis has started + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + # Maps port 6379 on service container to the host + - 6379:6379 + steps: - uses: actions/checkout@v4.1.1 - name: Set up Java From d3c2d97f92fbad41a4ef13b1b7e4cd63ae4221a5 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:31:02 -0300 Subject: [PATCH 10/21] JavaCI - Remove workflow dispatch --- .github/workflows/java-ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml index 2e6a63a2..d0e330f0 100644 --- a/.github/workflows/java-ci.yml +++ b/.github/workflows/java-ci.yml @@ -9,7 +9,6 @@ on: pull_request: branches: - master - workflow_dispatch: jobs: build: From 4a0369ce51748d030069a847947e7bc5ab4a1718 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:13:15 -0300 Subject: [PATCH 11/21] Fix problems Fixed Java and Sonar problems --- .../memorystorage/MemoryStorageCodelet.java | 188 ++++++++---------- .../cst/memorystorage/MemoryStorageTest.java | 2 - 2 files changed, 88 insertions(+), 102 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java index be11adca..3fe00ba2 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -1,26 +1,28 @@ package br.unicamp.cst.memorystorage; -import br.unicamp.cst.core.entities.Codelet; -import br.unicamp.cst.core.entities.Memory; -import br.unicamp.cst.core.entities.MemoryObject; -import br.unicamp.cst.core.entities.Mind; - import java.lang.ref.WeakReference; import java.lang.reflect.Type; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; -import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import ch.qos.logback.classic.LoggerContext; + +import br.unicamp.cst.core.entities.Codelet; +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.MemoryObject; +import br.unicamp.cst.core.entities.Mind; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; @@ -30,6 +32,14 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; public class MemoryStorageCodelet extends Codelet { + private static final String LOGICAL_TIME_FIELD = "logical_time"; + private static final String OWNER_FIELD = "owner"; + private static final String MEMORY_NAME_FIELD = "memory_name"; + + private static final String MEMORY_PATH_TEMPLATE = "%s:memories:%s"; + + private static final Logger LOGGER = Logger.getLogger(MemoryStorageCodelet.class.getName()); + private RedisClient redisClient; private StatefulRedisConnection connection; private RedisAsyncCommands commands; @@ -56,27 +66,25 @@ public String getNodeName() { private LamportTime currentTime; private Gson gson; - public MemoryStorageCodelet(Mind mind) - { + public MemoryStorageCodelet(Mind mind) throws ExecutionException, InterruptedException { this(mind, "node", "default_mind", 500.0e-3, RedisClient.create("redis://localhost")); } - public MemoryStorageCodelet(Mind mind, RedisClient redisClient) - { + public MemoryStorageCodelet(Mind mind, RedisClient redisClient) throws ExecutionException, InterruptedException { this(mind, "node", "default_mind", 500.0e-3, redisClient); } - public MemoryStorageCodelet(Mind mind, String mindName) - { + public MemoryStorageCodelet(Mind mind, String mindName) throws ExecutionException, InterruptedException { this(mind, "node", mindName, 500.0e-3, RedisClient.create("redis://localhost")); } public MemoryStorageCodelet(Mind mind, String mindName, RedisClient redisClient) - { + throws ExecutionException, InterruptedException { this(mind, "node", mindName, 500.0e-3, redisClient); } - public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double requestTimeout, RedisClient redisClient) { + public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, + double requestTimeout, RedisClient redisClient) throws ExecutionException, InterruptedException { listeners = new HashMap<>(); memories = new HashMap<>(); lastUpdate = new HashMap<>(); @@ -99,22 +107,16 @@ public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double String baseName = nodeName; String mindNodesPath = String.format("%s:nodes", mindName); - try { - boolean isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); - if(isMemberResult) - { - Long nodeNumber = commands.scard(mindNodesPath).get(); + boolean isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + if (isMemberResult) { + Long nodeNumber = commands.scard(mindNodesPath).get(); + nodeName = baseName + Long.toString(nodeNumber); + + isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + while (isMemberResult) { + nodeNumber += 1; nodeName = baseName + Long.toString(nodeNumber); - - isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); - while (isMemberResult) { - nodeNumber += 1; - nodeName = baseName+Long.toString(nodeNumber); - } } - } catch (Exception e) { - // TODO: handle exception - e.printStackTrace(); } this.nodeName = nodeName; @@ -141,11 +143,11 @@ public void message(String receivedChannel, String message) { } @Override - public void accessMemoryObjects() { + public void accessMemoryObjects() { // NOSONAR } @Override - public void calculateActivation() { + public void calculateActivation() { // NOSONAR } @Override @@ -166,30 +168,27 @@ public void proc() { Set difference = mindMemoriesNames; for (String memoryName : difference) { Memory memory = mindMemories.get(memoryName); - memories.put(memoryName, new WeakReference(memory)); + memories.put(memoryName, new WeakReference<>(memory)); - String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); RedisFuture existFuture = commands.exists(memoryPath); try { if (existFuture.get() > 0) { - retrieveExecutor.execute(() -> { - retrieveMemory(memory); - }); + retrieveExecutor.execute(() -> retrieveMemory(memory)); } else { Map impostor = new HashMap<>(); impostor.put("name", memoryName); impostor.put("evaluation", "0.0"); impostor.put("I", ""); impostor.put("id", "0"); - impostor.put("owner", nodeName); - impostor.put("logical_time", currentTime.toString()); + impostor.put(OWNER_FIELD, nodeName); + impostor.put(LOGICAL_TIME_FIELD, currentTime.toString()); commands.hset(memoryPath, impostor); currentTime = currentTime.increment(); } - } catch (Exception e) { - // TODO: handle exception - e.printStackTrace(); + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't send memory to Redis server"); } String subscribeUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); @@ -200,17 +199,12 @@ public void proc() { Set toUpdate = lastUpdate.keySet(); for (String memoryName : toUpdate) { - if (!memories.containsKey(memoryName)) { + if (!memories.containsKey(memoryName) || memories.get(memoryName).get() == null) { lastUpdate.remove(memoryName); memoryLogicalTime.remove(memoryName); continue; } Memory memory = memories.get(memoryName).get(); - if (memory == null) { - lastUpdate.remove(memoryName); - memories.remove(memoryName); - continue; - } if (memory.getTimestamp() > lastUpdate.get(memoryName)) { memoryLogicalTime.put(memoryName, currentTime); @@ -234,26 +228,21 @@ private void updateMemory(String memoryName) { } try { - String memoryPath = String.format("%s:memories:%s", mindName, memoryName); - String messageTimeStr = commands.hget(memoryPath, "logical_time").get(); + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); + String messageTimeStr = commands.hget(memoryPath, LOGICAL_TIME_FIELD).get(); LamportTime messageTime = LamportTime.fromString(messageTimeStr); LamportTime memoryTime = memoryLogicalTime.get(memoryName); - if (memoryTime.lessThan(messageTime)) { - retrieveExecutor.execute(() -> { - retrieveMemory(memory); - }); + retrieveExecutor.execute(() -> retrieveMemory(memory)); } else if (messageTime.lessThan(memoryTime)) { sendMemory(memory); } lastUpdate.put(memoryName, memory.getTimestamp()); - } catch (Exception e) { - // TODO: handle exception - System.err.println("Memory update error!"); - e.printStackTrace(); + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't retrieve information from Redis server"); } } @@ -261,10 +250,10 @@ private void sendMemory(Memory memory) { String memoryName = memory.getName(); Map memoryDict = MemoryEncoder.toDict(memory); - memoryDict.put("owner", ""); - memoryDict.put("logical_time", memoryLogicalTime.get(memoryName).toString()); + memoryDict.put(OWNER_FIELD, ""); + memoryDict.put(LOGICAL_TIME_FIELD, memoryLogicalTime.get(memoryName).toString()); - String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); commands.hset(memoryPath, memoryDict); String memoryUpdatePath = memoryPath + ":update"; @@ -281,23 +270,24 @@ private void retrieveMemory(Memory memory) { } waitingRetrieval.add(memoryName); - String memoryPath = String.format("%s:memories:%s", mindName, memoryName); + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); Map memoryDict; try { memoryDict = commands.hgetall(memoryPath).get(); - String owner = memoryDict.get("owner"); + String owner = memoryDict.get(OWNER_FIELD); - if (owner != "") { - CompletableFuture event = new CompletableFuture(); + if (!owner.equals("")) { + CompletableFuture event = new CompletableFuture<>(); waitingRequestEvents.put(memoryName, event); requestMemory(memoryName, owner); - try { - if (!event.get((long) (requestTimeout * 1000), TimeUnit.MILLISECONDS)) { + try { // NOSONAR + boolean eventResult = event.get((long) (requestTimeout * 1000), TimeUnit.MILLISECONDS); + if (!eventResult) { sendMemory(memory); } - } catch (Exception e) { - // sendMemory(memory); + } catch (TimeoutException e) { + sendMemory(memory); } memoryDict = commands.hgetall(memoryPath).get(); @@ -305,30 +295,29 @@ private void retrieveMemory(Memory memory) { MemoryEncoder.loadMemory(memory, memoryDict); - LamportTime messageTime = LamportTime.fromString(memoryDict.get("logical_time")); + LamportTime messageTime = LamportTime.fromString(memoryDict.get(LOGICAL_TIME_FIELD)); currentTime = LamportTime.synchronize(messageTime, currentTime); - + Long timestamp = memory.getTimestamp(); lastUpdate.put(memoryName, timestamp); memoryLogicalTime.put(memoryName, messageTime); waitingRetrieval.remove(memoryName); - } catch (Exception e) { - // TODO handle exception - e.printStackTrace(); + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't send memory to Redis server"); } } private void requestMemory(String memoryName, String ownerName) { String requestAddr = String.format("%s:nodes:%s:transfer_memory", mindName, ownerName); - HashMap requestDict = new HashMap(); - requestDict.put("memory_name", memoryName); + HashMap requestDict = new HashMap<>(); + requestDict.put(MEMORY_NAME_FIELD, memoryName); requestDict.put("node", nodeName); Map fullRequestDict = new HashMap<>(); fullRequestDict.put("request", requestDict); - fullRequestDict.put("logical_time", currentTime.toString()); + fullRequestDict.put(LOGICAL_TIME_FIELD, currentTime.toString()); String request = gson.toJson(fullRequestDict); @@ -336,16 +325,16 @@ private void requestMemory(String memoryName, String ownerName) { } private void handlerNotifyTransfer(String message) { - Type type = new TypeToken>(){}.getType(); + Type type = new TypeToken>() { + }.getType(); Map data = gson.fromJson(message, type); - if(data.containsKey("logical_time")) - { - LamportTime messageTime = LamportTime.fromString(data.get("logical_time")); + if (data.containsKey(LOGICAL_TIME_FIELD)) { + LamportTime messageTime = LamportTime.fromString(data.get(LOGICAL_TIME_FIELD)); currentTime = LamportTime.synchronize(messageTime, currentTime); } - String memoryName = data.get("memory_name"); + String memoryName = data.get(MEMORY_NAME_FIELD); CompletableFuture event = waitingRequestEvents.get(memoryName); if (event != null) { @@ -355,19 +344,26 @@ private void handlerNotifyTransfer(String message) { } } + @SuppressWarnings("unchecked") private void handlerTransferMemory(String message) { - Type type = new TypeToken>(){}.getType(); + Type type = new TypeToken>() { + }.getType(); Map data = gson.fromJson(message, type); - if(data.containsKey("logical_time")) - { - LamportTime messageTime = LamportTime.fromString((String) data.get("logical_time")); + if (data.containsKey(LOGICAL_TIME_FIELD)) { + LamportTime messageTime = LamportTime.fromString((String) data.get(LOGICAL_TIME_FIELD)); currentTime = LamportTime.synchronize(messageTime, currentTime); } - Map request = (Map) data.get("request"); + Map request; + try { + request = (Map) data.get("request"); + } catch (ClassCastException e) { + LOGGER.warning("Transfer memory request is not valid"); + return; + } - String memoryName = request.get("memory_name"); + String memoryName = request.get(MEMORY_NAME_FIELD); String requestingNode = request.get("node"); Memory memory; @@ -383,9 +379,9 @@ private void handlerTransferMemory(String message) { sendMemory(memory); - Map response = new HashMap<>(); - response.put("memory_name", memoryName); - response.put("logical_time", currentTime.toString()); + Map response = new HashMap<>(); + response.put(MEMORY_NAME_FIELD, memoryName); + response.put(LOGICAL_TIME_FIELD, currentTime.toString()); String responseString = gson.toJson(response); String responseAddr = String.format("%s:nodes:%s:transfer_done", mindName, requestingNode); @@ -403,8 +399,7 @@ private void unsubscribe(String channel) { } @Override - public void stop() - { + public synchronized void stop() { pubsubConnection.close(); retrieveExecutor.shutdownNow(); redisClient.shutdown(); @@ -412,11 +407,4 @@ public void stop() super.stop(); } - @Override - protected void finalize() { - pubsubConnection.close(); - retrieveExecutor.shutdownNow(); - redisClient.shutdown(); - } - } diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 05a9ac09..d66806b5 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -10,13 +10,11 @@ import java.util.List; import java.util.Set; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.platform.commons.function.Try; import br.unicamp.cst.core.entities.Memory; import br.unicamp.cst.core.entities.Mind; From 3e9e688e7b034c8f5946432098a669d8020f793a Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:46:35 -0300 Subject: [PATCH 12/21] Documentation --- .../cst/memorystorage/LamportTime.java | 15 +++ .../cst/memorystorage/LogicalTime.java | 23 ++++ .../cst/memorystorage/MemoryEncoder.java | 18 +++- .../memorystorage/MemoryStorageCodelet.java | 101 +++++++++++++++++- 4 files changed, 151 insertions(+), 6 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java index d71dec77..57531b4d 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java +++ b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java @@ -1,19 +1,34 @@ package br.unicamp.cst.memorystorage; +/** + * Logical time implementation using Lamport times. + */ public class LamportTime implements LogicalTime { private int time; + /** + * LamportTime constructor. + * + * @param initialTime time to start the clock. + */ public LamportTime(int initialTime) { this.time = initialTime; } + /** + * LamportTime constructor. + */ public LamportTime() { this(0); } + /** + * Get the clock internal time. + * @return clock internal time. + */ public int getTime() { return this.time; diff --git a/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java index 644fb0a6..57b575b4 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java +++ b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java @@ -1,7 +1,23 @@ package br.unicamp.cst.memorystorage; + +/** + * A logical time for distributed communication. + */ public interface LogicalTime { + + /** + * Returns a time with the self time incremented by one. + * + * @return incremented time. + */ public abstract LogicalTime increment(); + /** + * Creates a instance from a string. + * + * @param string String to create time, generated with LogicalTime.toString(). + * @return Created time. + */ public static LogicalTime fromString(String string) { throw new IllegalStateException("fromString not implemented in the subclass"); @@ -10,6 +26,13 @@ public static LogicalTime fromString(String string) @Override public abstract String toString(); + /** + * Compares two times, and return the current time. + * + * @param time0 first time to compare. + * @param time1 second time to compare. + * @return current time. + */ public static LogicalTime synchronize(LogicalTime time0, LogicalTime time1) { throw new IllegalStateException("synchronize not implemented in the subclass"); diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java index 77181282..1bceaf96 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java @@ -9,13 +9,22 @@ import br.unicamp.cst.core.entities.Memory; +/** + * Encodes and decodes Memories. + */ public class MemoryEncoder { static Gson gson = new GsonBuilder().setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE).create(); - + private MemoryEncoder() { } + /** + * Encodes a memory to a Map. + * + * @param memory memory to encode. + * @return the encoded memory. + */ public static Map toDict(Memory memory) { Map data = new HashMap<>(); @@ -37,7 +46,12 @@ public static Map toDict(Memory memory) { return data; } - + /** + * Load a memory from a Map. + * + * @param memory memory to store the loaded info. + * @param memoryDict map encoded memory. + */ public static void loadMemory(Memory memory, Map memoryDict) { memory.setEvaluation(Double.parseDouble(memoryDict.get("evaluation"))); memory.setId(Long.parseLong(memoryDict.get("id"))); diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java index 3fe00ba2..378bb417 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -31,6 +31,14 @@ import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +/** + * Synchonizes local memories with a Redis database. + * + * When using MemoryStorage, each local CST instance is called a node. + * Memories with the same name in participating nodes are synchronized. + * + * The collection of synchonized nodes is a mind.A single Redis instance can support multiple minds with unique names + */ public class MemoryStorageCodelet extends Codelet { private static final String LOGICAL_TIME_FIELD = "logical_time"; private static final String OWNER_FIELD = "owner"; @@ -49,10 +57,6 @@ public class MemoryStorageCodelet extends Codelet { private Mind mind; private String nodeName; - public String getNodeName() { - return nodeName; - } - private String mindName; private double requestTimeout; @@ -66,23 +70,66 @@ public String getNodeName() { private LamportTime currentTime; private Gson gson; + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @throws ExecutionException + * @throws InterruptedException + */ public MemoryStorageCodelet(Mind mind) throws ExecutionException, InterruptedException { this(mind, "node", "default_mind", 500.0e-3, RedisClient.create("redis://localhost")); } + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ public MemoryStorageCodelet(Mind mind, RedisClient redisClient) throws ExecutionException, InterruptedException { this(mind, "node", "default_mind", 500.0e-3, redisClient); } + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param mindName name of the network mind. + * @throws ExecutionException + * @throws InterruptedException + */ public MemoryStorageCodelet(Mind mind, String mindName) throws ExecutionException, InterruptedException { this(mind, "node", mindName, 500.0e-3, RedisClient.create("redis://localhost")); } + /*** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param mindName name of the network mind. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ public MemoryStorageCodelet(Mind mind, String mindName, RedisClient redisClient) throws ExecutionException, InterruptedException { this(mind, "node", mindName, 500.0e-3, redisClient); } + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param nodeName name of the local node in the network. + * @param mindName name of the network mind. + * @param requestTimeout time before timeout when requesting a memory synchonization. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, double requestTimeout, RedisClient redisClient) throws ExecutionException, InterruptedException { listeners = new HashMap<>(); @@ -142,6 +189,15 @@ public void message(String receivedChannel, String message) { subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); } + /** + * Gets the name of the node. + * + * @return node name. + */ + public String getNodeName() { + return nodeName; + } + @Override public void accessMemoryObjects() { // NOSONAR } @@ -213,6 +269,15 @@ public void proc() { } } + /** + * Updates a memory, sending or retrieving the memory data + * to/from the database. + * + * Performs a time comparison with the local data and storage + * data to decide whether to send or retrieve the data. + * + * @param memoryName name of the memory to synchonize. + */ private void updateMemory(String memoryName) { String memoryUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); @@ -246,6 +311,11 @@ private void updateMemory(String memoryName) { } } + /** + * Sends a memory data to the storage. + * + * @param memory memory to send. + */ private void sendMemory(Memory memory) { String memoryName = memory.getName(); @@ -262,6 +332,13 @@ private void sendMemory(Memory memory) { currentTime = currentTime.increment(); } + /** + * Retrieves a memory data from the storage. + * + * Blocks the application, it is advisable to use a separate thread to call the method. + * + * @param memory memory to retrieve data. + */ private void retrieveMemory(Memory memory) { String memoryName = memory.getName(); @@ -308,6 +385,12 @@ private void retrieveMemory(Memory memory) { } } + /** + * Requests another node to send its local memory to storage. + * + * @param memoryName name of the memory to request. + * @param ownerName node owning the memory. + */ private void requestMemory(String memoryName, String ownerName) { String requestAddr = String.format("%s:nodes:%s:transfer_memory", mindName, ownerName); @@ -324,6 +407,11 @@ private void requestMemory(String memoryName, String ownerName) { commands.publish(requestAddr, request); } + /** + * Handles a message in the notify transfer channel. + * + * @param message message received in the channel. + */ private void handlerNotifyTransfer(String message) { Type type = new TypeToken>() { }.getType(); @@ -344,6 +432,11 @@ private void handlerNotifyTransfer(String message) { } } + /** + * Handles a message in the transfer memory channel. + * + * @param message message received in the channel. + */ @SuppressWarnings("unchecked") private void handlerTransferMemory(String message) { Type type = new TypeToken>() { From 0ef03d68d92ad7221ec4ff50d90b01893ea4270f Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:39:44 -0300 Subject: [PATCH 13/21] External test Test MS-Java with MS-Python --- .../cst/memorystorage/ExternalTest.java | 108 ++++++++++++++++++ .../cst/memorystorage/MemoryEncoder.java | 10 +- .../cst/memorystorage/MemoryStorageTest.java | 2 +- 3 files changed, 110 insertions(+), 10 deletions(-) create mode 100644 src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java diff --git a/src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java b/src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java new file mode 100644 index 00000000..b0af0d0d --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java @@ -0,0 +1,108 @@ +package br.unicamp.cst.memorystorage; + +import br.unicamp.cst.core.entities.MemoryObject; +import br.unicamp.cst.core.entities.Mind; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; + +/** + * Code created to test the communication of CST-Java Memory Storage with other instances. + * At the moment, it must be executed manually. + */ +public class ExternalTest { + + private static final long SLEEP_TIME = (long) (0.75 * 1000); + + public static void main(String[] args){ + RedisClient redisClient = RedisClient.create("redis://localhost"); + StatefulRedisConnection connection = redisClient.connect(); + RedisAsyncCommands commands = connection.async(); + try { + commands.flushall().get(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(1); + } + + Mind mind = new Mind(); + MemoryObject memory1 = mind.createMemoryObject("Memory1", false); + MemoryStorageCodelet ms; + try{ + ms = new MemoryStorageCodelet(mind); + ms.setTimeStep(100); + } catch(Exception e) + { + e.printStackTrace(); + System.exit(1); + return; + } + + long lastTimestamp = memory1.getTimestamp(); + + mind.insertCodelet(ms); + mind.start(); + + + try{ + boolean valid = false; + + for(int i = 0; i<30; i++) + { + Thread.sleep(100); + + if (lastTimestamp != memory1.getTimestamp()) { + boolean info = (boolean) memory1.getI(); + if(info) + { + valid = true; + break; + } + } + } + + if(!valid) + { + System.err.print("Could not communicate with the other CST node"); + System.exit(1); + } + + memory1.setI("JAVA_INFO"); + + Thread.sleep(SLEEP_TIME); + + String stringInfo = (String) memory1.getI(); + assert stringInfo.equals("OTHER_INFO"); + + memory1.setI(1); + Thread.sleep(SLEEP_TIME); + + Long longInfo = (Long) memory1.getI(); + assert longInfo == -1; + + memory1.setI(1.0); + + Thread.sleep(SLEEP_TIME); + + Double doubleInfo = (Double) memory1.getI(); + assert doubleInfo == 5.0; + + //while (true) { + // if (lastTimestamp != memory1.getTimestamp()) { + // System.out.println(); + // System.out.println(memory1); + // + // lastTimestamp = memory1.getTimestamp(); + // } + //} + } catch(Exception e) + { + e.printStackTrace(); + System.exit(1); + return; + } + + + } + +} diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java index 1bceaf96..84215ffc 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java @@ -33,15 +33,7 @@ public static Map toDict(Memory memory) { data.put("id", memory.getId().toString()); data.put("timestamp", memory.getTimestamp().toString()); - Object info = memory.getI(); - if(String.class.isInstance(info)) - { - data.put("I", (String) info); - } - else - { - data.put("I", gson.toJson(info)); - } + data.put("I", gson.toJson(memory.getI())); return data; } diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index d66806b5..2576849f 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -158,7 +158,7 @@ public void memoryTransferTest() throws Exception { expectedResult = new HashMap<>(); expectedResult.put("name", "Memory1"); expectedResult.put("evaluation", "0.0"); - expectedResult.put("I", "INFO"); + expectedResult.put("I", "\"INFO\""); expectedResult.put("id", "0"); expectedResult.put("owner", ""); From 3f230dc77260bd97b0c9255a86ec5338da1dc8e2 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:01:23 -0300 Subject: [PATCH 14/21] Update MemoryEncoderTest.java --- .../java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java index 34cc7839..fc99d720 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java @@ -45,7 +45,7 @@ public void loadMemoryTest() assertEquals(0.5, memory.getEvaluation()); assertEquals(123, memory.getId()); - List info = (List) memory.getI(); + List info = (List) memory.getI(); assertEquals(3, info.size()); assertEquals(5, info.get(0)); From 8bf0293417bd82626214a36cbb96ab5cf14650c2 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:14:07 -0300 Subject: [PATCH 15/21] Move ExternalTest --- .../java/br/unicamp/cst/memorystorage/ExternalTest.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{main => test}/java/br/unicamp/cst/memorystorage/ExternalTest.java (100%) diff --git a/src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java b/src/test/java/br/unicamp/cst/memorystorage/ExternalTest.java similarity index 100% rename from src/main/java/br/unicamp/cst/memorystorage/ExternalTest.java rename to src/test/java/br/unicamp/cst/memorystorage/ExternalTest.java From 3606ce2bd9a4fd02e12e74023908b8390d57a3c8 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:23:48 -0300 Subject: [PATCH 16/21] Test MS mind names --- .../cst/memorystorage/MemoryStorageTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 2576849f..8c06eb08 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -76,6 +76,32 @@ public void tearDown() throws Exception { commands.flushall().get(); } + @Test + public void testMindName() throws Exception{ + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind, "Mind1"); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind, "Mind2", client); + msCodelet2.setTimeStep(50); + mind.insertCodelet(msCodelet2); + + mind.start(); + + Set members = commands.smembers("Mind1:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); + + Set members2 = commands.smembers("Mind2:nodes").get(); + assertEquals(1, members2.size()); + assertTrue(members2.contains("node")); + } + @Test public void nodeEnterTest() throws Exception { MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); From e857da3f597a4a30f320b0b2b0747066864f919e Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:50:01 -0300 Subject: [PATCH 17/21] Test LogicalTime static members --- .../cst/memorystorage/LogicalTimeTest.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java diff --git a/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java b/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java new file mode 100644 index 00000000..0392783c --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java @@ -0,0 +1,24 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +public class LogicalTimeTest { + + @Test + public void fromStringNotImplementedTest() + { + assertThrows(IllegalStateException.class, () -> LogicalTime.fromString("null")); + } + + @Test + public void synchronizeNotImplementedTest() + { + LamportTime time = new LamportTime(); + + assertThrows(IllegalStateException.class, () -> LogicalTime.synchronize(time, time)); + } + +} From bb820c79736054c27e0b824b5af1d78239b895ed Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:50:10 -0300 Subject: [PATCH 18/21] Test MS node name --- .../memorystorage/MemoryStorageCodelet.java | 1 + .../cst/memorystorage/MemoryStorageTest.java | 29 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java index 378bb417..b8bd6542 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -163,6 +163,7 @@ public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, while (isMemberResult) { nodeNumber += 1; nodeName = baseName + Long.toString(nodeNumber); + isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); } } diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 8c06eb08..d0d6d8a7 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -33,6 +33,7 @@ public class MemoryStorageTest { private Mind mind; private Mind mind2; + private Mind mind3; private List startTimes; private long sleepTime; @@ -62,6 +63,7 @@ public void init() throws Exception { mind = new Mind(); mind2 = new Mind(); + mind3 = new Mind(); Field field = RawMemory.class.getDeclaredField("lastid"); field.setAccessible(true); @@ -72,6 +74,7 @@ public void init() throws Exception { public void tearDown() throws Exception { mind.shutDown(); mind2.shutDown(); + mind3.shutDown(); commands.flushall().get(); } @@ -104,6 +107,11 @@ public void testMindName() throws Exception{ @Test public void nodeEnterTest() throws Exception { + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); msCodelet.setTimeStep(50); mind.insertCodelet(msCodelet); @@ -117,19 +125,34 @@ public void nodeEnterTest() throws Exception { assertEquals(1, members.size()); assertTrue(members.contains("node")); - MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2,"node2", "default_mind", 500.0e-3, client ); msCodelet.setTimeStep(50); mind2.insertCodelet(msCodelet2); mind2.start(); Thread.sleep(sleepTime); - assertEquals("node1", msCodelet2.getNodeName()); + assertEquals("node2", msCodelet2.getNodeName()); members = commands.smembers("default_mind:nodes").get(); assertEquals(2, members.size()); assertTrue(members.contains("node")); - assertTrue(members.contains("node1")); + assertTrue(members.contains("node2")); + + MemoryStorageCodelet msCodelet3 = new MemoryStorageCodelet(mind3); + msCodelet3.setTimeStep(50); + mind3.insertCodelet(msCodelet3); + mind3.start(); + + Thread.sleep(sleepTime); + + assertEquals("node3", msCodelet3.getNodeName()); + + members = commands.smembers("default_mind:nodes").get(); + assertEquals(3, members.size()); + assertTrue(members.contains("node")); + assertTrue(members.contains("node2")); + assertTrue(members.contains("node3")); } @Test From 06b21b2b73dc9f622f6fe415d8b8a9df450ffdb8 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:03:41 -0300 Subject: [PATCH 19/21] LamportTime 100% coverage --- .../cst/memorystorage/LamportTime.java | 2 +- .../cst/memorystorage/LamportTimeTest.java | 55 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java index 57531b4d..8a95df09 100644 --- a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java +++ b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java @@ -51,7 +51,7 @@ public String toString() public static LamportTime synchronize(LogicalTime time0, LogicalTime time1) { - if(!(LogicalTime.class.isInstance(time0) && LogicalTime.class.isInstance(time1))){ + if(!(LamportTime.class.isInstance(time0) && LamportTime.class.isInstance(time1))){ throw new IllegalArgumentException("LamportTime can only synchonize LamportTime instances"); } diff --git a/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java index e832882c..596cb001 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java @@ -1,6 +1,9 @@ package br.unicamp.cst.memorystorage; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; @@ -53,5 +56,57 @@ public void synchronizeTest() assertTrue(time0.lessThan(timeS)); assertTrue(time1.lessThan(timeS)); assertEquals(56, timeS.getTime()); + + LamportTime timeS2 = LamportTime.synchronize(time1, time0); + + assertTrue(time0.lessThan(timeS2)); + assertTrue(time1.lessThan(timeS2)); + assertEquals(56, timeS2.getTime()); + } + + @Test + public void equalsTest() + { + LamportTime time0 = new LamportTime(0); + LamportTime time1 = new LamportTime(1); + LamportTime time2 = new LamportTime(1); + + assertNotEquals(time0, time1); + assertEquals(time1, time2); + } + + @Test + public void lessThanTest() + { + LamportTime time0 = new LamportTime(0); + LamportTime time1 = new LamportTime(1); + LamportTime time2 = new LamportTime(2); + + assertTrue(time0.lessThan(time1)); + assertFalse(time2.lessThan(time1)); + } + + @Test + public void synchonizeNonLamportTest() + { + LogicalTime logicalTime = new LogicalTime() { + + @Override + public LogicalTime increment() { + throw new UnsupportedOperationException("Unimplemented method 'increment'"); + } + + @Override + public boolean lessThan(Object o) { + throw new UnsupportedOperationException("Unimplemented method 'lessThan'"); + } + + }; + + LamportTime lamportTime = new LamportTime(); + + assertThrows(IllegalArgumentException.class, () -> LamportTime.synchronize(lamportTime, logicalTime)); + assertThrows(IllegalArgumentException.class, () -> LamportTime.synchronize(logicalTime, lamportTime)); + } } From d421d85fde25830ab702d50367e9776819bfc678 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:48:19 -0300 Subject: [PATCH 20/21] MS: Memory Delete Test --- .../cst/memorystorage/MemoryStorageTest.java | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index d0d6d8a7..53c4f8e1 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -4,11 +4,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import java.util.Map; import org.junit.jupiter.api.AfterEach; @@ -174,7 +176,7 @@ public void redisArgsTest() throws Exception { } @Test - public void memoryTransferTest() throws Exception { + public void deleteMemoryTest() throws Exception { Memory memory1 = mind.createMemoryObject("Memory1", "INFO"); MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); @@ -237,7 +239,7 @@ public void msTest() throws Exception Memory mind2Memory1 = mind2.createMemoryObject("Memory1", ""); MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); - msCodelet.setTimeStep(50); + msCodelet2.setTimeStep(50); mind2.insertCodelet(msCodelet2); mind2.start(); @@ -292,4 +294,48 @@ public void msTest() throws Exception assertEquals(true, mind2Memory1.getI()); } + @Test + public void unsubscribeTest() throws Exception + { + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + + mind.createMemoryObject("Memory", "NODE1_INFO"); + + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + mind.getRawMemory().shutDown(); + System.gc(); + + Thread.sleep(sleepTime); + + commands.publish("default_mind:memories:Memory:update", ""); + + Thread.sleep(sleepTime); + + Field memoriesField = msCodelet.getClass().getDeclaredField("memories"); + memoriesField.setAccessible(true); + HashMap> memories = (HashMap>) memoriesField.get(msCodelet); + + Field listenersField = msCodelet.getClass().getDeclaredField("listeners"); + listenersField.setAccessible(true); + HashMap> listeners = (HashMap>) listenersField.get(msCodelet); + + assertEquals(0, memories.size()); + assertEquals(2, listeners.size()); + + Memory memory2 = mind2.createMemoryObject("Memory", "NODE2_INFO"); + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + msCodelet2.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + Thread.sleep(sleepTime); + + assertEquals("NODE2_INFO", memory2.getI()); + } + } From b25903bacf3a0ef9cf4438efc28adf1deaaaf3fb Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:53:35 -0300 Subject: [PATCH 21/21] MS: Fix tests names --- .../java/br/unicamp/cst/memorystorage/MemoryStorageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java index 53c4f8e1..c9bc24b5 100644 --- a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -176,7 +176,7 @@ public void redisArgsTest() throws Exception { } @Test - public void deleteMemoryTest() throws Exception { + public void transferMemoryTest() throws Exception { Memory memory1 = mind.createMemoryObject("Memory1", "INFO"); MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind);