Skip to content

Commit

Permalink
MS documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
EltonCN committed Dec 2, 2024
1 parent 0588cd5 commit abb63f2
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 4 deletions.
43 changes: 42 additions & 1 deletion src/cst_python/memory_storage/logical_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@


class LogicalTime(abc.ABC):
'''
A logical time for distributed communication.
'''

@abc.abstractmethod
def increment(self) -> "LogicalTime":
'''
Returns a time with the self time incremented by one.
Returns:
LogicalTime: incremented time.
'''
...


Expand All @@ -18,11 +27,31 @@ def __str__(self) -> str:
@classmethod
@abc.abstractmethod
def from_str(cls, string:str) -> "LogicalTime":
'''
Creates a instance from a string.
Args:
string (str): String to create time,
generated with str(LogicalTime).
Returns:
LogicalTime: Created time.
'''
...

@classmethod
@abc.abstractmethod
def syncronize(cls, time0, time1) -> "LogicalTime":
def syncronize(cls, time0:"LogicalTime", time1:"LogicalTime") -> "LogicalTime":
'''
Compares two times, and return the current time.
Args:
time0 (LogicalTime): first time to compare.
time1 (LogicalTime): second time to compare.
Returns:
LogicalTime: current time.
'''
...

@abc.abstractmethod
Expand All @@ -48,6 +77,9 @@ def __ge__(self, other) -> bool:

@functools.total_ordering
class LamportTime(LogicalTime):
'''
Logical time implementation using Lamport times.
'''

#Methods that total_ordering will overwrite
__le__ = object.__lt__ # type: ignore
Expand All @@ -56,6 +88,12 @@ class LamportTime(LogicalTime):


def __init__(self, initial_time:int=0):
'''
LamportTime initializer.
Args:
initial_time (int, optional): time to start the clock. Defaults to 0.
'''
super().__init__()
self._time = initial_time

Expand All @@ -77,6 +115,9 @@ def from_str(cls, string:str) -> "LamportTime":

@classmethod
def syncronize(cls, time0, time1) -> "LamportTime":
if not (isinstance(time0, LamportTime) and isinstance(time1, LamportTime)):
raise ValueError("LamportTime can only synchonize LamportTime instances")

new_time = 0
if time0 < time1:
new_time = time1._time
Expand Down
26 changes: 24 additions & 2 deletions src/cst_python/memory_storage/memory_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@
from cst_python.core.entities import Memory

class MemoryEncoder(json.JSONEncoder):
'''
Encodes and decodes Memories.
'''
def default(self, memory:Memory):
return MemoryEncoder.to_dict(memory)

@staticmethod
def to_dict(memory:Memory, jsonify_info:bool=False):
def to_dict(memory:Memory, jsonify_info:bool=False) -> dict[str, Any]:
'''
Encodes a memory to a dict.
Args:
memory (Memory): memory to encode.
jsonify_info (bool, optional): if True, dumps the info to JSON
before return. Defaults to False.
Returns:
dict[str, Any]: the encoded memory.
'''
data = {
"timestamp": memory.get_timestamp(),
"evaluation": memory.get_evaluation(),
Expand All @@ -22,8 +36,16 @@ def to_dict(memory:Memory, jsonify_info:bool=False):

return data


@staticmethod
def load_memory(memory:Memory, memory_dict:dict[str,Any], load_json:bool=True):
def load_memory(memory:Memory, memory_dict:dict[str,Any]):
'''
Load a memory from a dict.
Args:
memory (Memory): memory to store the loaded info.
memory_dict (dict[str,Any]): dict encoded memory.
'''
memory.set_evaluation(float(memory_dict["evaluation"]))
memory.set_id(int(memory_dict["id"]))

Expand Down
70 changes: 69 additions & 1 deletion src/cst_python/memory_storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,30 @@
logger.setLevel(logging.DEBUG)

class MemoryStorageCodelet(Codelet):
'''
Synchonizes local memories with a Redis database.
When using MemoryStorage, each local CST instance is called a node.
The collection of synchonized nodes is a mind.
A single Redis instance can support multiple minds with unique names
'''

def __init__(self, mind:Mind,
node_name:Optional[str]=None, mind_name:Optional[str]=None,
request_timeout:float=500e-3, **redis_args) -> None:
'''
MemoryStorageCodelet initializer.
Args:
mind (Mind): agent mind, used to monitor memories.
node_name (Optional[str], optional): name of the local node in the network.
If None, creates a unique name with 'node{int}'. Defaults to None.
mind_name (Optional[str], optional): name of the network mind.
If None, uses 'default_mind'. Defaults to None.
request_timeout (float, optional): time before timeout when
requesting a memory synchonization. Defaults to 500e-3.
'''
super().__init__()

self._mind = mind
Expand All @@ -38,6 +59,7 @@ def __init__(self, mind:Mind,
self._pubsub = self._client.pubsub()
self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread(daemon=True)

# Creates node name
if node_name is None:
node_name = "node"
base_name = node_name
Expand All @@ -54,12 +76,15 @@ def __init__(self, mind:Mind,

self._client.sadd(f"{mind_name}:nodes", node_name)

# Creates transfer channels subscription
transfer_service_addr = f"{self._mind_name}:nodes:{node_name}:transfer_memory"
self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory})

transfer_done_addr = f"{self._mind_name}:nodes:{node_name}:transfer_done"
self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer})

