Skip to content

Commit

Permalink
chore: Moving components to session
Browse files Browse the repository at this point in the history
  • Loading branch information
ramedina86 committed Feb 19, 2024
1 parent 6d4728e commit 5a979f6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/streamsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import importlib.metadata
from typing import Union, Optional, Dict, Any
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config
from streamsync.core import initial_state, component_manager, session_manager, session_verifier
from streamsync.core import initial_state, base_component_tree, session_manager, session_verifier

VERSION = importlib.metadata.version("streamsync")

component_manager
base_component_tree
session_manager
Config
session_verifier
Expand Down
20 changes: 10 additions & 10 deletions src/streamsync/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self,
app_path: str,
mode: str,
run_code: str,
components: Dict,
bmc_components: Dict,
is_app_process_server_ready: multiprocessing.synchronize.Event,
is_app_process_server_failed: multiprocessing.synchronize.Event):
super().__init__(name="AppProcess")
Expand All @@ -78,7 +78,7 @@ def __init__(self,
self.app_path = app_path
self.mode = mode
self.run_code = run_code
self.components = components
self.bmc_components = bmc_components
self.is_app_process_server_ready = is_app_process_server_ready
self.is_app_process_server_failed = is_app_process_server_failed
self.logger = logging.getLogger("app")
Expand Down Expand Up @@ -149,7 +149,7 @@ def _handle_session_init(self, payload: InitSessionRequestPayload) -> InitSessio
userState=user_state,
sessionId=session.session_id,
mail=session.session_state.mail,
components=streamsync.component_manager.to_dict(),
components=session.session_component_tree.to_dict(),
userFunctions=self._get_user_functions()
)

Expand Down Expand Up @@ -210,7 +210,7 @@ def _handle_state_enquiry(self, session: StreamsyncSession) -> StateEnquiryRespo

def _handle_component_update(self, payload: ComponentUpdateRequestPayload) -> None:
import streamsync
streamsync.component_manager.ingest(payload.components)
streamsync.base_component_tree.ingest(payload.components)

