diff --git a/dev/memory_storage.ipynb b/dev/memory_storage.ipynb new file mode 100644 index 0000000..06854c0 --- /dev/null +++ b/dev/memory_storage.ipynb @@ -0,0 +1,472 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "import redis\n", + "import cst_python as cst\n", + "import json" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "mind_name = \"default_mind\"" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "client = redis.Redis(decode_responses=True)\n", + "pubsub = client.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.flushall()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 publica que existe" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.lpush(f\"{mind_name}:nodes\", \"node1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "class MemoryEncoder(json.JSONEncoder):\n", + " def default(self, memory:cst.core.entities.Memory):\n", + " return MemoryEncoder.to_dict(memory)\n", + " \n", + " @staticmethod\n", + " def to_dict(memory:cst.core.entities.Memory):\n", + " data = {\n", + " \"timestamp\": memory.get_timestamp(),\n", + " \"evaluation\": memory.get_evaluation(),\n", + " \"I\": memory.get_info(),\n", + " \"name\": memory.get_name(),\n", + " \"id\": memory.get_id()\n", + " }\n", + "\n", + " return data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 checa se memória com id \"memory1\" existe. Como não, publica key:" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [], + "source": [ + "def update_memory(memory_name, memory_object:cst.MemoryObject, client:redis.Redis):\n", + " timestamp = float(client.hget(f\"{mind_name}:memories:{memory_name}\", \"timestamp\"))\n", + " \n", + " if memory_object.timestamp < timestamp:\n", + " print(\"Retrieve update\")\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", + "\n", + " memory_object.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory_object.set_name(memory_dict[\"name\"])\n", + " memory_object.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory_object.set_info(info)\n", + "\n", + " memory_object.timestamp = float(memory_dict[\"timestamp\"])\n", + " elif memory_object.timestamp > timestamp:\n", + " print(\"Send update\")\n", + " memory_dict = MemoryEncoder.to_dict(memory_object)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + "\n", + " client.hset(f\"{mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", + " client.publish(f\"{mind_name}:memories:{memory_name}:update\", \"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [], + "source": [ + "def create_memory(node, memory_name, client:redis.Redis, pubsub:redis.client.PubSub) -> cst.MemoryObject:\n", + " memory = cst.MemoryObject()\n", + "\n", + " if client.exists(f\"{mind_name}:memories:{memory_name}\"):\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", + "\n", + " if memory_dict[\"owner\"] != \"\":\n", + " #Solicita memória\n", + " pass\n", + "\n", + "\n", + " #Copia memória\n", + " print(\"Copia\")\n", + "\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", + "\n", + " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory.set_name(memory_dict[\"name\"])\n", + " memory.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory.set_info(info)\n", + "\n", + " memory.timestamp = float(memory_dict[\"timestamp\"])\n", + " else:\n", + " #Indica que memória existe\n", + " print(\"Cria\")\n", + " client.lpush(f\"{node}:memories\", memory_name)\n", + "\n", + " memory_dict = MemoryEncoder.to_dict(memory)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + " memory_dict[\"owner\"] = \"\" #node\n", + "\n", + " client.hset(f\"{mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", + "\n", + " subscribe_func = lambda message : update_memory(memory_name, memory, client)\n", + " pubsub.subscribe(**{f\"{mind_name}:memories:{memory_name}:update\":subscribe_func})\n", + "\n", + " return memory" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + } + ], + "source": [ + "memory1 = create_memory(\"node1\", \"memory1\", client, pubsub)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 entra no jogo" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [], + "source": [ + "client2 = redis.Redis(decode_responses=True)\n", + "pubsub2 = client2.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 33, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client2.lpush(f\"{mind_name}:nodes\", \"node2\")" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['node2', 'node1']" + ] + }, + "execution_count": 34, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nodes = client2.lrange(f\"{mind_name}:nodes\", 0, -1)\n", + "nodes" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 tenta criar memória, percebe que existe e sincroniza" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timestamp': '0.0',\n", + " 'evaluation': '0.0',\n", + " 'I': 'null',\n", + " 'name': '',\n", + " 'id': '0.0',\n", + " 'owner': ''}" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(f\"{mind_name}:memories:memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Copia\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node2_memory1 = create_memory(\"node2\", \"memory1\", client2, pubsub2)\n", + "\n", + "node2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 37, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory2 = create_memory(\"node2\", \"memory2\", client2, pubsub2)\n", + "\n", + "memory2" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Send update\n" + ] + } + ], + "source": [ + "node2_memory1.set_info(\"INFO\")\n", + "update_memory(\"memory1\", node2_memory1, client2)" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory1:update', 'data': 1}\n", + "Retrieve update\n", + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory1:update', 'data': 1}\n", + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory2:update', 'data': 2}\n" + ] + } + ], + "source": [ + "for _pubsub in [pubsub, pubsub2]:\n", + " msg = _pubsub.get_message()\n", + " while msg is not None:\n", + " print(msg)\n", + " msg = _pubsub.get_message()" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timestamp': '1725638895.8791993',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'name': '',\n", + " 'id': '0.0',\n", + " 'owner': ''}" + ] + }, + "execution_count": 40, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(f\"{mind_name}:memories:memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dev/memory_storage_codelet.ipynb b/dev/memory_storage_codelet.ipynb new file mode 100644 index 0000000..25f6692 --- /dev/null +++ b/dev/memory_storage_codelet.ipynb @@ -0,0 +1,732 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "import redis\n", + "\n", + "import cst_python as cst\n", + "from cst_python.memory_storage import MemoryStorageCodelet" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client = redis.Redis(decode_responses=True)\n", + "client.flushall()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```mermaid\n", + "flowchart LR\n", + "\n", + "update[Update Memory]\n", + "send[Send Memory]\n", + "retrieve[Retrieve Memory]\n", + "request[Request Memory]\n", + "handler_notify_transfer[Handler: Notify Transfer]\n", + "handler_transfer_memory[Handler: Transfer Memory]\n", + "\n", + "\n", + "update --> |\"timestamp(MS) < timestamp(local)\"| send\n", + "update --> |\"timestamp(MS) > timestamp(local)\"| retrieve\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "handler_transfer_memory --> send\n", + "\n", + "subgraph retrieveContext\n", + "retrieve --> |owner is not empty| request\n", + "\n", + "request -.->|Wait transfer event| handler_notify_transfer\n", + "\n", + "end\n", + "\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "mind = cst.Mind()\n", + "memory1 = mind.create_memory_object(\"Memory1\", \"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node03 Retrieve Memory1\n" + ] + } + ], + "source": [ + "ms_codelet = MemoryStorageCodelet(mind, \"node0\")\n", + "ms_codelet.time_step = 100\n", + "\n", + "mind.insert_codelet(ms_codelet)\n", + "mind.start()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=1, timestamp=1727456263799, evaluation=0.0, I=[1, 1, 1], name=Memory1]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "-1" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node02 Updating memory Memory1\n", + "node02 Send memory Memory1\n", + "node02 Updating memory Memory1\n" + ] + } + ], + "source": [ + "memory1.set_info([1,1,1])" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'node0'}" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.smembers(\"default_mind:nodes\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '',\n", + " 'id': '0.0',\n", + " 'owner': 'node0'}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "mind2 = cst.Mind()\n", + "mind2_memory1 = mind2.create_memory_object(\"Memory1\", \"\")\n", + "mind2_ms_codelet = MemoryStorageCodelet(mind2)\n", + "mind2_ms_codelet.time_step = 100\n", + "mind2.insert_codelet(mind2_ms_codelet)\n", + "mind2.start()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Retrieve Memory1\n", + "node1 Requesting Memory1\n", + "node0 Tranfering Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node0 Retrieve Memory1\n", + "node0 INFO \"\"\n", + "node1 INFO \"\"\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1726077369.7999365, evaluation=0.0, I=, name=Memory1]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'node1'" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_ms_codelet._node_name" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'node0', 'node1'}" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.smembers(\"default_mind:nodes\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1726077369.7999365, evaluation=0.0, I=, name=Memory1]" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"\"',\n", + " 'id': '0',\n", + " 'owner': '',\n", + " 'timestamp': '1726077369.5866976'}" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "-1" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1.set_info(\"INFO\")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO INFO \"INFO\"\n" + ] + } + ], + "source": [ + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1726077370.926107'}" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1726077371.003417, evaluation=0.0, I=INFO, name=Memory1]" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1726077370.926107'}" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Updating memory Memory1\n", + "node1 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Retrieve Memory1\n", + "node0 INFO INFO2 \"INFO2\"\n" + ] + } + ], + "source": [ + "mind2_memory1.set_info(\"INFO2\")\n", + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO2\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1726077373.0085642'}" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1726077373.1104536, evaluation=0.0, I=INFO2, name=Memory1]" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO 1 1\n" + ] + } + ], + "source": [ + "memory1.set_info(1)\n", + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info()" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO 1 \"1\"\n" + ] + } + ], + "source": [ + "memory1.set_info(\"1\")\n", + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'1'" + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info()" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO True true\n" + ] + } + ], + "source": [ + "memory1.set_info(True)\n", + "time.sleep(1)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(True, bool)" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info(), type(mind2_memory1.get_info())" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO [1, 2, 3] [1, 2, 3]\n" + ] + }, + { + "data": { + "text/plain": [ + "([1, 2, 3], list)" + ] + }, + "execution_count": 32, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1.set_info([1,2,3])\n", + "time.sleep(1)\n", + "mind2_memory1.get_info(), type(mind2_memory1.get_info())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dev/weak_test.py b/dev/weak_test.py new file mode 100644 index 0000000..1669116 --- /dev/null +++ b/dev/weak_test.py @@ -0,0 +1,18 @@ +import weakref + +class Dummy: + + def __init__(self, value): + self.value = value + +weak_dict = weakref.WeakValueDictionary() + +var = Dummy(1) + +weak_dict["var"] = var + +print("var" in weak_dict) + +del var + +print("var" in weak_dict) \ No newline at end of file diff --git a/src/cst_python/memory_storage/__init__.py b/src/cst_python/memory_storage/__init__.py new file mode 100644 index 0000000..5abd58d --- /dev/null +++ b/src/cst_python/memory_storage/__init__.py @@ -0,0 +1 @@ +from .memory_storage import MemoryStorageCodelet \ No newline at end of file diff --git a/src/cst_python/memory_storage/memory_encoder.py b/src/cst_python/memory_storage/memory_encoder.py new file mode 100644 index 0000000..7c5a9e9 --- /dev/null +++ b/src/cst_python/memory_storage/memory_encoder.py @@ -0,0 +1,32 @@ +import json +from typing import Any + +from cst_python.core.entities import Memory + +class MemoryEncoder(json.JSONEncoder): + def default(self, memory:Memory): + return MemoryEncoder.to_dict(memory) + + @staticmethod + def to_dict(memory:Memory, jsonify_info:bool=False): + data = { + "timestamp": memory.get_timestamp(), + "evaluation": memory.get_evaluation(), + "I": memory.get_info(), + "name": memory.get_name(), + "id": memory.get_id() + } + + if jsonify_info: + data["I"] = json.dumps(data["I"]) + + return data + + def load_memory(memory:Memory, memory_dict:dict[str,Any], load_json:bool=True): + memory.set_evaluation(float(memory_dict["evaluation"])) + memory.set_id(int(memory_dict["id"])) + + info_json = memory_dict["I"] + info = json.loads(info_json) + + memory.set_info(info) diff --git a/src/cst_python/memory_storage/memory_storage.py b/src/cst_python/memory_storage/memory_storage.py new file mode 100644 index 0000000..7abb474 --- /dev/null +++ b/src/cst_python/memory_storage/memory_storage.py @@ -0,0 +1,212 @@ +import json +import weakref +import json +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, cast + +import redis + +from cst_python.core.entities import Codelet, Mind, Memory, MemoryObject +from .memory_encoder import MemoryEncoder + +class MemoryStorageCodelet(Codelet): + def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None: + super().__init__() + + self._mind = mind + self._request_timeout = request_timeout + + if mind_name is None: + mind_name = "default_mind" + self._mind_name = cast(str, mind_name) + + self._memories : weakref.WeakValueDictionary[str, Memory] = weakref.WeakValueDictionary() + + self._client = redis.Redis(decode_responses=True) + self._pubsub = self._client.pubsub() + self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread() + + base_name = node_name + if base_name is None: + base_name = "node" + + if self._client.sismember(f"{mind_name}:nodes", node_name): + node_number = self._client.scard(f"{mind_name}:nodes") + node_name = base_name+str(node_number) + while self._client.sismember(f"{mind_name}:nodes", node_name): + node_number += 1 + node_name = base_name+str(node_number) + + + self._node_name = cast(str, node_name) + + self._client.sadd(f"{mind_name}:nodes", node_name) + + transfer_service_addr = f"{self._mind_name}:nodes:{node_name}:transfer_memory" + self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory}) + + transfer_done_addr = f"{self._mind_name}:nodes:{node_name}:transfer_done" + self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer}) + + self._last_update : dict[str, int] = {} + self._waiting_retrieve : set[str] = set() + + self._retrieve_executor = ThreadPoolExecutor(3) + + self._waiting_request_events : dict[str, threading.Event] = {} + + self._request = None + + def calculate_activation(self) -> None: + pass + + def access_memory_objects(self) -> None: + pass + + def proc(self) -> None: + + #Check new memories + + mind_memories = {} + for memory in self._mind.raw_memory.all_memories: + if memory.get_name() == "": #No name -> No MS + continue + + mind_memories[memory.get_name()] = memory + + mind_memories_names = set(mind_memories.keys()) + memories_names = set(self._memories.keys()) + + #Check only not here (memories_names not in mind should be garbage collected) + difference = mind_memories_names - memories_names + for memory_name in difference: + memory : Memory = mind_memories[memory_name] + self._memories[memory_name] = memory + + if self._client.exists(f"{self._mind_name}:memories:{memory_name}"): + self._retrieve_executor.submit(self._retrieve_memory, memory) + + else: #Send impostor with owner + memory_impostor = {"name":memory.get_name(), + "evaluation" : 0.0, + "I": "", + "id" : 0, + "owner": self._node_name} + + self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_impostor) + + subscribe_func = lambda message : self.update_memory(memory_name) + self._pubsub.subscribe(**{f"{self._mind_name}:memories:{memory_name}:update":subscribe_func}) + + #Update memories + to_update = self._last_update.keys() + for memory_name in to_update: + if memory_name not in self._memories: + del self._last_update[memory_name] + continue + + memory = self._memories[memory_name] + if memory.get_timestamp() > self._last_update[memory_name]: + self.update_memory(memory_name) + + def update_memory(self, memory_name:str) -> None: + print(self._node_name, "Updating memory", memory_name) + + if memory_name not in self._memories: + self._pubsub.unsubscribe(f"{self._mind_name}:memories:{memory_name}:update") + + timestamp = float(self._client.hget(f"{self._mind_name}:memories:{memory_name}", "timestamp")) + memory = self._memories[memory_name] + memory_timestamp = memory.get_timestamp() + + if memory_timestamp < timestamp: + self._retrieve_executor.submit(self._retrieve_memory, memory) + + elif memory_timestamp> timestamp: + self._send_memory(memory) + + self._last_update[memory_name] = memory.get_timestamp() + + def _send_memory(self, memory:Memory) -> None: + memory_name = memory.get_name() + print(self._node_name, "Send memory", memory_name) + + memory_dict = MemoryEncoder.to_dict(memory, jsonify_info=True) + memory_dict["owner"] = "" + + + self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_dict) + self._client.publish(f"{self._mind_name}:memories:{memory_name}:update", "") + + self._last_update[memory_name] = memory.get_timestamp() + + + def _retrieve_memory(self, memory:Memory) -> None: + memory_name = memory.get_name() + + print(self._node_name, "Retrieve", memory_name) + + if memory_name in self._waiting_retrieve: + return + self._waiting_retrieve.add(memory_name) + + memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}") + + if memory_dict["owner"] != "": + event = threading.Event() + self._waiting_request_events[memory_name] = event + self._request_memory(memory_name, memory_dict["owner"]) + + if not event.wait(timeout=self._request_timeout): + print(self._node_name, "Request failed", memory_name) + #Request failed + self._send_memory(memory) + return + + memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}") + + MemoryEncoder.load_memory(memory, memory_dict) + + self._last_update[memory_name] = memory.get_timestamp() + + self._waiting_retrieve.remove(memory_name) + + def _request_memory(self, memory_name:str, owner_name:str) -> None: + print(self._node_name, "Requesting", memory_name) + + request_addr = f"{self._mind_name}:nodes:{owner_name}:transfer_memory" + + request_dict = {"memory_name":memory_name, "node":self._node_name} + request = json.dumps(request_dict) + self._client.publish(request_addr, request) + + def _handler_notify_transfer(self, message:str) -> None: + memory_name = message["data"] + if memory_name in self._waiting_request_events: + event = self._waiting_request_events[memory_name] + event.set() + del self._waiting_request_events[memory_name] + + def _handler_transfer_memory(self, message) -> None: + request = json.loads(message["data"]) + + memory_name = request["memory_name"] + requesting_node = request["node"] + + print(self._node_name, "Tranfering", memory_name) + + if memory_name in self._memories: + memory = self._memories[memory_name] + else: + memory = MemoryObject() + memory.set_name(memory_name) + + self._send_memory(memory) + + response_addr = f"{self._mind_name}:nodes:{requesting_node}:transfer_done" + self._client.publish(response_addr, memory_name) + + def __del__(self) -> None: + self._pubsub_thread.stop() + self._retrieve_executor.shutdown(cancel_futures=True) \ No newline at end of file