Skip to content

Commit

Permalink
chore: linking ComponentManager to session
Browse files Browse the repository at this point in the history
Making ComponentManager a part of session in order to link the state of the UI to each particular session instead of the whole app.
  • Loading branch information
mmikita95 committed Feb 19, 2024
1 parent e2c1f2e commit cfd554a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 37 deletions.
3 changes: 1 addition & 2 deletions src/streamsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import importlib.metadata
from typing import Union, Optional, Dict, Any
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config
from streamsync.core import initial_state, component_manager, session_manager, session_verifier
from streamsync.core import initial_state, session_manager, session_verifier

VERSION = importlib.metadata.version("streamsync")

component_manager
session_manager
Config
session_verifier
Expand Down
28 changes: 12 additions & 16 deletions src/streamsync/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from watchdog.observers.polling import PollingObserver

from pydantic import ValidationError
from streamsync.core import StreamsyncSession
from streamsync.core import ComponentManager, StreamsyncSession
from streamsync.ss_types import (AppProcessServerRequest, AppProcessServerRequestPacket, AppProcessServerResponse, AppProcessServerResponsePacket, ComponentUpdateRequest, ComponentUpdateRequestPayload,
EventRequest, EventResponsePayload, InitSessionRequest, InitSessionRequestPayload, InitSessionResponse, InitSessionResponsePayload, StateEnquiryRequest, StateEnquiryResponsePayload, StreamsyncEvent)
import watchdog.observers
Expand Down Expand Up @@ -134,7 +134,7 @@ def _handle_session_init(self, payload: InitSessionRequestPayload) -> InitSessio
import traceback as tb

session = streamsync.session_manager.get_new_session(
payload.cookies, payload.headers, payload.proposedSessionId)
payload.cookies, payload.headers, payload.proposedSessionId, self.components)
if session is None:
raise MessageHandlingException("Session rejected.")

Expand All @@ -149,7 +149,7 @@ def _handle_session_init(self, payload: InitSessionRequestPayload) -> InitSessio
userState=user_state,
sessionId=session.session_id,
mail=session.session_state.mail,
components=streamsync.component_manager.to_dict(),
components=session.component_manager.to_dict(),
userFunctions=self._get_user_functions()
)

Expand Down Expand Up @@ -208,10 +208,6 @@ def _handle_state_enquiry(self, session: StreamsyncSession) -> StateEnquiryRespo

return res_payload

def _handle_component_update(self, payload: ComponentUpdateRequestPayload) -> None:
import streamsync
streamsync.component_manager.ingest(payload.components)

def _handle_message(self, session_id: str, request: AppProcessServerRequest) -> AppProcessServerResponse:
"""
Handles messages from the main process to the app's isolated process.
Expand Down Expand Up @@ -261,7 +257,7 @@ def _handle_message(self, session_id: str, request: AppProcessServerRequest) ->
if self.mode == "edit" and type == "componentUpdate":
cu_req_payload = ComponentUpdateRequestPayload.parse_obj(
request.payload)
self._handle_component_update(cu_req_payload)
session.component_manager.ingest(cu_req_payload.components)
return AppProcessServerResponse(
status="ok",
status_message=None,
Expand Down Expand Up @@ -336,14 +332,14 @@ def _main(self) -> None:
if self.mode == "run":
terminate_early = True

try:
streamsync.component_manager.ingest(self.components)
except BaseException:
streamsync.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True

# try:
# streamsync.component_manager.ingest(self.components)
# except BaseException:
# streamsync.initial_state.add_log_entry(
# "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
# if self.mode == "run":
# terminate_early = True
if terminate_early:
self._terminate_early()
return
Expand Down
35 changes: 22 additions & 13 deletions src/streamsync/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import re
import json
import math
import traceback as tb
from streamsync.ss_types import Readable, InstancePath, StreamsyncEvent, StreamsyncEventResult, StreamsyncFileItem


Expand Down Expand Up @@ -558,8 +559,8 @@ class EventDeserialiser:
Its main goal is to deserialise incoming content in a controlled and predictable way,
applying sanitisation of inputs where relevant."""

def __init__(self, session_state: StreamsyncState):
self.evaluator = Evaluator(session_state)
def __init__(self, session_id: str, session_state: StreamsyncState):
self.evaluator = Evaluator(session_id, session_state)

def transform(self, ev: StreamsyncEvent) -> None:
# Events without payloads are safe
Expand Down Expand Up @@ -723,7 +724,8 @@ class Evaluator:

template_regex = re.compile(r"[\\]?@{([\w\s.]*)}")

def __init__(self, session_state: StreamsyncState):
def __init__(self, session_id: str, session_state: StreamsyncState):
self.session_id = session_id
self.ss = session_state

def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="") -> Any:
Expand All @@ -744,8 +746,9 @@ def replacer(matched):
return json.dumps(serialised_value)
return str(serialised_value)

session = session_manager.get_session(self.session_id)
component_id = instance_path[-1]["componentId"]
component = component_manager.components[component_id]
component = session.component_manager.components[component_id]
field_value = component.content.get(field_key) or default_field_value
replaced = self.template_regex.sub(replacer, field_value)

Expand All @@ -756,11 +759,11 @@ def replacer(matched):

def get_context_data(self, instance_path: InstancePath) -> Dict[str, Any]:
context: Dict[str, Any] = {}

