diff --git a/src/cst_python/memory_storage/logical_time.py b/src/cst_python/memory_storage/logical_time.py index 86e835e..d8ce2cd 100644 --- a/src/cst_python/memory_storage/logical_time.py +++ b/src/cst_python/memory_storage/logical_time.py @@ -5,9 +5,18 @@ class LogicalTime(abc.ABC): + ''' + A logical time for distributed communication. + ''' @abc.abstractmethod def increment(self) -> "LogicalTime": + ''' + Returns a time with the self time incremented by one. + + Returns: + LogicalTime: incremented time. + ''' ... @@ -18,11 +27,31 @@ def __str__(self) -> str: @classmethod @abc.abstractmethod def from_str(cls, string:str) -> "LogicalTime": + ''' + Creates a instance from a string. + + Args: + string (str): String to create time, + generated with str(LogicalTime). + + Returns: + LogicalTime: Created time. + ''' ... @classmethod @abc.abstractmethod - def syncronize(cls, time0, time1) -> "LogicalTime": + def syncronize(cls, time0:"LogicalTime", time1:"LogicalTime") -> "LogicalTime": + ''' + Compares two times, and return the current time. + + Args: + time0 (LogicalTime): first time to compare. + time1 (LogicalTime): second time to compare. + + Returns: + LogicalTime: current time. + ''' ... @abc.abstractmethod @@ -48,6 +77,9 @@ def __ge__(self, other) -> bool: @functools.total_ordering class LamportTime(LogicalTime): + ''' + Logical time implementation using Lamport times. + ''' #Methods that total_ordering will overwrite __le__ = object.__lt__ # type: ignore @@ -56,6 +88,12 @@ class LamportTime(LogicalTime): def __init__(self, initial_time:int=0): + ''' + LamportTime initializer. + + Args: + initial_time (int, optional): time to start the clock. Defaults to 0. + ''' super().__init__() self._time = initial_time @@ -77,6 +115,9 @@ def from_str(cls, string:str) -> "LamportTime": @classmethod def syncronize(cls, time0, time1) -> "LamportTime": + if not (isinstance(time0, LamportTime) and isinstance(time1, LamportTime)): + raise ValueError("LamportTime can only synchonize LamportTime instances") + new_time = 0 if time0 < time1: new_time = time1._time diff --git a/src/cst_python/memory_storage/memory_encoder.py b/src/cst_python/memory_storage/memory_encoder.py index 2a58dca..030b8ae 100644 --- a/src/cst_python/memory_storage/memory_encoder.py +++ b/src/cst_python/memory_storage/memory_encoder.py @@ -4,11 +4,25 @@ from cst_python.core.entities import Memory class MemoryEncoder(json.JSONEncoder): + ''' + Encodes and decodes Memories. + ''' def default(self, memory:Memory): return MemoryEncoder.to_dict(memory) @staticmethod - def to_dict(memory:Memory, jsonify_info:bool=False): + def to_dict(memory:Memory, jsonify_info:bool=False) -> dict[str, Any]: + ''' + Encodes a memory to a dict. + + Args: + memory (Memory): memory to encode. + jsonify_info (bool, optional): if True, dumps the info to JSON + before return. Defaults to False. + + Returns: + dict[str, Any]: the encoded memory. + ''' data = { "timestamp": memory.get_timestamp(), "evaluation": memory.get_evaluation(), @@ -22,8 +36,16 @@ def to_dict(memory:Memory, jsonify_info:bool=False): return data + @staticmethod - def load_memory(memory:Memory, memory_dict:dict[str,Any], load_json:bool=True): + def load_memory(memory:Memory, memory_dict:dict[str,Any]): + ''' + Load a memory from a dict. + + Args: + memory (Memory): memory to store the loaded info. + memory_dict (dict[str,Any]): dict encoded memory. + ''' memory.set_evaluation(float(memory_dict["evaluation"])) memory.set_id(int(memory_dict["id"])) diff --git a/src/cst_python/memory_storage/memory_storage.py b/src/cst_python/memory_storage/memory_storage.py index 1895bd5..cf8d22c 100644 --- a/src/cst_python/memory_storage/memory_storage.py +++ b/src/cst_python/memory_storage/memory_storage.py @@ -17,9 +17,30 @@ logger.setLevel(logging.DEBUG) class MemoryStorageCodelet(Codelet): + ''' + Synchonizes local memories with a Redis database. + + When using MemoryStorage, each local CST instance is called a node. + + The collection of synchonized nodes is a mind. + A single Redis instance can support multiple minds with unique names + ''' + def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3, **redis_args) -> None: + ''' + MemoryStorageCodelet initializer. + + Args: + mind (Mind): agent mind, used to monitor memories. + node_name (Optional[str], optional): name of the local node in the network. + If None, creates a unique name with 'node{int}'. Defaults to None. + mind_name (Optional[str], optional): name of the network mind. + If None, uses 'default_mind'. Defaults to None. + request_timeout (float, optional): time before timeout when + requesting a memory synchonization. Defaults to 500e-3. + ''' super().__init__() self._mind = mind @@ -38,6 +59,7 @@ def __init__(self, mind:Mind, self._pubsub = self._client.pubsub() self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread(daemon=True) + # Creates node name if node_name is None: node_name = "node" base_name = node_name @@ -54,12 +76,15 @@ def __init__(self, mind:Mind, self._client.sadd(f"{mind_name}:nodes", node_name) + # Creates transfer channels subscription transfer_service_addr = f"{self._mind_name}:nodes:{node_name}:transfer_memory" self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory}) transfer_done_addr = f"{self._mind_name}:nodes:{node_name}:transfer_done" self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer}) + # Initalize variables + self._last_update : dict[str, int] = {} self._memory_logical_time : dict[str, LogicalTime] = {} self._waiting_retrieve : set[str] = set() @@ -131,6 +156,16 @@ def proc(self) -> None: self.update_memory(memory_name) def update_memory(self, memory_name:str) -> None: + ''' + Updates a memory, sending or retrieving the memory data + to/from the database. + + Performs a time comparison with the local data and storage + data to decide whether to send or retrieve the data. + + Args: + memory_name (str): name of the memory to synchonize. + ''' logger.info(f"Updating memory [{memory_name}@{self._node_name}]") if memory_name not in self._memories: @@ -154,10 +189,16 @@ def update_memory(self, memory_name:str) -> None: def _send_memory(self, memory:Memory) -> None: + ''' + Sends a memory data to the storage. + + Args: + memory (Memory): memory to send. + ''' memory_name = memory.get_name() logger.info(f"Sending memory [{memory_name}@{self._node_name}]") - memory_dict = MemoryEncoder.to_dict(memory, jsonify_info=True) + memory_dict = cast(dict[str|bytes, int|float|str], MemoryEncoder.to_dict(memory, jsonify_info=True)) memory_dict["owner"] = "" memory_dict["logical_time"] = str(self._memory_logical_time[memory_name]) @@ -169,6 +210,14 @@ def _send_memory(self, memory:Memory) -> None: def _retrieve_memory(self, memory:Memory) -> None: + ''' + Retrieves a memory data from the storage. + + Blocks the application, it is advisable to use a separate thread to call the method. + + Args: + memory (Memory): memory to retrieve data. + ''' memory_name = memory.get_name() logger.info(f"Retrieving memory [{memory_name}@{self._node_name}]") @@ -201,6 +250,13 @@ def _retrieve_memory(self, memory:Memory) -> None: self._waiting_retrieve.remove(memory_name) def _request_memory(self, memory_name:str, owner_name:str) -> None: + ''' + Requests another node to send its local memory to storage. + + Args: + memory_name (str): name of the memory to request. + owner_name (str): node owning the memory. + ''' logger.info(f"Requesting memory [{memory_name}@{owner_name} to {self._node_name}]") request_addr = f"{self._mind_name}:nodes:{owner_name}:transfer_memory" @@ -211,6 +267,12 @@ def _request_memory(self, memory_name:str, owner_name:str) -> None: self._client.publish(request_addr, request) def _handler_notify_transfer(self, message:dict[str,str]) -> None: + ''' + Handles a message in the notify transfer channel. + + Args: + message (dict[str,str]): message received in the channel. + ''' data = data = json.loads(message["data"]) if "logical_time" in data: message_time = LamportTime.from_str(data["logical_time"]) @@ -224,6 +286,12 @@ def _handler_notify_transfer(self, message:dict[str,str]) -> None: def _handler_transfer_memory(self, message:dict[str,str]) -> None: + ''' + Handles a message in the transfer memory channel. + + Args: + message (dict[str,str]): message received in the channel. + ''' data = json.loads(message["data"]) if "logical_time" in data: message_time = LamportTime.from_str(data["logical_time"])