# Initalize variables

self._last_update : dict[str, int] = {}
self._memory_logical_time : dict[str, LogicalTime] = {}
self._waiting_retrieve : set[str] = set()
Expand Down Expand Up @@ -131,6 +156,16 @@ def proc(self) -> None:
self.update_memory(memory_name)

def update_memory(self, memory_name:str) -> None:
'''
Updates a memory, sending or retrieving the memory data
to/from the database.
Performs a time comparison with the local data and storage
data to decide whether to send or retrieve the data.
Args:
memory_name (str): name of the memory to synchonize.
'''
logger.info(f"Updating memory [{memory_name}@{self._node_name}]")

if memory_name not in self._memories:
Expand All @@ -154,10 +189,16 @@ def update_memory(self, memory_name:str) -> None:


def _send_memory(self, memory:Memory) -> None:
'''
Sends a memory data to the storage.
Args:
memory (Memory): memory to send.
'''
memory_name = memory.get_name()
logger.info(f"Sending memory [{memory_name}@{self._node_name}]")

memory_dict = MemoryEncoder.to_dict(memory, jsonify_info=True)
memory_dict = cast(dict[str|bytes, int|float|str], MemoryEncoder.to_dict(memory, jsonify_info=True))
memory_dict["owner"] = ""
memory_dict["logical_time"] = str(self._memory_logical_time[memory_name])

Expand All @@ -169,6 +210,14 @@ def _send_memory(self, memory:Memory) -> None:


def _retrieve_memory(self, memory:Memory) -> None:
'''
Retrieves a memory data from the storage.
Blocks the application, it is advisable to use a separate thread to call the method.
Args:
memory (Memory): memory to retrieve data.
'''
memory_name = memory.get_name()
logger.info(f"Retrieving memory [{memory_name}@{self._node_name}]")

Expand Down Expand Up @@ -201,6 +250,13 @@ def _retrieve_memory(self, memory:Memory) -> None:
self._waiting_retrieve.remove(memory_name)

def _request_memory(self, memory_name:str, owner_name:str) -> None:
'''
Requests another node to send its local memory to storage.
Args:
memory_name (str): name of the memory to request.
owner_name (str): node owning the memory.
'''
logger.info(f"Requesting memory [{memory_name}@{owner_name} to {self._node_name}]")

request_addr = f"{self._mind_name}:nodes:{owner_name}:transfer_memory"
Expand All @@ -211,6 +267,12 @@ def _request_memory(self, memory_name:str, owner_name:str) -> None:
self._client.publish(request_addr, request)

def _handler_notify_transfer(self, message:dict[str,str]) -> None:
'''
Handles a message in the notify transfer channel.
Args:
message (dict[str,str]): message received in the channel.
'''
data = data = json.loads(message["data"])
if "logical_time" in data:
message_time = LamportTime.from_str(data["logical_time"])
Expand All @@ -224,6 +286,12 @@ def _handler_notify_transfer(self, message:dict[str,str]) -> None:


def _handler_transfer_memory(self, message:dict[str,str]) -> None:
'''
Handles a message in the transfer memory channel.
Args:
message (dict[str,str]): message received in the channel.
'''
data = json.loads(message["data"])
if "logical_time" in data:
message_time = LamportTime.from_str(data["logical_time"])
Expand Down

0 comments on commit abb63f2

Please sign in to comment.