session = session_manager.get_session(self.session_id)
for i in range(len(instance_path)):
path_item = instance_path[i]
component_id = path_item["componentId"]
component = component_manager.components[component_id]
component = session.component_manager.components[component_id]
if component.type != "repeater":
continue
if i + 1 >= len(instance_path):
Expand Down Expand Up @@ -870,7 +873,7 @@ class StreamsyncSession:
Represents a session.
"""

def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers: Optional[Dict[str, str]]) -> None:
def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers: Optional[Dict[str, str]], components: Optional[Dict[str, Any]]) -> None:
self.session_id = session_id
self.cookies = cookies
self.headers = headers
Expand All @@ -879,6 +882,13 @@ def __init__(self, session_id: str, cookies: Optional[Dict[str, str]], headers:
new_state.user_state.mutated = set()
self.session_state = new_state
self.event_handler = EventHandler(self)
self.component_manager = ComponentManager()
if components:
try:
self.component_manager.ingest(self.components)
except BaseException:
self.session_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())

def update_last_active_timestamp(self) -> None:
self.last_active_timestamp = int(time.time())
Expand Down Expand Up @@ -928,7 +938,7 @@ def _check_proposed_session_id(self, proposed_session_id: Optional[str]) -> bool
return True
return False

def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict] = None, proposed_session_id: Optional[str] = None) -> Optional[StreamsyncSession]:
def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict] = None, proposed_session_id: Optional[str] = None, components: Optional[Dict[str, Any]] = None) -> Optional[StreamsyncSession]:
if not self._check_proposed_session_id(proposed_session_id):
return None
if not self._verify_before_new_session(cookies, headers):
Expand All @@ -939,7 +949,7 @@ def get_new_session(self, cookies: Optional[Dict] = None, headers: Optional[Dict
else:
new_id = proposed_session_id
new_session = StreamsyncSession(
new_id, cookies, headers)
new_id, cookies, headers, components)
self.sessions[new_id] = new_session
return new_session

Expand Down Expand Up @@ -977,8 +987,8 @@ class EventHandler:
def __init__(self, session: StreamsyncSession) -> None:
self.session = session
self.session_state = session.session_state
self.deser = EventDeserialiser(self.session_state)
self.evaluator = Evaluator(self.session_state)
self.deser = EventDeserialiser(self.session.session_id, self.session_state)
self.evaluator = Evaluator(self.session.session_id, self.session_state)


def _handle_binding(self, event_type, target_component, instance_path, payload) -> None:
Expand Down Expand Up @@ -1075,7 +1085,7 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult:
try:
instance_path = ev.instancePath
target_id = instance_path[-1]["componentId"]
target_component = component_manager.components[target_id]
target_component = self.session.component_manager.components[target_id]

self._handle_binding(ev.type, target_component, instance_path, ev.payload)
result = self._call_handler_callable(
Expand All @@ -1091,7 +1101,6 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult:


state_serialiser = StateSerialiser()
component_manager = ComponentManager()
initial_state = StreamsyncState()
session_manager = SessionManager()

Expand Down
13 changes: 7 additions & 6 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@

ss.Config.is_mail_enabled_for_log = True
ss.init_state(raw_state_dict)
ss.component_manager.ingest(sc)
session = ss.session_manager.get_new_session()
session.component_manager.ingest(sc)


class TestStateProxy:
Expand Down Expand Up @@ -237,7 +238,7 @@ class TestEventDeserialiser:

root_instance_path = [{"componentId": "root", "instanceNumber": 0}]
session_state = StreamsyncState(raw_state_dict)
ed = EventDeserialiser(session_state)
ed = EventDeserialiser(session.session_id, session_state)

def test_unknown_no_payload(self) -> None:
ev = StreamsyncEvent(
Expand Down Expand Up @@ -598,7 +599,7 @@ def test_evaluate_field_simple(self) -> None:
st = StreamsyncState({
"counter": 8
})
e = Evaluator(st)
e = Evaluator(session.session_id, st)
evaluated = e.evaluate_field(instance_path, "text")
assert evaluated == "The counter is 8"

Expand All @@ -622,7 +623,7 @@ def test_evaluate_field_repeater(self) -> None:
"ts": "TypeScript"
}
})
e = Evaluator(st)
e = Evaluator(session.session_id, st)
assert e.evaluate_field(
instance_path_0, "text") == "The id is c and the name is C"
assert e.evaluate_field(
Expand All @@ -633,7 +634,7 @@ def test_set_state(self) -> None:
{"componentId": "root", "instanceNumber": 0}
]
st = StreamsyncState(raw_state_dict)
e = Evaluator(st)
e = Evaluator(session.session_id, st)
e.set_state("name", instance_path, "Roger")
e.set_state("dynamic_prop", instance_path, "height")
e.set_state("features[dynamic_prop]", instance_path, "toddler height")
Expand All @@ -647,7 +648,7 @@ def test_evaluate_expression(self) -> None:
{"componentId": "root", "instanceNumber": 0}
]
st = StreamsyncState(raw_state_dict)
e = Evaluator(st)
e = Evaluator(session.session_id, st)
assert e.evaluate_expression("features.eyes", instance_path) == "green"
assert e.evaluate_expression("best_feature", instance_path) == "eyes"
assert e.evaluate_expression("features[best_feature]", instance_path) == "green"
Expand Down

0 comments on commit cfd554a

Please sign in to comment.