From be524759327d5594bbbf6baf2de41b480f9663ab Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 30 Aug 2024 18:37:39 -0300 Subject: [PATCH] Mechanism exploration --- dev/memory_storage.ipynb | 465 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 dev/memory_storage.ipynb diff --git a/dev/memory_storage.ipynb b/dev/memory_storage.ipynb new file mode 100644 index 0000000..97486ca --- /dev/null +++ b/dev/memory_storage.ipynb @@ -0,0 +1,465 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import redis\n", + "import cst_python as cst\n", + "import json" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "client = redis.Redis(decode_responses=True)\n", + "pubsub = client.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.flushall()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 publica que existe" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.lpush(\"nodes\", \"node1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "class MemoryEncoder(json.JSONEncoder):\n", + " def default(self, memory:cst.core.entities.Memory):\n", + " return MemoryEncoder.to_dict(memory)\n", + " \n", + " @staticmethod\n", + " def to_dict(memory:cst.core.entities.Memory):\n", + " data = {\n", + " \"timestamp\": memory.get_timestamp(),\n", + " \"evaluation\": memory.get_evaluation(),\n", + " \"I\": memory.get_info(),\n", + " \"name\": memory.get_name(),\n", + " \"id\": memory.get_id()\n", + " }\n", + "\n", + " return data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 checa se memória com id \"memory1\" existe. Como não, publica key:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "def update_memory(memory_name, memory_object:cst.MemoryObject, client:redis.Redis):\n", + " timestamp = float(client.hget(f\"memories:{memory_name}\", \"timestamp\"))\n", + " \n", + " if memory_object.timestamp < timestamp:\n", + " print(\"Retrieve update\")\n", + " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + "\n", + " memory_object.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory_object.set_name(memory_dict[\"name\"])\n", + " memory_object.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory_object.set_info(info)\n", + "\n", + " memory_object.timestamp = float(memory_dict[\"timestamp\"])\n", + " elif memory_object.timestamp > timestamp:\n", + " print(\"Send update\")\n", + " memory_dict = MemoryEncoder.to_dict(memory_object)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + "\n", + " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", + " client.publish(f\"memories:{memory_name}:update\", \"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "def create_memory(node, memory_name, client:redis.Redis, pubsub:redis.client.PubSub) -> cst.MemoryObject:\n", + " nodes = client.lrange(\"nodes\", 0, -1)\n", + "\n", + " memory_exist = False\n", + " memory_node = \"\"\n", + "\n", + " for n in nodes:\n", + " if n == node:\n", + " continue\n", + " \n", + " if memory_name in client.lrange(f\"{n}:memories\", 0, -1):\n", + " memory_exist = True\n", + " memory_node = n\n", + "\n", + " break\n", + "\n", + " memory = cst.MemoryObject()\n", + "\n", + " if memory_exist:\n", + " #Copia memória\n", + " print(\"Copia\")\n", + "\n", + " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + "\n", + " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory.set_name(memory_dict[\"name\"])\n", + " memory.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory.set_info(info)\n", + "\n", + " memory.timestamp = float(memory_dict[\"timestamp\"])\n", + " \n", + " else:\n", + " #Indica que memória existe\n", + " print(\"Cria\")\n", + " client.lpush(f\"{node}:memories\", memory_name)\n", + "\n", + " memory_dict = MemoryEncoder.to_dict(memory)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + "\n", + " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", + "\n", + " subscribe_func = lambda message : update_memory(memory_name, memory, client)\n", + " pubsub.subscribe(**{f\"memories:{memory_name}:update\":subscribe_func})\n", + "\n", + " return memory" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + } + ], + "source": [ + "memory1 = create_memory(\"node1\", \"memory1\", client, pubsub)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 entra no jogo" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "client2 = redis.Redis()\n", + "pubsub2 = client2.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client2.lpush(\"nodes\", \"node2\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[b'node2', b'node1']" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nodes = client2.lrange(\"nodes\", 0, -1)\n", + "nodes" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 tenta criar memória, percebe que existe e sincroniza" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node2_memory1 = create_memory(\"node2\", \"memory1\", client2, pubsub2)\n", + "\n", + "node2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory2 = create_memory(\"node2\", \"memory2\", client2, pubsub2)\n", + "\n", + "memory2" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Send update\n" + ] + } + ], + "source": [ + "node2_memory1.set_info(\"INFO\")\n", + "update_memory(\"memory1\", node2_memory1, client2)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'type': 'subscribe', 'pattern': None, 'channel': 'memories:memory1:update', 'data': 1}\n", + "Retrieve update\n", + "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory1:update', 'data': 1}\n", + "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory2:update', 'data': 2}\n" + ] + } + ], + "source": [ + "for _pubsub in [pubsub, pubsub2]:\n", + " msg = _pubsub.get_message()\n", + " while msg is not None:\n", + " print(msg)\n", + " msg = _pubsub.get_message()" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timestamp': '1725053432.9742534',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'name': '',\n", + " 'id': '0.0'}" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"memories:memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1725053432.9742534, evaluation=0.0, I=INFO, name=]" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}