def _handle_message(self, session_id: str, request: AppProcessServerRequest) -> AppProcessServerResponse:
"""
Expand Down Expand Up @@ -337,7 +337,7 @@ def _main(self) -> None:
terminate_early = True

try:
streamsync.component_manager.ingest(self.components)
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
Expand Down Expand Up @@ -535,7 +535,7 @@ def __init__(self, app_path: str, mode: str):
self.client_conn: Optional[multiprocessing.connection.Connection] = None
self.app_process: Optional[AppProcess] = None
self.run_code: Optional[str] = None
self.components: Optional[Dict] = None
self.bmc_components: Optional[Dict] = None
self.is_app_process_server_ready = multiprocessing.Event()
self.is_app_process_server_failed = multiprocessing.Event()
self.app_process_listener: Optional[AppProcessListener] = None
Expand Down Expand Up @@ -589,7 +589,7 @@ def signal_handler(sig, frame):
pass

self.run_code = self._load_persisted_script()
self.components = self._load_persisted_components()
self.bmc_components = self._load_persisted_components()

if self.mode == "edit":
self._set_observer()
Expand Down Expand Up @@ -676,7 +676,7 @@ async def update_components(self, session_id: str, payload: ComponentUpdateReque
if self.mode != "edit":
raise PermissionError(
"Cannot update components in non-update mode.")
self.components = payload.components
self.bmc_components = payload.components
file_contents = {
"metadata": {
"streamsync_version": VERSION
Expand Down Expand Up @@ -744,7 +744,7 @@ def shut_down(self) -> None:
def _start_app_process(self) -> None:
if self.run_code is None:
raise ValueError("Cannot start app process. Code hasn't been set.")
if self.components is None:
if self.bmc_components is None:
raise ValueError(
"Cannot start app process. Components haven't been set.")
self.is_app_process_server_ready.clear()
Expand All @@ -758,7 +758,7 @@ def _start_app_process(self) -> None:
app_path=self.app_path,
mode=self.mode,
run_code=self.run_code,
components=self.components,
bmc_components=self.bmc_components,
is_app_process_server_ready=self.is_app_process_server_ready,
is_app_process_server_failed=self.is_app_process_server_failed)
self.app_process.start()
Expand Down
45 changes: 35 additions & 10 deletions src/streamsync/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,17 @@ def to_dict(self) -> Dict:
return c_dict


class ComponentManager:
class ComponentTree:

def __init__(self) -> None:
self.counter: int = 0
self.components: Dict[str, Component] = {}
root_component = Component("root", "root", {})
self.attach(root_component)

def get_component(self, component_id: str) -> Component:
return self.components.get(component_id)

def get_descendents(self, parent_id: str) -> List[Component]:
children = list(filter(lambda c: c.parentId == parent_id,
self.components.values()))
Expand Down Expand Up @@ -549,6 +552,25 @@ def to_dict(self) -> Dict:
return active_components


class SessionComponentTree(ComponentTree):

def __init__(self, base_component_tree: ComponentTree):
super().__init__()
self.base_component_tree = base_component_tree

def get_component(self, component_id: str) -> Component:
base_component = self.base_component_tree.get_component(component_id)
if base_component:
return base_component
return self.components.get(component_id)

def to_dict(self) -> Dict:
active_components = {}
for id, component in {**self.components, **self.base_component_tree.components}.items():
active_components[id] = component.to_dict()
return active_components


class EventDeserialiser:

"""Applies transformations to the payload of an incoming event, depending on its type.
Expand All @@ -558,8 +580,8 @@ class EventDeserialiser:
Its main goal is to deserialise incoming content in a controlled and predictable way,
applying sanitisation of inputs where relevant."""

def __init__(self, session_state: StreamsyncState):
self.evaluator = Evaluator(session_state)
def __init__(self, session_state: StreamsyncState, session_component_tree: SessionComponentTree):
self.evaluator = Evaluator(session_state, session_component_tree)

def transform(self, ev: StreamsyncEvent) -> None:
# Events without payloads are safe
Expand Down Expand Up @@ -723,8 +745,9 @@ class Evaluator:

template_regex = re.compile(r"[\\]?@{([\w\s.]*)}")

def __init__(self, session_state: StreamsyncState):
def __init__(self, session_state: StreamsyncState, session_component_tree: ComponentTree):
self.ss = session_state
self.ct = session_component_tree

def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="") -> Any:
def replacer(matched):
Expand All @@ -745,7 +768,7 @@ def replacer(matched):
return str(serialised_value)

component_id = instance_path[-1]["componentId"]
component = component_manager.components[component_id]
component = self.ct.get_component(component_id)
field_value = component.content.get(field_key) or default_field_value
replaced = self.template_regex.sub(replacer, field_value)

Expand All @@ -760,7 +783,7 @@ def get_context_data(self, instance_path: InstancePath) -> Dict[str, Any]:
for i in range(len(instance_path)):
path_item = instance_path[i]
component_id = path_item["componentId"]
component = component_manager.components[component_id]
component = self.ct.get_component(component_id)
if component.type != "repeater":
continue
if i + 1 >= len(instance_path):
Expand Down Expand Up @@ -878,6 +901,7 @@ def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers:
new_state = StreamsyncState.get_new()
new_state.user_state.mutated = set()
self.session_state = new_state
self.session_component_tree = SessionComponentTree(base_component_tree)
self.event_handler = EventHandler(self)

def update_last_active_timestamp(self) -> None:
Expand Down Expand Up @@ -977,8 +1001,9 @@ class EventHandler:
def __init__(self, session: StreamsyncSession) -> None:
self.session = session
self.session_state = session.session_state
self.deser = EventDeserialiser(self.session_state)
self.evaluator = Evaluator(self.session_state)
self.session_component_tree = session.session_component_tree
self.deser = EventDeserialiser(self.session_state, self.session_component_tree)
self.evaluator = Evaluator(self.session_state, self.session_component_tree)


def _handle_binding(self, event_type, target_component, instance_path, payload) -> None:
Expand Down Expand Up @@ -1075,7 +1100,7 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult:
try:
instance_path = ev.instancePath
target_id = instance_path[-1]["componentId"]
target_component = component_manager.components[target_id]
target_component = self.session_component_tree.get_component(target_id)

self._handle_binding(ev.type, target_component, instance_path, ev.payload)
result = self._call_handler_callable(
Expand All @@ -1091,7 +1116,7 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult:


state_serialiser = StateSerialiser()
component_manager = ComponentManager()
base_component_tree = ComponentTree()
initial_state = StreamsyncState()
session_manager = SessionManager()

Expand Down

0 comments on commit 5a979f6

Please sign in to comment.