diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml index 4e0d1401..d0e330f0 100644 --- a/.github/workflows/java-ci.yml +++ b/.github/workflows/java-ci.yml @@ -17,6 +17,22 @@ jobs: matrix: java: [ '8', '11', '17'] name: JDK ${{ matrix.Java }} build + + services: + # Label used to access the service container + redis: + # Docker Hub image + image: redis + # Set health checks to wait until redis has started + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + # Maps port 6379 on service container to the host + - 6379:6379 + steps: - uses: actions/checkout@v4.1.1 - name: Set up Java diff --git a/build.gradle b/build.gradle index c0c25090..c5d81970 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.1' testRuntimeOnly("org.junit.platform:junit-platform-launcher") + api 'io.lettuce:lettuce-core:6.3.2.RELEASE' } diff --git a/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java new file mode 100644 index 00000000..8a95df09 --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/LamportTime.java @@ -0,0 +1,89 @@ +package br.unicamp.cst.memorystorage; + +/** + * Logical time implementation using Lamport times. + */ +public class LamportTime implements LogicalTime { + + private int time; + + /** + * LamportTime constructor. + * + * @param initialTime time to start the clock. + */ + public LamportTime(int initialTime) + { + this.time = initialTime; + } + + /** + * LamportTime constructor. + */ + public LamportTime() + { + this(0); + } + + /** + * Get the clock internal time. + * @return clock internal time. + */ + public int getTime() + { + return this.time; + } + + @Override + public LamportTime increment() { + return new LamportTime(this.time+1); + } + + public static LamportTime fromString(String string) + { + return new LamportTime(Integer.parseInt(string)); + } + + public String toString() + { + return Integer.toString(time); + } + + + public static LamportTime synchronize(LogicalTime time0, LogicalTime time1) { + if(!(LamportTime.class.isInstance(time0) && LamportTime.class.isInstance(time1))){ + throw new IllegalArgumentException("LamportTime can only synchonize LamportTime instances"); + } + + LamportTime lamportTime0 = (LamportTime) time0; + LamportTime lamportTime1 = (LamportTime) time1; + + int newTime = 0; + if(time0.lessThan(time1)){ + newTime = lamportTime1.getTime(); + } + else{ + newTime = lamportTime0.getTime(); + } + + newTime += 1; + + return new LamportTime(newTime); + + } + + @Override + public boolean lessThan(Object o) { + LamportTime otherTime = (LamportTime) o; + + return this.time < otherTime.getTime(); + } + + @Override + public boolean equals(Object o) + { + LamportTime otherTime = (LamportTime) o; + return this.time == otherTime.getTime(); + } + +} diff --git a/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java new file mode 100644 index 00000000..57b575b4 --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/LogicalTime.java @@ -0,0 +1,44 @@ +package br.unicamp.cst.memorystorage; + +/** + * A logical time for distributed communication. + */ +public interface LogicalTime { + + /** + * Returns a time with the self time incremented by one. + * + * @return incremented time. + */ + public abstract LogicalTime increment(); + + /** + * Creates a instance from a string. + * + * @param string String to create time, generated with LogicalTime.toString(). + * @return Created time. + */ + public static LogicalTime fromString(String string) + { + throw new IllegalStateException("fromString not implemented in the subclass"); + } + + @Override + public abstract String toString(); + + /** + * Compares two times, and return the current time. + * + * @param time0 first time to compare. + * @param time1 second time to compare. + * @return current time. + */ + public static LogicalTime synchronize(LogicalTime time0, LogicalTime time1) + { + throw new IllegalStateException("synchronize not implemented in the subclass"); + } + + @Override + public abstract boolean equals(Object o); + public abstract boolean lessThan(Object o); +} diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java new file mode 100644 index 00000000..84215ffc --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryEncoder.java @@ -0,0 +1,56 @@ +package br.unicamp.cst.memorystorage; + +import java.util.HashMap; +import java.util.Map; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.ToNumberPolicy; + +import br.unicamp.cst.core.entities.Memory; + +/** + * Encodes and decodes Memories. + */ +public class MemoryEncoder { + static Gson gson = new GsonBuilder().setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE).create(); + + private MemoryEncoder() { + + } + + /** + * Encodes a memory to a Map. + * + * @param memory memory to encode. + * @return the encoded memory. + */ + public static Map toDict(Memory memory) { + Map data = new HashMap<>(); + + data.put("name", memory.getName()); + data.put("evaluation", memory.getEvaluation().toString()); + data.put("id", memory.getId().toString()); + data.put("timestamp", memory.getTimestamp().toString()); + + data.put("I", gson.toJson(memory.getI())); + + return data; + } + + /** + * Load a memory from a Map. + * + * @param memory memory to store the loaded info. + * @param memoryDict map encoded memory. + */ + public static void loadMemory(Memory memory, Map memoryDict) { + memory.setEvaluation(Double.parseDouble(memoryDict.get("evaluation"))); + memory.setId(Long.parseLong(memoryDict.get("id"))); + + String infoJSON = memoryDict.get("I"); + Object info = gson.fromJson(infoJSON, Object.class); + memory.setI(info); + } + +} diff --git a/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java new file mode 100644 index 00000000..b8bd6542 --- /dev/null +++ b/src/main/java/br/unicamp/cst/memorystorage/MemoryStorageCodelet.java @@ -0,0 +1,504 @@ +package br.unicamp.cst.memorystorage; + +import java.lang.ref.WeakReference; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import br.unicamp.cst.core.entities.Codelet; +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.MemoryObject; +import br.unicamp.cst.core.entities.Mind; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; + +/** + * Synchonizes local memories with a Redis database. + * + * When using MemoryStorage, each local CST instance is called a node. + * Memories with the same name in participating nodes are synchronized. + * + * The collection of synchonized nodes is a mind.A single Redis instance can support multiple minds with unique names + */ +public class MemoryStorageCodelet extends Codelet { + private static final String LOGICAL_TIME_FIELD = "logical_time"; + private static final String OWNER_FIELD = "owner"; + private static final String MEMORY_NAME_FIELD = "memory_name"; + + private static final String MEMORY_PATH_TEMPLATE = "%s:memories:%s"; + + private static final Logger LOGGER = Logger.getLogger(MemoryStorageCodelet.class.getName()); + + private RedisClient redisClient; + private StatefulRedisConnection connection; + private RedisAsyncCommands commands; + private StatefulRedisPubSubConnection pubsubConnection; + HashMap> listeners; + + private Mind mind; + private String nodeName; + + private String mindName; + private double requestTimeout; + + private HashMap> memories; + private HashMap lastUpdate; + private HashMap memoryLogicalTime; + private HashSet waitingRetrieval; + private HashMap> waitingRequestEvents; + private ExecutorService retrieveExecutor; + + private LamportTime currentTime; + private Gson gson; + + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @throws ExecutionException + * @throws InterruptedException + */ + public MemoryStorageCodelet(Mind mind) throws ExecutionException, InterruptedException { + this(mind, "node", "default_mind", 500.0e-3, RedisClient.create("redis://localhost")); + } + + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ + public MemoryStorageCodelet(Mind mind, RedisClient redisClient) throws ExecutionException, InterruptedException { + this(mind, "node", "default_mind", 500.0e-3, redisClient); + } + + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param mindName name of the network mind. + * @throws ExecutionException + * @throws InterruptedException + */ + public MemoryStorageCodelet(Mind mind, String mindName) throws ExecutionException, InterruptedException { + this(mind, "node", mindName, 500.0e-3, RedisClient.create("redis://localhost")); + } + + /*** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param mindName name of the network mind. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ + public MemoryStorageCodelet(Mind mind, String mindName, RedisClient redisClient) + throws ExecutionException, InterruptedException { + this(mind, "node", mindName, 500.0e-3, redisClient); + } + + /** + * MemoryStorageCodelet constructor. + * + * @param mind agent mind, used to monitor memories. + * @param nodeName name of the local node in the network. + * @param mindName name of the network mind. + * @param requestTimeout time before timeout when requesting a memory synchonization. + * @param redisClient redis client to connect. + * @throws ExecutionException + * @throws InterruptedException + */ + public MemoryStorageCodelet(Mind mind, String nodeName, String mindName, + double requestTimeout, RedisClient redisClient) throws ExecutionException, InterruptedException { + listeners = new HashMap<>(); + memories = new HashMap<>(); + lastUpdate = new HashMap<>(); + memoryLogicalTime = new HashMap<>(); + waitingRetrieval = new HashSet<>(); + waitingRequestEvents = new HashMap<>(); + retrieveExecutor = Executors.newFixedThreadPool(3); + currentTime = new LamportTime(); + gson = new Gson(); + + this.mind = mind; + this.mindName = mindName; + this.requestTimeout = requestTimeout; + + this.redisClient = redisClient; + connection = redisClient.connect(); + commands = connection.async(); + pubsubConnection = redisClient.connectPubSub(); + + String baseName = nodeName; + + String mindNodesPath = String.format("%s:nodes", mindName); + boolean isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + if (isMemberResult) { + Long nodeNumber = commands.scard(mindNodesPath).get(); + nodeName = baseName + Long.toString(nodeNumber); + + isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + while (isMemberResult) { + nodeNumber += 1; + nodeName = baseName + Long.toString(nodeNumber); + isMemberResult = commands.sismember(mindNodesPath, nodeName).get(); + } + } + + this.nodeName = nodeName; + + commands.sadd(mindNodesPath, nodeName); + + RedisPubSubListener listener = new RedisPubSubAdapter() { + @Override + public void message(String receivedChannel, String message) { + Consumer listener = MemoryStorageCodelet.this.listeners.get(receivedChannel); + + if (listener != null) { + listener.accept(message); + } + } + }; + + pubsubConnection.addListener(listener); + + Consumer handlerTransferMemory = this::handlerTransferMemory; + subscribe(String.format("%s:nodes:%s:transfer_memory", mindName, nodeName), handlerTransferMemory); + Consumer handlerNotifyTransfer = this::handlerNotifyTransfer; + subscribe(String.format("%s:nodes:%s:transfer_done", mindName, nodeName), handlerNotifyTransfer); + } + + /** + * Gets the name of the node. + * + * @return node name. + */ + public String getNodeName() { + return nodeName; + } + + @Override + public void accessMemoryObjects() { // NOSONAR + } + + @Override + public void calculateActivation() { // NOSONAR + } + + @Override + public void proc() { + HashMap mindMemories = new HashMap<>(); + HashSet mindMemoriesNames = new HashSet<>(); + for (Memory memory : mind.getRawMemory().getAllMemoryObjects()) { + String memoryName = memory.getName(); + if (!memoryName.equals("")) { + mindMemories.put(memoryName, memory); + mindMemoriesNames.add(memoryName); + } + } + + Set memoriesNames = this.memories.keySet(); + + mindMemoriesNames.removeAll(memoriesNames); + Set difference = mindMemoriesNames; + for (String memoryName : difference) { + Memory memory = mindMemories.get(memoryName); + memories.put(memoryName, new WeakReference<>(memory)); + + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); + RedisFuture existFuture = commands.exists(memoryPath); + try { + if (existFuture.get() > 0) { + retrieveExecutor.execute(() -> retrieveMemory(memory)); + } else { + Map impostor = new HashMap<>(); + impostor.put("name", memoryName); + impostor.put("evaluation", "0.0"); + impostor.put("I", ""); + impostor.put("id", "0"); + impostor.put(OWNER_FIELD, nodeName); + impostor.put(LOGICAL_TIME_FIELD, currentTime.toString()); + + commands.hset(memoryPath, impostor); + currentTime = currentTime.increment(); + } + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't send memory to Redis server"); + } + + String subscribeUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); + Consumer handlerUpdate = message -> this.updateMemory(memoryName); + subscribe(subscribeUpdatePath, handlerUpdate); + + } + + Set toUpdate = lastUpdate.keySet(); + for (String memoryName : toUpdate) { + if (!memories.containsKey(memoryName) || memories.get(memoryName).get() == null) { + lastUpdate.remove(memoryName); + memoryLogicalTime.remove(memoryName); + continue; + } + Memory memory = memories.get(memoryName).get(); + + if (memory.getTimestamp() > lastUpdate.get(memoryName)) { + memoryLogicalTime.put(memoryName, currentTime); + updateMemory(memoryName); + } + } + } + + /** + * Updates a memory, sending or retrieving the memory data + * to/from the database. + * + * Performs a time comparison with the local data and storage + * data to decide whether to send or retrieve the data. + * + * @param memoryName name of the memory to synchonize. + */ + private void updateMemory(String memoryName) { + String memoryUpdatePath = String.format("%s:memories:%s:update", mindName, memoryName); + + if (!memories.containsKey(memoryName)) { + unsubscribe(memoryUpdatePath); + return; + } + Memory memory = memories.get(memoryName).get(); + if (memory == null) { + unsubscribe(memoryUpdatePath); + memories.remove(memoryName); + return; + } + + try { + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); + String messageTimeStr = commands.hget(memoryPath, LOGICAL_TIME_FIELD).get(); + LamportTime messageTime = LamportTime.fromString(messageTimeStr); + + LamportTime memoryTime = memoryLogicalTime.get(memoryName); + + if (memoryTime.lessThan(messageTime)) { + retrieveExecutor.execute(() -> retrieveMemory(memory)); + } else if (messageTime.lessThan(memoryTime)) { + sendMemory(memory); + } + + lastUpdate.put(memoryName, memory.getTimestamp()); + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't retrieve information from Redis server"); + } + } + + /** + * Sends a memory data to the storage. + * + * @param memory memory to send. + */ + private void sendMemory(Memory memory) { + String memoryName = memory.getName(); + + Map memoryDict = MemoryEncoder.toDict(memory); + memoryDict.put(OWNER_FIELD, ""); + memoryDict.put(LOGICAL_TIME_FIELD, memoryLogicalTime.get(memoryName).toString()); + + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); + commands.hset(memoryPath, memoryDict); + + String memoryUpdatePath = memoryPath + ":update"; + commands.publish(memoryUpdatePath, ""); + + currentTime = currentTime.increment(); + } + + /** + * Retrieves a memory data from the storage. + * + * Blocks the application, it is advisable to use a separate thread to call the method. + * + * @param memory memory to retrieve data. + */ + private void retrieveMemory(Memory memory) { + String memoryName = memory.getName(); + + if (waitingRetrieval.contains(memoryName)) { + return; + } + waitingRetrieval.add(memoryName); + + String memoryPath = String.format(MEMORY_PATH_TEMPLATE, mindName, memoryName); + Map memoryDict; + try { + memoryDict = commands.hgetall(memoryPath).get(); + String owner = memoryDict.get(OWNER_FIELD); + + if (!owner.equals("")) { + CompletableFuture event = new CompletableFuture<>(); + waitingRequestEvents.put(memoryName, event); + requestMemory(memoryName, owner); + + try { // NOSONAR + boolean eventResult = event.get((long) (requestTimeout * 1000), TimeUnit.MILLISECONDS); + if (!eventResult) { + sendMemory(memory); + } + } catch (TimeoutException e) { + sendMemory(memory); + } + + memoryDict = commands.hgetall(memoryPath).get(); + } + + MemoryEncoder.loadMemory(memory, memoryDict); + + LamportTime messageTime = LamportTime.fromString(memoryDict.get(LOGICAL_TIME_FIELD)); + currentTime = LamportTime.synchronize(messageTime, currentTime); + + Long timestamp = memory.getTimestamp(); + lastUpdate.put(memoryName, timestamp); + memoryLogicalTime.put(memoryName, messageTime); + + waitingRetrieval.remove(memoryName); + } catch (ExecutionException | InterruptedException e) { // NOSONAR + LOGGER.log(Level.SEVERE, "Can't send memory to Redis server"); + } + } + + /** + * Requests another node to send its local memory to storage. + * + * @param memoryName name of the memory to request. + * @param ownerName node owning the memory. + */ + private void requestMemory(String memoryName, String ownerName) { + String requestAddr = String.format("%s:nodes:%s:transfer_memory", mindName, ownerName); + + HashMap requestDict = new HashMap<>(); + requestDict.put(MEMORY_NAME_FIELD, memoryName); + requestDict.put("node", nodeName); + + Map fullRequestDict = new HashMap<>(); + fullRequestDict.put("request", requestDict); + fullRequestDict.put(LOGICAL_TIME_FIELD, currentTime.toString()); + + String request = gson.toJson(fullRequestDict); + + commands.publish(requestAddr, request); + } + + /** + * Handles a message in the notify transfer channel. + * + * @param message message received in the channel. + */ + private void handlerNotifyTransfer(String message) { + Type type = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(message, type); + + if (data.containsKey(LOGICAL_TIME_FIELD)) { + LamportTime messageTime = LamportTime.fromString(data.get(LOGICAL_TIME_FIELD)); + currentTime = LamportTime.synchronize(messageTime, currentTime); + } + + String memoryName = data.get(MEMORY_NAME_FIELD); + + CompletableFuture event = waitingRequestEvents.get(memoryName); + if (event != null) { + event.complete(true); + + waitingRequestEvents.remove(memoryName); + } + } + + /** + * Handles a message in the transfer memory channel. + * + * @param message message received in the channel. + */ + @SuppressWarnings("unchecked") + private void handlerTransferMemory(String message) { + Type type = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(message, type); + + if (data.containsKey(LOGICAL_TIME_FIELD)) { + LamportTime messageTime = LamportTime.fromString((String) data.get(LOGICAL_TIME_FIELD)); + currentTime = LamportTime.synchronize(messageTime, currentTime); + } + + Map request; + try { + request = (Map) data.get("request"); + } catch (ClassCastException e) { + LOGGER.warning("Transfer memory request is not valid"); + return; + } + + String memoryName = request.get(MEMORY_NAME_FIELD); + String requestingNode = request.get("node"); + + Memory memory; + WeakReference memoryReference = memories.get(memoryName); + if (memoryReference == null || memoryReference.get() == null) { + memory = new MemoryObject(); + memory.setName(memoryName); + } else { + memory = memoryReference.get(); + } + + memoryLogicalTime.put(memoryName, currentTime); + + sendMemory(memory); + + Map response = new HashMap<>(); + response.put(MEMORY_NAME_FIELD, memoryName); + response.put(LOGICAL_TIME_FIELD, currentTime.toString()); + String responseString = gson.toJson(response); + + String responseAddr = String.format("%s:nodes:%s:transfer_done", mindName, requestingNode); + commands.publish(responseAddr, responseString); + } + + private void subscribe(String channel, Consumer handler) { + listeners.put(channel, handler); + pubsubConnection.async().subscribe(channel); + } + + private void unsubscribe(String channel) { + pubsubConnection.async().unsubscribe(channel); + listeners.remove(channel); + } + + @Override + public synchronized void stop() { + pubsubConnection.close(); + retrieveExecutor.shutdownNow(); + redisClient.shutdown(); + + super.stop(); + } + +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/ExternalTest.java b/src/test/java/br/unicamp/cst/memorystorage/ExternalTest.java new file mode 100644 index 00000000..b0af0d0d --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/ExternalTest.java @@ -0,0 +1,108 @@ +package br.unicamp.cst.memorystorage; + +import br.unicamp.cst.core.entities.MemoryObject; +import br.unicamp.cst.core.entities.Mind; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; + +/** + * Code created to test the communication of CST-Java Memory Storage with other instances. + * At the moment, it must be executed manually. + */ +public class ExternalTest { + + private static final long SLEEP_TIME = (long) (0.75 * 1000); + + public static void main(String[] args){ + RedisClient redisClient = RedisClient.create("redis://localhost"); + StatefulRedisConnection connection = redisClient.connect(); + RedisAsyncCommands commands = connection.async(); + try { + commands.flushall().get(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(1); + } + + Mind mind = new Mind(); + MemoryObject memory1 = mind.createMemoryObject("Memory1", false); + MemoryStorageCodelet ms; + try{ + ms = new MemoryStorageCodelet(mind); + ms.setTimeStep(100); + } catch(Exception e) + { + e.printStackTrace(); + System.exit(1); + return; + } + + long lastTimestamp = memory1.getTimestamp(); + + mind.insertCodelet(ms); + mind.start(); + + + try{ + boolean valid = false; + + for(int i = 0; i<30; i++) + { + Thread.sleep(100); + + if (lastTimestamp != memory1.getTimestamp()) { + boolean info = (boolean) memory1.getI(); + if(info) + { + valid = true; + break; + } + } + } + + if(!valid) + { + System.err.print("Could not communicate with the other CST node"); + System.exit(1); + } + + memory1.setI("JAVA_INFO"); + + Thread.sleep(SLEEP_TIME); + + String stringInfo = (String) memory1.getI(); + assert stringInfo.equals("OTHER_INFO"); + + memory1.setI(1); + Thread.sleep(SLEEP_TIME); + + Long longInfo = (Long) memory1.getI(); + assert longInfo == -1; + + memory1.setI(1.0); + + Thread.sleep(SLEEP_TIME); + + Double doubleInfo = (Double) memory1.getI(); + assert doubleInfo == 5.0; + + //while (true) { + // if (lastTimestamp != memory1.getTimestamp()) { + // System.out.println(); + // System.out.println(memory1); + // + // lastTimestamp = memory1.getTimestamp(); + // } + //} + } catch(Exception e) + { + e.printStackTrace(); + System.exit(1); + return; + } + + + } + +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java new file mode 100644 index 00000000..596cb001 --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/LamportTimeTest.java @@ -0,0 +1,112 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +public class LamportTimeTest { + + @Test + public void initialTimeTest() + { + LamportTime time0 = new LamportTime(123); + + assertEquals(123, time0.getTime()); + } + + @Test + public void stringTest() + { + LamportTime time0 = new LamportTime(456); + + assertEquals("456", time0.toString()); + } + + @Test + public void fromStringTest() + { + LamportTime time0 = new LamportTime(987); + + assertTrue(time0.equals(LamportTime.fromString(time0.toString()))); + } + + @Test + public void incrementTest() + { + LamportTime time0 = new LamportTime(); + int time0Time = time0.getTime(); + + LamportTime time1 = time0.increment(); + + assertEquals(time0Time, time0.getTime()); + assertEquals(time0Time+1, time1.getTime()); + } + + @Test + public void synchronizeTest() + { + LamportTime time0 = new LamportTime(-10); + LamportTime time1 = new LamportTime(55); + + LamportTime timeS = LamportTime.synchronize(time0, time1); + + assertTrue(time0.lessThan(timeS)); + assertTrue(time1.lessThan(timeS)); + assertEquals(56, timeS.getTime()); + + LamportTime timeS2 = LamportTime.synchronize(time1, time0); + + assertTrue(time0.lessThan(timeS2)); + assertTrue(time1.lessThan(timeS2)); + assertEquals(56, timeS2.getTime()); + } + + @Test + public void equalsTest() + { + LamportTime time0 = new LamportTime(0); + LamportTime time1 = new LamportTime(1); + LamportTime time2 = new LamportTime(1); + + assertNotEquals(time0, time1); + assertEquals(time1, time2); + } + + @Test + public void lessThanTest() + { + LamportTime time0 = new LamportTime(0); + LamportTime time1 = new LamportTime(1); + LamportTime time2 = new LamportTime(2); + + assertTrue(time0.lessThan(time1)); + assertFalse(time2.lessThan(time1)); + } + + @Test + public void synchonizeNonLamportTest() + { + LogicalTime logicalTime = new LogicalTime() { + + @Override + public LogicalTime increment() { + throw new UnsupportedOperationException("Unimplemented method 'increment'"); + } + + @Override + public boolean lessThan(Object o) { + throw new UnsupportedOperationException("Unimplemented method 'lessThan'"); + } + + }; + + LamportTime lamportTime = new LamportTime(); + + assertThrows(IllegalArgumentException.class, () -> LamportTime.synchronize(lamportTime, logicalTime)); + assertThrows(IllegalArgumentException.class, () -> LamportTime.synchronize(logicalTime, lamportTime)); + + } +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java b/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java new file mode 100644 index 00000000..0392783c --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/LogicalTimeTest.java @@ -0,0 +1,24 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +public class LogicalTimeTest { + + @Test + public void fromStringNotImplementedTest() + { + assertThrows(IllegalStateException.class, () -> LogicalTime.fromString("null")); + } + + @Test + public void synchronizeNotImplementedTest() + { + LamportTime time = new LamportTime(); + + assertThrows(IllegalStateException.class, () -> LogicalTime.synchronize(time, time)); + } + +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java new file mode 100644 index 00000000..fc99d720 --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryEncoderTest.java @@ -0,0 +1,56 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.MemoryObject; + +public class MemoryEncoderTest { + + @Test + public void toDictTest() + { + MemoryObject memory = new MemoryObject(); + memory.setName("MemoryName"); + memory.setI(123.456); + memory.setId(123l); + memory.setEvaluation(0.5); + + Map memoryDict = MemoryEncoder.toDict(memory); + + assertEquals("123.456", memoryDict.get("I")); + assertEquals(memory.getTimestamp().toString(), memoryDict.get("timestamp")); + assertEquals(Float.toString(0.5f), memoryDict.get("evaluation")); + assertEquals("MemoryName", memoryDict.get("name")); + assertEquals(memory.getId().toString(), memoryDict.get("id")); + } + + @Test + public void loadMemoryTest() + { + Memory memory = new MemoryObject(); + Map memoryDict = new HashMap(); + memoryDict.put("evaluation", "0.5"); + memoryDict.put("id", "123"); + memoryDict.put("I", "[5, 3, 4]"); + + MemoryEncoder.loadMemory(memory, memoryDict); + + assertEquals(0.5, memory.getEvaluation()); + assertEquals(123, memory.getId()); + + List info = (List) memory.getI(); + + assertEquals(3, info.size()); + assertEquals(5, info.get(0)); + assertEquals(3, info.get(1)); + assertEquals(4, info.get(2)); + + } +} diff --git a/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java new file mode 100644 index 00000000..c9bc24b5 --- /dev/null +++ b/src/test/java/br/unicamp/cst/memorystorage/MemoryStorageTest.java @@ -0,0 +1,341 @@ +package br.unicamp.cst.memorystorage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.lang.ref.WeakReference; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.Map; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import br.unicamp.cst.core.entities.Memory; +import br.unicamp.cst.core.entities.Mind; +import br.unicamp.cst.core.entities.RawMemory; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; + +public class MemoryStorageTest { + + private static RedisClient client; + private static StatefulRedisConnection connection; + private static RedisAsyncCommands commands; + + private Mind mind; + private Mind mind2; + private Mind mind3; + + private List startTimes; + private long sleepTime; + + @BeforeAll + public static void initAll() throws Exception { + client = RedisClient.create("redis://localhost"); + + try { + connection = client.connect(); + } catch (RedisConnectionException e) { + assumeTrue(false); + } + + commands = connection.async(); + } + + @BeforeEach + public void init() throws Exception { + commands.flushall().get(); + + startTimes = new ArrayList<>(); + startTimes.add(0d); + startTimes.add(1e3); + + sleepTime = (long) (0.75 * 1000); + + mind = new Mind(); + mind2 = new Mind(); + mind3 = new Mind(); + + Field field = RawMemory.class.getDeclaredField("lastid"); + field.setAccessible(true); + field.setLong(null, 0); + } + + @AfterEach + public void tearDown() throws Exception { + mind.shutDown(); + mind2.shutDown(); + mind3.shutDown(); + + commands.flushall().get(); + } + + @Test + public void testMindName() throws Exception{ + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind, "Mind1"); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind, "Mind2", client); + msCodelet2.setTimeStep(50); + mind.insertCodelet(msCodelet2); + + mind.start(); + + Set members = commands.smembers("Mind1:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); + + Set members2 = commands.smembers("Mind2:nodes").get(); + assertEquals(1, members2.size()); + assertTrue(members2.contains("node")); + } + + @Test + public void nodeEnterTest() throws Exception { + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + assertEquals("node", msCodelet.getNodeName()); + + Set members = commands.smembers("default_mind:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2,"node2", "default_mind", 500.0e-3, client ); + msCodelet.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + Thread.sleep(sleepTime); + + assertEquals("node2", msCodelet2.getNodeName()); + + members = commands.smembers("default_mind:nodes").get(); + assertEquals(2, members.size()); + assertTrue(members.contains("node")); + assertTrue(members.contains("node2")); + + MemoryStorageCodelet msCodelet3 = new MemoryStorageCodelet(mind3); + msCodelet3.setTimeStep(50); + mind3.insertCodelet(msCodelet3); + mind3.start(); + + Thread.sleep(sleepTime); + + assertEquals("node3", msCodelet3.getNodeName()); + + members = commands.smembers("default_mind:nodes").get(); + assertEquals(3, members.size()); + assertTrue(members.contains("node")); + assertTrue(members.contains("node2")); + assertTrue(members.contains("node3")); + } + + @Test + public void redisArgsTest() throws Exception { + RedisURI uri = RedisURI.Builder + .redis("localhost", 6379) + .build(); + RedisClient client = RedisClient.create(uri); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind, client); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + Set members = commands.smembers("default_mind:nodes").get(); + assertEquals(1, members.size()); + assertTrue(members.contains("node")); + } + + @Test + public void transferMemoryTest() throws Exception { + Memory memory1 = mind.createMemoryObject("Memory1", "INFO"); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + assertTrue(commands.exists("default_mind:memories:Memory1").get() >= 1); + Map result = commands.hgetall("default_mind:memories:Memory1").get(); + + Map expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", ""); + expectedResult.put("id", "0"); + expectedResult.put("owner", "node"); + expectedResult.put("logical_time", "0"); + + assertEquals(expectedResult, result); + + String request = "{'request':{'memory_name':'Memory1', 'node':'node1'}, 'logical_time':'0'}"; + commands.publish("default_mind:nodes:node:transfer_memory", request); + + Thread.sleep(sleepTime); + + result = commands.hgetall("default_mind:memories:Memory1").get(); + + expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", "\"INFO\""); + expectedResult.put("id", "0"); + expectedResult.put("owner", ""); + + result.remove("logical_time"); + result.remove("timestamp"); + + assertEquals(expectedResult, result); + } + + @Test + public void msTest() throws Exception + { + Memory memory1 = mind.createMemoryObject("Memory1", ""); + + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + mind.insertCodelet(msCodelet); + mind.start(); + + assertEquals("", memory1.getI()); + + int[] info = {1, 1, 1}; + memory1.setI(info); + + Thread.sleep(sleepTime); + + Memory mind2Memory1 = mind2.createMemoryObject("Memory1", ""); + + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + msCodelet2.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + assertEquals("", mind2Memory1.getI()); + + Thread.sleep(sleepTime); + + for(int i = 0; i<3; i++) + { + assertEquals(info[i], ((int[]) memory1.getI())[i]); + assertEquals(info[i], Math.toIntExact((long)((ArrayList) mind2Memory1.getI()).get(i))); + } + + + Map result = commands.hgetall("default_mind:memories:Memory1").get(); + + Map expectedResult = new HashMap<>(); + expectedResult.put("name", "Memory1"); + expectedResult.put("evaluation", "0.0"); + expectedResult.put("I", "[1,1,1]"); + expectedResult.put("id", "0"); + expectedResult.put("owner", ""); + + assertTrue(result.containsKey("logical_time")); + assertTrue(result.containsKey("timestamp")); + result.remove("logical_time"); + result.remove("timestamp"); + assertEquals(result, expectedResult); + + memory1.setI("INFO"); + Thread.sleep(sleepTime); + + assertEquals("INFO", memory1.getI()); + assertEquals("INFO", mind2Memory1.getI()); + + mind2Memory1.setI("INFO2"); + Thread.sleep(sleepTime); + + assertEquals("INFO2", memory1.getI()); + assertEquals("INFO2", mind2Memory1.getI()); + + memory1.setI(1); + Thread.sleep(sleepTime); + + assertEquals(1, memory1.getI()); + assertEquals(1, Math.toIntExact((long) mind2Memory1.getI())); + + memory1.setI(true); + Thread.sleep(sleepTime); + + assertEquals(true, memory1.getI()); + assertEquals(true, mind2Memory1.getI()); + } + + @Test + public void unsubscribeTest() throws Exception + { + MemoryStorageCodelet msCodelet = new MemoryStorageCodelet(mind); + msCodelet.setTimeStep(50); + + mind.createMemoryObject("Memory", "NODE1_INFO"); + + mind.insertCodelet(msCodelet); + mind.start(); + + Thread.sleep(sleepTime); + + mind.getRawMemory().shutDown(); + System.gc(); + + Thread.sleep(sleepTime); + + commands.publish("default_mind:memories:Memory:update", ""); + + Thread.sleep(sleepTime); + + Field memoriesField = msCodelet.getClass().getDeclaredField("memories"); + memoriesField.setAccessible(true); + HashMap> memories = (HashMap>) memoriesField.get(msCodelet); + + Field listenersField = msCodelet.getClass().getDeclaredField("listeners"); + listenersField.setAccessible(true); + HashMap> listeners = (HashMap>) listenersField.get(msCodelet); + + assertEquals(0, memories.size()); + assertEquals(2, listeners.size()); + + Memory memory2 = mind2.createMemoryObject("Memory", "NODE2_INFO"); + MemoryStorageCodelet msCodelet2 = new MemoryStorageCodelet(mind2); + msCodelet2.setTimeStep(50); + mind2.insertCodelet(msCodelet2); + mind2.start(); + + Thread.sleep(sleepTime); + + assertEquals("NODE2_INFO", memory2.getI()); + } + +}