From cfd554af0b8931b7b2a73a1ff1d04f4c8db17c89 Mon Sep 17 00:00:00 2001 From: mmikita95 Date: Mon, 19 Feb 2024 10:04:16 +0300 Subject: [PATCH] chore: linking ComponentManager to session Making ComponentManager a part of session in order to link the state of the UI to each particular session instead of the whole app. --- src/streamsync/__init__.py | 3 +-- src/streamsync/app_runner.py | 28 ++++++++++++---------------- src/streamsync/core.py | 35 ++++++++++++++++++++++------------- tests/test_core.py | 13 +++++++------ 4 files changed, 42 insertions(+), 37 deletions(-) diff --git a/src/streamsync/__init__.py b/src/streamsync/__init__.py index e81861765..de17e3259 100644 --- a/src/streamsync/__init__.py +++ b/src/streamsync/__init__.py @@ -1,11 +1,10 @@ import importlib.metadata from typing import Union, Optional, Dict, Any from streamsync.core import Readable, FileWrapper, BytesWrapper, Config -from streamsync.core import initial_state, component_manager, session_manager, session_verifier +from streamsync.core import initial_state, session_manager, session_verifier VERSION = importlib.metadata.version("streamsync") -component_manager session_manager Config session_verifier diff --git a/src/streamsync/app_runner.py b/src/streamsync/app_runner.py index e817093e3..1e95e9b5d 100644 --- a/src/streamsync/app_runner.py +++ b/src/streamsync/app_runner.py @@ -18,7 +18,7 @@ from watchdog.observers.polling import PollingObserver from pydantic import ValidationError -from streamsync.core import StreamsyncSession +from streamsync.core import ComponentManager, StreamsyncSession from streamsync.ss_types import (AppProcessServerRequest, AppProcessServerRequestPacket, AppProcessServerResponse, AppProcessServerResponsePacket, ComponentUpdateRequest, ComponentUpdateRequestPayload, EventRequest, EventResponsePayload, InitSessionRequest, InitSessionRequestPayload, InitSessionResponse, InitSessionResponsePayload, StateEnquiryRequest, StateEnquiryResponsePayload, StreamsyncEvent) import watchdog.observers @@ -134,7 +134,7 @@ def _handle_session_init(self, payload: InitSessionRequestPayload) -> InitSessio import traceback as tb session = streamsync.session_manager.get_new_session( - payload.cookies, payload.headers, payload.proposedSessionId) + payload.cookies, payload.headers, payload.proposedSessionId, self.components) if session is None: raise MessageHandlingException("Session rejected.") @@ -149,7 +149,7 @@ def _handle_session_init(self, payload: InitSessionRequestPayload) -> InitSessio userState=user_state, sessionId=session.session_id, mail=session.session_state.mail, - components=streamsync.component_manager.to_dict(), + components=session.component_manager.to_dict(), userFunctions=self._get_user_functions() ) @@ -208,10 +208,6 @@ def _handle_state_enquiry(self, session: StreamsyncSession) -> StateEnquiryRespo return res_payload - def _handle_component_update(self, payload: ComponentUpdateRequestPayload) -> None: - import streamsync - streamsync.component_manager.ingest(payload.components) - def _handle_message(self, session_id: str, request: AppProcessServerRequest) -> AppProcessServerResponse: """ Handles messages from the main process to the app's isolated process. @@ -261,7 +257,7 @@ def _handle_message(self, session_id: str, request: AppProcessServerRequest) -> if self.mode == "edit" and type == "componentUpdate": cu_req_payload = ComponentUpdateRequestPayload.parse_obj( request.payload) - self._handle_component_update(cu_req_payload) + session.component_manager.ingest(cu_req_payload.components) return AppProcessServerResponse( status="ok", status_message=None, @@ -336,14 +332,14 @@ def _main(self) -> None: if self.mode == "run": terminate_early = True - try: - streamsync.component_manager.ingest(self.components) - except BaseException: - streamsync.initial_state.add_log_entry( - "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc()) - if self.mode == "run": - terminate_early = True - + # try: + # streamsync.component_manager.ingest(self.components) + # except BaseException: + # streamsync.initial_state.add_log_entry( + # "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc()) + # if self.mode == "run": + # terminate_early = True + if terminate_early: self._terminate_early() return diff --git a/src/streamsync/core.py b/src/streamsync/core.py index e13521487..9db6b9897 100644 --- a/src/streamsync/core.py +++ b/src/streamsync/core.py @@ -16,6 +16,7 @@ import re import json import math +import traceback as tb from streamsync.ss_types import Readable, InstancePath, StreamsyncEvent, StreamsyncEventResult, StreamsyncFileItem @@ -558,8 +559,8 @@ class EventDeserialiser: Its main goal is to deserialise incoming content in a controlled and predictable way, applying sanitisation of inputs where relevant.""" - def __init__(self, session_state: StreamsyncState): - self.evaluator = Evaluator(session_state) + def __init__(self, session_id: str, session_state: StreamsyncState): + self.evaluator = Evaluator(session_id, session_state) def transform(self, ev: StreamsyncEvent) -> None: # Events without payloads are safe @@ -723,7 +724,8 @@ class Evaluator: template_regex = re.compile(r"[\\]?@{([\w\s.]*)}") - def __init__(self, session_state: StreamsyncState): + def __init__(self, session_id: str, session_state: StreamsyncState): + self.session_id = session_id self.ss = session_state def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="") -> Any: @@ -744,8 +746,9 @@ def replacer(matched): return json.dumps(serialised_value) return str(serialised_value) + session = session_manager.get_session(self.session_id) component_id = instance_path[-1]["componentId"] - component = component_manager.components[component_id] + component = session.component_manager.components[component_id] field_value = component.content.get(field_key) or default_field_value replaced = self.template_regex.sub(replacer, field_value) @@ -756,11 +759,11 @@ def replacer(matched): def get_context_data(self, instance_path: InstancePath) -> Dict[str, Any]: context: Dict[str, Any] = {} - + session = session_manager.get_session(self.session_id) for i in range(len(instance_path)): path_item = instance_path[i] component_id = path_item["componentId"] - component = component_manager.components[component_id] + component = session.component_manager.components[component_id] if component.type != "repeater": continue if i + 1 >= len(instance_path): @@ -870,7 +873,7 @@ class StreamsyncSession: Represents a session. """ - def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers: Optional[Dict[str, str]]) -> None: + def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers: Optional[Dict[str, str]], components: Optional[Dict[str, Any]]) -> None: self.session_id = session_id self.cookies = cookies self.headers = headers @@ -879,6 +882,13 @@ def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers: new_state.user_state.mutated = set() self.session_state = new_state self.event_handler = EventHandler(self) + self.component_manager = ComponentManager() + if components: + try: + self.component_manager.ingest(self.components) + except BaseException: + self.session_state.add_log_entry( + "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc()) def update_last_active_timestamp(self) -> None: self.last_active_timestamp = int(time.time()) @@ -928,7 +938,7 @@ def _check_proposed_session_id(self, proposed_session_id: Optional[str]) -> bool return True return False - def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict] = None, proposed_session_id: Optional[str] = None) -> Optional[StreamsyncSession]: + def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict] = None, proposed_session_id: Optional[str] = None, components: Optional[Dict[str, Any]] = None) -> Optional[StreamsyncSession]: if not self._check_proposed_session_id(proposed_session_id): return None if not self._verify_before_new_session(cookies, headers): @@ -939,7 +949,7 @@ def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict else: new_id = proposed_session_id new_session = StreamsyncSession( - new_id, cookies, headers) + new_id, cookies, headers, components) self.sessions[new_id] = new_session return new_session @@ -977,8 +987,8 @@ class EventHandler: def __init__(self, session: StreamsyncSession) -> None: self.session = session self.session_state = session.session_state - self.deser = EventDeserialiser(self.session_state) - self.evaluator = Evaluator(self.session_state) + self.deser = EventDeserialiser(self.session.session_id, self.session_state) + self.evaluator = Evaluator(self.session.session_id, self.session_state) def _handle_binding(self, event_type, target_component, instance_path, payload) -> None: @@ -1075,7 +1085,7 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult: try: instance_path = ev.instancePath target_id = instance_path[-1]["componentId"] - target_component = component_manager.components[target_id] + target_component = self.session.component_manager.components[target_id] self._handle_binding(ev.type, target_component, instance_path, ev.payload) result = self._call_handler_callable( @@ -1091,7 +1101,6 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult: state_serialiser = StateSerialiser() -component_manager = ComponentManager() initial_state = StreamsyncState() session_manager = SessionManager() diff --git a/tests/test_core.py b/tests/test_core.py index 3e8957952..9b561a05a 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -48,7 +48,8 @@ ss.Config.is_mail_enabled_for_log = True ss.init_state(raw_state_dict) -ss.component_manager.ingest(sc) +session = ss.session_manager.get_new_session() +session.component_manager.ingest(sc) class TestStateProxy: @@ -237,7 +238,7 @@ class TestEventDeserialiser: root_instance_path = [{"componentId": "root", "instanceNumber": 0}] session_state = StreamsyncState(raw_state_dict) - ed = EventDeserialiser(session_state) + ed = EventDeserialiser(session.session_id, session_state) def test_unknown_no_payload(self) -> None: ev = StreamsyncEvent( @@ -598,7 +599,7 @@ def test_evaluate_field_simple(self) -> None: st = StreamsyncState({ "counter": 8 }) - e = Evaluator(st) + e = Evaluator(session.session_id, st) evaluated = e.evaluate_field(instance_path, "text") assert evaluated == "The counter is 8" @@ -622,7 +623,7 @@ def test_evaluate_field_repeater(self) -> None: "ts": "TypeScript" } }) - e = Evaluator(st) + e = Evaluator(session.session_id, st) assert e.evaluate_field( instance_path_0, "text") == "The id is c and the name is C" assert e.evaluate_field( @@ -633,7 +634,7 @@ def test_set_state(self) -> None: {"componentId": "root", "instanceNumber": 0} ] st = StreamsyncState(raw_state_dict) - e = Evaluator(st) + e = Evaluator(session.session_id, st) e.set_state("name", instance_path, "Roger") e.set_state("dynamic_prop", instance_path, "height") e.set_state("features[dynamic_prop]", instance_path, "toddler height") @@ -647,7 +648,7 @@ def test_evaluate_expression(self) -> None: {"componentId": "root", "instanceNumber": 0} ] st = StreamsyncState(raw_state_dict) - e = Evaluator(st) + e = Evaluator(session.session_id, st) assert e.evaluate_expression("features.eyes", instance_path) == "green" assert e.evaluate_expression("best_feature", instance_path) == "eyes" assert e.evaluate_expression("features[best_feature]", instance_path) == "green"