From 7a335af6a76741d4181c220a05eed0198587524e Mon Sep 17 00:00:00 2001 From: Fabien Arcellier Date: Sat, 2 Mar 2024 11:29:42 +0100 Subject: [PATCH 1/4] feat: declare optional schema on streamsync state --- docs/docs/.vitepress/config.ts | 1 + docs/docs/state-schema.md | 123 +++++++++++++++ src/streamsync/__init__.py | 44 ++++-- src/streamsync/app_runner.py | 14 +- src/streamsync/core.py | 274 ++++++++++++++++++++++++++++----- tests/test_core.py | 116 ++++++++++++-- tests/test_init_state.py | 100 ++++++++++++ 7 files changed, 602 insertions(+), 70 deletions(-) create mode 100644 docs/docs/state-schema.md create mode 100644 tests/test_init_state.py diff --git a/docs/docs/.vitepress/config.ts b/docs/docs/.vitepress/config.ts index 0823e3c79..bc8dbe3c7 100644 --- a/docs/docs/.vitepress/config.ts +++ b/docs/docs/.vitepress/config.ts @@ -40,6 +40,7 @@ export default { { text: "Page routes", link: "/page-routes" }, { text: "Sessions", link: "/sessions" }, { text: "Custom server", link: "/custom-server" }, + { text: "State schema", link: "/state-schema" }, ], }, { diff --git a/docs/docs/state-schema.md b/docs/docs/state-schema.md new file mode 100644 index 000000000..1bcd63df7 --- /dev/null +++ b/docs/docs/state-schema.md @@ -0,0 +1,123 @@ +# State schema + +Schema declaration on the [Application state](./application-state) allows Streamsync to handle complex serialization +scenario and empower your IDE and toolchains to provide autocomplete and type checking. + +## Schema declaration + +```python +import streamsync as ss + +class AppSchema(ss.StreamsyncState): + counter: int + +initial_state = ss.init_state({ + "counter": 0 +}, schema=AppSchema) + +# Event handler +# It receives the session state as an argument and mutates it +def increment(state: AppSchema): + state.counter += 1 +``` + +Access to an attribute by its key is always possible. + +```python +def increment(state: AppSchema): + state['counter'] += 1 +``` + +Attributes missing from the schema remain accessible by their key. + +```python +initial_state = ss.init_state({ + "counter": 0, + "message": None +}, schema=AppSchema) + +def increment(state: AppSchema): + state['message'] = "Hello pigeon" +``` + +## Schema composition + +Schema composition allows you to model a complex Application state. + +```python +class MyappSchema(ss.State): + title: str + +class AppSchema(ss.StreamsyncState): + my_app: MyappSchema + counter: int + +initial_state = ss.init_state({ + "counter": 0, + "my_app": { + "title": "Nested value" + } +}, schema=AppSchema) +``` + +## Multi-level dictionary + +Some components like Vega require specifying a graph in the form of a multi-level dictionary. + +A schema allows you to specify to streamsync that an attribute which contains a dictionary +must be treated as a dictionary and not as a group of state. + +```python +class AppSchema(ss.StreamsyncState): + vegas_graph: dict + +# Without schema, this handler is execute only once +def handle_vega_graph(state: AppSchema): + graph = state.vega_graph + graph['data']['values'][0]['b'] += 1000 + state.vega_graph = graph + +initial_state = ss.init_state({ + "vegas_graph": { + "data": { + "values": [ + {"a": "C", "b": 2}, {"a": "C", "b": 7}, {"a": "C", "b": 4}, + {"a": "D", "b": 1}, {"a": "D", "b": 2}, {"a": "D", "b": 6}, + {"a": "E", "b": 8}, {"a": "E", "b": 4}, {"a": "E", "b": 7} + ] + }, + "mark": "bar", + "encoding": { + "x": {"field": "a", "type": "nominal"}, + "y": {"aggregate": "average", "field": "b", "type": "quantitative"} + } + }, +}, schema=AppSchema) +``` + +## Type checking + +A schema allows you to check the integrity of your backend using the type system. +The code below will raise an error with mypy. + +```bash +$ mypy apps/myapp/main.py +apps/myapp/main.py:7: error: "AppSchema" has no attribute "countr"; maybe "counter"? [attr-defined] +``` + +Here is the code, can you spot the error ? + +```python +import streamsync as ss + +class AppSchema(ss.StreamsyncState): + counter: int + +def increment(state: AppSchema): + state.countr += 1 + +initial_state = ss.init_state({ + "counter": 26, +}, schema=AppSchema) +``` + diff --git a/src/streamsync/__init__.py b/src/streamsync/__init__.py index 5acc1a35c..958116d5d 100644 --- a/src/streamsync/__init__.py +++ b/src/streamsync/__init__.py @@ -1,9 +1,11 @@ import importlib.metadata -from typing import Any, Dict, Optional, Union +from typing import Union, Optional, Dict, Any, Type, TypeVar, cast from streamsync.core import (BytesWrapper, Config, FileWrapper, Readable, base_component_tree, initial_state, session_manager, session_verifier) +from streamsync.core import Readable, FileWrapper, BytesWrapper, Config, StreamsyncState +from streamsync.core import new_initial_state, base_component_tree, session_manager, session_verifier from streamsync.ui import StreamsyncUIManager VERSION = importlib.metadata.version("streamsync") @@ -31,17 +33,7 @@ def pack_bytes(raw_data, mime_type: Optional[str] = None): return BytesWrapper(raw_data, mime_type) - -def init_state(state_dict: Dict[str, Any]): - """ - Sets the initial state, which will be used as the starting point for - every session. - """ - - initial_state.user_state.state = {} - initial_state.user_state.ingest(state_dict) - return initial_state - +S = TypeVar('S', bound=StreamsyncState) def init_ui() -> StreamsyncUIManager: """Initializes and returns an instance of StreamsyncUIManager. @@ -58,8 +50,36 @@ def init_ui() -> StreamsyncUIManager: **Example**:: + >>> import streamsync as ss + >>> >>> with ss.init_ui() as ui: >>> with ui.Page({"key": "hello"}): >>> ui.Text({"text": "Hello pigeons"}) """ return StreamsyncUIManager() + + +def init_state(state_dict: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]: + """ + Sets the initial state, which will be used as the starting point for + every session. + + initial_state.user_state.state = {} + initial_state.user_state.ingest(state_dict) + return initial_state + + + + >>> import streamsync as ss + + >>> initial_state = ss.init_state({ + >>> "counter": 0, + >>> }, schema=AppSchema) + """ + concrete_schema = cast(Type[S], StreamsyncState if schema is None else schema) + if not issubclass(concrete_schema, StreamsyncState): + raise ValueError("Root schema must inherit from StreamsyncState") + + _initial_state: S = new_initial_state(concrete_schema) + _initial_state.ingest(state_dict) + return _initial_state diff --git a/src/streamsync/app_runner.py b/src/streamsync/app_runner.py index 11caf4f4a..621a33301 100644 --- a/src/streamsync/app_runner.py +++ b/src/streamsync/app_runner.py @@ -290,7 +290,7 @@ def _execute_user_code(self) -> None: captured_stdout = f.getvalue() if captured_stdout: - streamsync.initial_state.add_log_entry( + streamsync.core.initial_state.add_log_entry( "info", "Stdout message during initialisation", captured_stdout) def _apply_configuration(self) -> None: @@ -326,7 +326,7 @@ def _main(self) -> None: try: streamsync.base_component_tree.ingest(self.bmc_components) except BaseException: - streamsync.initial_state.add_log_entry( + streamsync.core.initial_state.add_log_entry( "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc()) if self.mode == "run": terminate_early = True @@ -336,7 +336,7 @@ def _main(self) -> None: except BaseException: # Initialisation errors will be sent to all sessions via mail during session initialisation - streamsync.initial_state.add_log_entry( + streamsync.core.initial_state.add_log_entry( "error", "Code Error", "Couldn't execute code. An exception was raised.", tb.format_exc()) # Exit if in run mode @@ -344,6 +344,14 @@ def _main(self) -> None: if self.mode == "run": terminate_early = True + try: + streamsync.base_component_tree.ingest(self.bmc_components) + except BaseException: + streamsync.core.initial_state.add_log_entry( + "error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc()) + if self.mode == "run": + terminate_early = True + if terminate_early: self._terminate_early() return diff --git a/src/streamsync/core.py b/src/streamsync/core.py index 504fd7acf..2af25316b 100644 --- a/src/streamsync/core.py +++ b/src/streamsync/core.py @@ -8,7 +8,7 @@ import sys import time import traceback -from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union, TypeVar, Type, Sequence, cast import urllib.request import base64 import io @@ -90,7 +90,7 @@ class StateSerialiser: """ def serialise(self, v: Any) -> Union[Dict, List, str, bool, int, float, None]: - if isinstance(v, StateProxy): + if isinstance(v, State): return self._serialise_dict_recursively(v.to_dict()) if isinstance(v, (FileWrapper, BytesWrapper)): return self._serialise_ss_wrapper(v) @@ -210,6 +210,9 @@ def ingest(self, raw_state: Dict) -> None: for key, raw_value in raw_state.items(): self.__setitem__(key, raw_value) + def items(self) -> Sequence[Tuple[str, Any]]: + return cast(Sequence[Tuple[str, Any]], self.state.items()) + def get(self, key) -> Any: return self.state.get(key) @@ -221,19 +224,7 @@ def __setitem__(self, key, raw_value) -> None: raise ValueError( f"State keys must be strings. Received {str(key)} ({type(key)}).") - # Items that are dictionaries are converted to StateProxy instances - - if isinstance(raw_value, dict): - value = StateProxy(raw_value) - elif isinstance(raw_value, StateProxy): - # Children StateProxies need to be reinitialised - # during an assignment to parent StateProxy - # to ensure proper mutation tracking - value = StateProxy(raw_value.state) - else: - value = raw_value - - self.state[key] = value + self.state[key] = raw_value self._apply_raw(f"+{key}") def __delitem__(self, key: str) -> None: @@ -311,8 +302,145 @@ def to_dict(self) -> Dict[str, Any]: return serialised -class StreamsyncState(): +def get_annotations(instance) -> Dict[str, Any]: + """ + Returns the annotations of the class in a way that works on python 3.9 and python 3.10 + """ + if isinstance(instance, type): + ann = instance.__dict__.get('__annotations__', None) + else: + ann = getattr(instance, '__annotations__', None) + + if ann is None: + ann = {} + return ann + + +class StateMeta(type): + """ + Constructs a class at runtime that extends StreamsyncState or State + with dynamic properties for each annotation of the class. + """ + + def __new__(cls, name, bases, attrs): + klass = super().__new__(cls, name, bases, attrs) + cls.bind_annotations_to_state_proxy(klass) + return klass + + @classmethod + def bind_annotations_to_state_proxy(cls, klass): + """ + Loops through the class annotations and creates properties dynamically for each one. + + >>> class MyState(State): + >>> counter: int + + will be transformed into + + >>> class MyState(State): + >>> + >>> @property + >>> def counter(self): + >>> return self._state_proxy["counter"] + >>> + >>> @counter.setter + >>> def counter(self, value): + >>> self._state_proxy["counter"] = value + + Annotations that reference a State are ignored. The link will be established through a State instance + when ingesting state data. + + >>> class MyAppState(State): + >>> title: str + + >>> class MyState(State): + >>> myapp: MyAppState # Nothing happens + """ + + annotations = get_annotations(klass) + for key, expected_type in annotations.items(): + if key == "_state_proxy": + raise AttributeError("_state_proxy is an reserved keyword for streamsync, don't use it in annotation.") + + if not(inspect.isclass(expected_type) and issubclass(expected_type, State)): + proxy = DictPropertyProxy("_state_proxy", key) + setattr(klass, key, proxy) + + +class State(metaclass=StateMeta): + """ + `State` represents a state of the application. + """ + + def __init__(self, raw_state: Dict[str, Any] = {}): + self._state_proxy: StateProxy = StateProxy(raw_state) + self.ingest(raw_state) + + def ingest(self, raw_state: Dict[str, Any]) -> None: + """ + hydrates a state from raw data by applying a schema when it is provided. + """ + self._state_proxy.state = {} + for key, value in raw_state.items(): + self._set_state_item(key, value) + + def to_dict(self) -> dict: + """ + Serializes state data as a dictionary + + Private attributes, prefixed with _, are ignored. + + >>> state = StreamsyncState({'message': "hello world"}) + >>> return state.to_dict() + """ + return self._state_proxy.to_dict() + + def __repr__(self) -> str: + return self._state_proxy.__repr__() + + def __getitem__(self, key: str) -> Any: + annotations = get_annotations(self) + expected_type = annotations.get(key) + if expected_type is not None and inspect.isclass(expected_type) and issubclass(expected_type, State): + return getattr(self, key) + else: + return self._state_proxy.__getitem__(key) + + def __setitem__(self, key: str, raw_value: Any) -> None: + self._set_state_item(key, raw_value) + + def __delitem__(self, key: str) -> Any: + return self._state_proxy.__delitem__(key) + + def remove(self, key: str) -> Any: + return self.__delitem__(key) + def __contains__(self, key: str) -> bool: + return self._state_proxy.__contains__(key) + + def _set_state_item(self, key: str, value: Any): + """ + """ + annotations = get_annotations(self) + expected_type = annotations.get(key, None) + expect_dict = expected_type is not None and inspect.isclass(expected_type) and issubclass(expected_type, dict) + if isinstance(value, dict) and not expect_dict: + """ + When the value is a dictionary and the attribute does not explicitly + expect a dictionary, we instantiate a new state to manage mutations. + """ + state = annotations[key](value) if key in annotations else State() + if not isinstance(state, State): + raise ValueError(f"Attribute {key} must inherit of State or requires a dict to accept dictionary") + + setattr(self, key, state) + state.ingest(value) + self._state_proxy[key] = state._state_proxy + else: + self._state_proxy[key] = value + + +class StreamsyncState(State): """ Root state. Comprises user configurable state and mail (notifications, log entries, etc). @@ -321,11 +449,12 @@ class StreamsyncState(): LOG_ENTRY_MAX_LEN = 8192 def __init__(self, raw_state: Dict[str, Any] = {}, mail: List[Any] = []): - self.user_state: StateProxy = StateProxy(raw_state) + super().__init__(raw_state) self.mail = copy.deepcopy(mail) - def __repr__(self) -> str: - return self.user_state.__repr__() + @property + def user_state(self) -> StateProxy: + return self._state_proxy @classmethod def get_new(cls): @@ -333,7 +462,19 @@ def get_new(cls): return initial_state.get_clone() - def get_clone(self): + def get_clone(self) -> 'StreamsyncState': + """ + get_clone clones the destination application state for the session. + + The class is rebuilt identically in the case where the user + has constructed a schema inherited from StreamsyncState + + >>> class AppSchema(StreamsyncState): + >>> counter: int + >>> + >>> root_state = AppSchema() + >>> clone_state = root_state.get_clone() # instance of AppSchema + """ try: cloned_user_state = copy.deepcopy(self.user_state.state) cloned_mail = copy.deepcopy(self.mail) @@ -344,22 +485,7 @@ def get_clone(self): "The state may contain unpickable objects, such as modules.", traceback.format_exc()) return substitute_state - return StreamsyncState(cloned_user_state, cloned_mail) - - def __getitem__(self, key: str) -> Any: - return self.user_state.__getitem__(key) - - def __setitem__(self, key: str, raw_value: Any) -> None: - self.user_state.__setitem__(key, raw_value) - - def __delitem__(self, key: str) -> Any: - return self.user_state.__delitem__(key) - - def remove(self, key: str) -> Any: - return self.__delitem__(key) - - def __contains__(self, key: str) -> bool: - return self.user_state.__contains__(key) + return self.__class__(cloned_user_state, cloned_mail) def add_mail(self, type: str, payload: Any) -> None: mail_item = { @@ -751,7 +877,7 @@ def get_context_data(self, instance_path: InstancePath) -> Dict[str, Any]: def set_state(self, expr: str, instance_path: InstancePath, value: Any) -> None: accessors = self.parse_expression(expr, instance_path) - state_ref: Any = self.ss.user_state + state_ref: StateProxy = self.ss.user_state for accessor in accessors[:-1]: state_ref = state_ref[accessor] @@ -1055,10 +1181,68 @@ def handle(self, ev: StreamsyncEvent) -> StreamsyncEventResult: return {"ok": ok, "result": result} -state_serialiser = StateSerialiser() -initial_state = StreamsyncState() -base_component_tree = ComponentTree() -session_manager = SessionManager() +class DictPropertyProxy: + """ + A descriptor based recipe that makes it possible to write shorthands + that forward attribute access from one object onto another. + + >>> class A: + >>> foo: int = DictPropertyProxy("proxy_state", "prop1") + >>> bar: int = DictPropertyProxy("proxy_state", "prop2") + >>> + >>> def __init__(self): + >>> self._state_proxy = StateProxy({"prop1": 1, "prop2": 2}) + >>> + >>> a = A() + >>> print(a.foo) + + This descriptor avoids writing the code below to establish a proxy + with a child instance + + >>> class A: + >>> + >>> def __init__(self): + >>> self._state_proxy = StateProxy({"prop1": 1, "prop2": 2}) + >>> + >>> @property + >>> def prop1(self): + >>> return self._state_proxy['prop1'] + >>> + >>> @foo.setter + >>> def prop1(self, value): + >>> self._state_proxy['prop1'] = value + >>> + """ + + def __init__(self, objectName, key): + self.objectName = objectName + self.key = key + + def __get__(self, instance, owner=None): + proxy = getattr(instance, self.objectName) + return proxy[self.key] + + def __set__(self, instance, value): + proxy = getattr(instance, self.objectName) + proxy[self.key] = value + +S = TypeVar("S", bound=StreamsyncState) + +def new_initial_state(klass: Type[S]) -> S: + """ + Initializes the initial state of the application and makes it globally accessible. + + The class used for the initial state must be a subclass of StreamsyncState. + + >>> class MyState(StreamsyncState): + >>> pass + >>> + >>> initial_state = new_initial_state(MyState) + """ + global initial_state + initial_state = klass() + + return initial_state def session_verifier(func: Callable) -> Callable: @@ -1071,3 +1255,11 @@ def wrapped(*args, **kwargs): session_manager.add_verifier(func) return wrapped + + + + +state_serialiser = StateSerialiser() +initial_state = StreamsyncState() +base_component_tree = ComponentTree() +session_manager = SessionManager() \ No newline at end of file diff --git a/tests/test_core.py b/tests/test_core.py index 73503bb86..4a85b5550 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -3,8 +3,10 @@ from typing import Dict import numpy as np -from streamsync.core import (BytesWrapper, Evaluator, EventDeserialiser, - FileWrapper, SessionManager, StateProxy, StateSerialiser, StateSerialiserException, StreamsyncState) +from streamsync.core import (BytesWrapper, ComponentTree, Evaluator, EventDeserialiser, + FileWrapper, SessionManager, State, StateSerialiser, StateSerialiserException, + StreamsyncState) + import streamsync as ss from streamsync.ss_types import StreamsyncEvent import pandas as pd @@ -55,8 +57,8 @@ class TestStateProxy: - sp = StateProxy(raw_state_dict) - sp_simple_dict = StateProxy(simple_dict) + sp = State(raw_state_dict)._state_proxy + sp_simple_dict = State(simple_dict)._state_proxy @classmethod def count_initial_mutations(cls, d, count=0): @@ -101,10 +103,6 @@ def test_mutations(self) -> None: assert m.get("+state\\.with\\.dots.photo\\.jpeg") == "Corrupted" assert len(m) == 1 - self.sp["new.state.with.dots"] = {"test": "test"} - m = self.sp.get_mutations_as_dict() - assert len(m) == 2 - d = self.sp.to_dict() assert d.get("age") == 2 assert d.get("interests") == ["lamps", "cars", "dogs"] @@ -127,12 +125,6 @@ def test_dictionary_removal(self) -> None: assert "+items" in m assert "-items.Lettuce" in m - # Non-explicit removal test - items = self.sp_simple_dict.state["items"].state - items = {k: v for k, v in items.items() if k != "Apple"} - self.sp_simple_dict["items"] = items - m = self.sp_simple_dict.get_mutations_as_dict() - assert "+items.Cucumber" in m def test_private_members(self) -> None: d = self.sp.to_dict() @@ -142,6 +134,102 @@ def test_private_members(self) -> None: class TestState: + def test_set_dictionary_in_a_state_should_transform_it_in_state_proxy_and_trigger_mutation(self): + """ + Tests that writing a dictionary in a State without schema is transformed into a StateProxy and + triggers mutations to update the interface + + >>> _state = streamsync.init_state({'app': {}}) + >>> _state["app"] = {"hello": "world"} + """ + _state = State() + + # When + _state["new.state.with.dots"] = {"test": "test"} + + m = _state._state_proxy.get_mutations_as_dict() + assert m == { + r"+new\.state\.with\.dots": None, + r"+new\.state\.with\.dots.test": "test" + } + + def test_set_dictionary_in_a_state_with_schema_should_transform_it_in_state_proxy_and_trigger_mutation(self): + class SimpleSchema(State): + app: dict + + _state = SimpleSchema() + + # When + _state["app"] = {"hello": "world"} + + m = _state._state_proxy.get_mutations_as_dict() + assert m == { + r"+app": {"hello": "world"}, + } + + def test_replace_dictionary_content_in_a_state_with_schema_should_transform_it_in_state_proxy_and_trigger_mutation(self): + """ + Tests that replacing a dictionary content in a State without schema trigger mutations on all the children. + + >>> _state = State({'items': {}}) + >>> _state["items"] = {k: v for k, v in _state["items"].items() if k != "Apple"} + """ + + _state = State({"items": { + "Apple": {"name": "Apple", "type": "fruit"}, + "Cucumber": {"name": "Cucumber", "type": "vegetable"}, + "Lettuce": {"name": "Lettuce", "type": "vegetable"} + }}) + + # When + items = _state['items'] + items = {k: v for k, v in items.items() if k != "Apple"} + _state["items"] = items + + # Then + m = _state._state_proxy.get_mutations_as_dict() + assert m == { + '+items': None, + '+items.Cucumber': None, + '+items.Cucumber.name': 'Cucumber', + '+items.Cucumber.type': 'vegetable', + '+items.Lettuce': None, + '+items.Lettuce.name': 'Lettuce', + '+items.Lettuce.type': 'vegetable' + } + + def test_changing_a_value_in_substate_is_accessible_and_mutations_are_present(self): + """ + Tests that the change of values in a child state is readable whatever the access mode and + that mutations are triggered + + >>> _state = ComplexSchema({'app': {'title': ''}}) + >>> _state.app.title = 'world' + """ + class AppState(State): + title: str + + class ComplexSchema(State): + app: AppState + + _state = ComplexSchema({'app': {'title': ''}}) + + # When + _state.app.title = 'world' + + # Then + assert _state.app.title == 'world' + assert _state['app']['title'] == 'world' + assert _state.app['title'] == 'world' + assert _state['app'].title == 'world' + assert _state._state_proxy.get_mutations_as_dict() == { + '+app': None, + '+app.title': 'world', + } + + +class TestStreamsyncState: + # Initialised manually base_s = StreamsyncState(raw_state_dict) diff --git a/tests/test_init_state.py b/tests/test_init_state.py new file mode 100644 index 000000000..55d3d2069 --- /dev/null +++ b/tests/test_init_state.py @@ -0,0 +1,100 @@ +import contextlib +from typing import Optional + +import streamsync.core +from streamsync.core import StreamsyncState, State + +@contextlib.contextmanager +def use_dedicated_streamsync_initial_state(): + """ + + Returns + ------- + + """ + previous_state = streamsync.core.initial_state + yield + streamsync.core.initial_state = previous_state + +def test_init_state_should_build_simple_streamsync_state_without_schema(): + """ + Tests that `streamsync.init_state` without schema returns a StreamsyncState object. + """ + with use_dedicated_streamsync_initial_state(): + # When + state = streamsync.init_state({}) + + # Then + assert isinstance(state, StreamsyncState) + + +def test_init_state_with_schema_should_inherits_streamsync_state(): + """ + Tests that `streamsync.init_state` with schema returns a StreamsyncState object. + """ + class SimpleSchema(StreamsyncState): + value: int + message: Optional[str] + + with use_dedicated_streamsync_initial_state(): + # When + state = streamsync.init_state({'value': 1, 'message': None, 'hello': 2}, schema=SimpleSchema) + + # Then + assert isinstance(state, SimpleSchema) + assert state.value == 1 + assert state.message is None + assert state['hello'] == 2 + + +def test_init_state_should_build_a_state_with_a_schema_that_contains_a_substate(): + """ + Tests that `streamsync.init_state` constructs an instance with schema that contains a substate. + """ + class AppState(State): + title: str + + class ComplexSchema(StreamsyncState): + app: AppState + value: int + message: Optional[str] + + with use_dedicated_streamsync_initial_state(): + # When + state = streamsync.init_state({'app': {'title': 'hello'}, 'value': 1, 'message': None, 'hello': 2}, schema=ComplexSchema) + + # Then + assert isinstance(state, ComplexSchema) + assert isinstance(state.app, AppState) + assert state.app.title == 'hello' + assert state['hello'] == 2 + + # A State Proxy instance is directly linked to the State Proxy instance of a child state + assert state._state_proxy.state['app'] is state.app._state_proxy + + + + + +def test_init_state_with_state_accept_raw_values_that_deviates_from_the_expected_type(): + """ + Tests that `streamsync.init_state` with schema accepts in raw state values that deviate from the expected type. + """ + class AppState(State): + year: int + + class ComplexSchema(StreamsyncState): + app: AppState + value: int + message: Optional[str] + + with use_dedicated_streamsync_initial_state(): + # When + state = streamsync.init_state({'app': {'year': 'hello'}, 'value': 1, 'message': None, 'hello': 2}, schema=ComplexSchema) + + # Then + assert isinstance(state, ComplexSchema) + assert state.value == 1 + assert state.app.year == 'hello' + assert state.message is None + assert state['hello'] == 2 \ No newline at end of file From 723630b52215f023d704404d501a58d0a06045e8 Mon Sep 17 00:00:00 2001 From: Fabien Arcellier Date: Tue, 5 Mar 2024 20:54:39 +0100 Subject: [PATCH 2/4] feat: declare optional schema on streamsync state * fix: issue on dictionary assignment --- src/streamsync/core.py | 29 ++++++++++++++++++++++++++--- tests/test_core.py | 19 +++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/streamsync/core.py b/src/streamsync/core.py index 2af25316b..04a9dca93 100644 --- a/src/streamsync/core.py +++ b/src/streamsync/core.py @@ -238,8 +238,28 @@ def remove(self, key) -> None: def _apply_raw(self, key) -> None: self.mutated.add(key) - def apply(self, key) -> None: - self._apply_raw(f"+{key}") + def apply_mutation_marker(self, key: Optional[str] = None, recursive: bool = False) -> None: + """ + Adds the mutation marker to a state. The mutation marker is used to track changes in the state. + + >>> self.apply_mutation_marker() + + Add the mutation marker on a specific field + + >>> self.apply_mutation_marker("field") + + Add the mutation marker to a state and all of its children + + >>> self.apply_mutation_marker(recursive=True) + """ + keys = [key] if key is not None else self.state.keys() + + for k in keys: + self._apply_raw(f"+{k}") + if recursive is True: + value = self.state[k] + if isinstance(value, StateProxy): + value.apply_mutation_marker(recursive=True) @staticmethod def escape_key(key): @@ -260,7 +280,7 @@ def carry_mutation_flag(base_key, child_key): serialised_value = None if isinstance(value, StateProxy): - if value.initial_assignment: + if f"+{key}" in self.mutated: serialised_mutations[f"+{escaped_key}"] = serialised_value value.initial_assignment = False child_mutations = value.get_mutations_as_dict() @@ -437,6 +457,9 @@ def _set_state_item(self, key: str, value: Any): state.ingest(value) self._state_proxy[key] = state._state_proxy else: + if isinstance(value, StateProxy): + value.apply_mutation_marker(recursive=True) + self._state_proxy[key] = value diff --git a/tests/test_core.py b/tests/test_core.py index 4a85b5550..7052e9967 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -109,7 +109,7 @@ def test_mutations(self) -> None: assert d.get("features").get("height") == "short" assert d.get("state.with.dots").get("photo.jpeg") == "Corrupted" - self.sp.apply("age") + self.sp.apply_mutation_marker("age") m = self.sp.get_mutations_as_dict() assert m.get("+age") == 2 @@ -171,16 +171,31 @@ def test_replace_dictionary_content_in_a_state_with_schema_should_transform_it_i """ Tests that replacing a dictionary content in a State without schema trigger mutations on all the children. + This processing must work after initialization and after recovering the mutations the first time. + >>> _state = State({'items': {}}) >>> _state["items"] = {k: v for k, v in _state["items"].items() if k != "Apple"} """ - _state = State({"items": { "Apple": {"name": "Apple", "type": "fruit"}, "Cucumber": {"name": "Cucumber", "type": "vegetable"}, "Lettuce": {"name": "Lettuce", "type": "vegetable"} }}) + m = _state._state_proxy.get_mutations_as_dict() + assert m == { + '+items': None, + '+items.Apple': None, + '+items.Apple.name': "Apple", + '+items.Apple.type': "fruit", + '+items.Cucumber': None, + '+items.Cucumber.name': 'Cucumber', + '+items.Cucumber.type': 'vegetable', + '+items.Lettuce': None, + '+items.Lettuce.name': 'Lettuce', + '+items.Lettuce.type': 'vegetable' + } + # When items = _state['items'] items = {k: v for k, v in items.items() if k != "Apple"} From 1f7d38c608278438c5de9b5119acd0913a93d261 Mon Sep 17 00:00:00 2001 From: Fabien Arcellier Date: Tue, 5 Mar 2024 21:17:11 +0100 Subject: [PATCH 3/4] feat: declare optional schema on streamsync state * refact: add unit test on apply_mutation_marker method --- tests/test_core.py | 72 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 7052e9967..ed6f65f92 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,5 +1,6 @@ import json import math +import unittest from typing import Dict import numpy as np @@ -55,10 +56,11 @@ session.session_component_tree.ingest(sc) -class TestStateProxy: +class TestStateProxy(unittest.TestCase): - sp = State(raw_state_dict)._state_proxy - sp_simple_dict = State(simple_dict)._state_proxy + def setUp(self): + self.sp = State(raw_state_dict)._state_proxy + self.sp_simple_dict = State(simple_dict)._state_proxy @classmethod def count_initial_mutations(cls, d, count=0): @@ -109,15 +111,59 @@ def test_mutations(self) -> None: assert d.get("features").get("height") == "short" assert d.get("state.with.dots").get("photo.jpeg") == "Corrupted" - self.sp.apply_mutation_marker("age") - m = self.sp.get_mutations_as_dict() - - assert m.get("+age") == 2 del self.sp["best_feature"] m = self.sp.get_mutations_as_dict() assert "-best_feature" in m + def test_apply_mutation_marker(self) -> None: + self.sp.get_mutations_as_dict() + self.sp_simple_dict.get_mutations_as_dict() + + # Apply the mutation to a specific key + self.sp.apply_mutation_marker("age") + m = self.sp.get_mutations_as_dict() + assert m == { + '+age': 1 + } + + # Apply the mutation to the state as a whole + self.sp.apply_mutation_marker() + m = self.sp.get_mutations_as_dict() + assert m == { + '+age': 1, + '+best_feature': 'eyes', + '+counter': 4, + '+features': None, + '+interests': ['lamps', 'cars'], + '+name': 'Robert', + '+state\\.with\\.dots': None, + '+utfࠀ': 23 + } + + self.sp_simple_dict.apply_mutation_marker() + m = self.sp_simple_dict.get_mutations_as_dict() + assert m == { + '+items': None + } + + # Apply the mutation to the state as a whole and on all its children + self.sp_simple_dict.apply_mutation_marker(recursive=True) + m = self.sp_simple_dict.get_mutations_as_dict() + assert m == { + '+items': None, + '+items.Apple': None, + '+items.Apple.name': 'Apple', + '+items.Apple.type': 'fruit', + '+items.Cucumber': None, + '+items.Cucumber.name': 'Cucumber', + '+items.Cucumber.type': 'vegetable', + '+items.Lettuce': None, + '+items.Lettuce.name': 'Lettuce', + '+items.Lettuce.type': 'vegetable' + } + + def test_dictionary_removal(self) -> None: # Explicit removal test del self.sp_simple_dict["items"]["Lettuce"] @@ -139,8 +185,8 @@ def test_set_dictionary_in_a_state_should_transform_it_in_state_proxy_and_trigge Tests that writing a dictionary in a State without schema is transformed into a StateProxy and triggers mutations to update the interface - >>> _state = streamsync.init_state({'app': {}}) - >>> _state["app"] = {"hello": "world"} + #>>> _state = streamsync.init_state({'app': {}}) + #>>> _state["app"] = {"hello": "world"} """ _state = State() @@ -173,8 +219,8 @@ def test_replace_dictionary_content_in_a_state_with_schema_should_transform_it_i This processing must work after initialization and after recovering the mutations the first time. - >>> _state = State({'items': {}}) - >>> _state["items"] = {k: v for k, v in _state["items"].items() if k != "Apple"} + #>>> _state = State({'items': {}}) + #>>> _state["items"] = {k: v for k, v in _state["items"].items() if k != "Apple"} """ _state = State({"items": { "Apple": {"name": "Apple", "type": "fruit"}, @@ -218,8 +264,8 @@ def test_changing_a_value_in_substate_is_accessible_and_mutations_are_present(se Tests that the change of values in a child state is readable whatever the access mode and that mutations are triggered - >>> _state = ComplexSchema({'app': {'title': ''}}) - >>> _state.app.title = 'world' + #>>> _state = ComplexSchema({'app': {'title': ''}}) + #>>> _state.app.title = 'world' """ class AppState(State): title: str From 677be06406b6b0db26e2e85f59a0d3b9badd3c92 Mon Sep 17 00:00:00 2001 From: Fabien Arcellier Date: Thu, 7 Mar 2024 15:48:11 +0100 Subject: [PATCH 4/4] feat: declare optional schema on streamsync state * fix: issue on substate assignation --- src/streamsync/__init__.py | 5 +- src/streamsync/core.py | 95 +++++++++++++++++++++++++++++++------- tests/test_core.py | 29 ++++++++++++ 3 files changed, 110 insertions(+), 19 deletions(-) diff --git a/src/streamsync/__init__.py b/src/streamsync/__init__.py index 958116d5d..62a556fee 100644 --- a/src/streamsync/__init__.py +++ b/src/streamsync/__init__.py @@ -59,7 +59,7 @@ def init_ui() -> StreamsyncUIManager: return StreamsyncUIManager() -def init_state(state_dict: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]: +def init_state(raw_state: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]: """ Sets the initial state, which will be used as the starting point for every session. @@ -80,6 +80,5 @@ def init_state(state_dict: Dict[str, Any], schema: Optional[Type[S]] = None) -> if not issubclass(concrete_schema, StreamsyncState): raise ValueError("Root schema must inherit from StreamsyncState") - _initial_state: S = new_initial_state(concrete_schema) - _initial_state.ingest(state_dict) + _initial_state: S = new_initial_state(concrete_schema, raw_state) return _initial_state diff --git a/src/streamsync/core.py b/src/streamsync/core.py index 04a9dca93..0b020babe 100644 --- a/src/streamsync/core.py +++ b/src/streamsync/core.py @@ -8,7 +8,8 @@ import sys import time import traceback -from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union, TypeVar, Type, Sequence, cast +from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union, TypeVar, Type, Sequence, cast, \ + Generator import urllib.request import base64 import io @@ -321,6 +322,24 @@ def to_dict(self) -> Dict[str, Any]: serialised[key] = serialised_value return serialised + def to_raw_state(self): + """ + Converts a StateProxy and its children into a python dictionary. + + >>> state = State({'a': 1, 'c': {'a': 1, 'b': 3}}) + >>> _raw_state = state._state_proxy.to_raw_state() + >>> {'a': 1, 'c': {'a': 1, 'b': 3}} + + :return: a python dictionary that represents the raw state + """ + raw_state = {} + for key, value in self.state.items(): + if isinstance(value, StateProxy): + value = value.to_raw_state() + raw_state[key] = value + + return raw_state + def get_annotations(instance) -> Dict[str, Any]: """ @@ -399,9 +418,16 @@ def __init__(self, raw_state: Dict[str, Any] = {}): def ingest(self, raw_state: Dict[str, Any]) -> None: """ hydrates a state from raw data by applying a schema when it is provided. + The existing content in the state is erased. + + + >>> state = StreamsyncState({'message': "hello world"}) + >>> state.ingest({'a': 1, 'b': 2}) + >>> {'a': 1, 'b': 2} """ self._state_proxy.state = {} for key, value in raw_state.items(): + assert not isinstance(value, StateProxy), f"state proxy datatype is not expected in ingest operation, {locals()}" self._set_state_item(key, value) def to_dict(self) -> dict: @@ -415,18 +441,37 @@ def to_dict(self) -> dict: """ return self._state_proxy.to_dict() + + def to_raw_state(self) -> dict: + """ + Converts a StateProxy and its children into a python dictionary that can be used to recreate the + state from scratch. + + >>> state = StreamsyncState({'a': 1, 'c': {'a': 1, 'b': 3}}) + >>> raw_state = state.to_raw_state() + >>> "{'a': 1, 'c': {'a': 1, 'b': 3}}" + + :return: a python dictionary that represents the raw state + """ + return self._state_proxy.to_raw_state() + def __repr__(self) -> str: return self._state_proxy.__repr__() def __getitem__(self, key: str) -> Any: - annotations = get_annotations(self) - expected_type = annotations.get(key) - if expected_type is not None and inspect.isclass(expected_type) and issubclass(expected_type, State): - return getattr(self, key) - else: - return self._state_proxy.__getitem__(key) + + # Essential to support operation like + # state['item']['a'] = state['item']['b'] + if hasattr(self, key): + value = getattr(self, key) + if isinstance(value, State): + return value + + return self._state_proxy.__getitem__(key) def __setitem__(self, key: str, raw_value: Any) -> None: + assert not isinstance(raw_value, StateProxy), f"state proxy datatype is not expected, {locals()}" + self._set_state_item(key, raw_value) def __delitem__(self, key: str) -> Any: @@ -435,12 +480,26 @@ def __delitem__(self, key: str) -> Any: def remove(self, key: str) -> Any: return self.__delitem__(key) + def items(self) -> Generator[Tuple[str, Any], None, None]: + for k, v in self._state_proxy.items(): + if isinstance(v, StateProxy): + # We don't want to expose StateProxy to the user, so + # we replace it with relative State + yield k, getattr(self, k) + else: + yield k, v + def __contains__(self, key: str) -> bool: return self._state_proxy.__contains__(key) def _set_state_item(self, key: str, value: Any): """ """ + + """ + At this level, the values that arrive are either States which encapsulate a StateProxy, or another datatype. + If there is a StateProxy, it is a fault in the code. + """ annotations = get_annotations(self) expected_type = annotations.get(key, None) expect_dict = expected_type is not None and inspect.isclass(expected_type) and issubclass(expected_type, dict) @@ -457,10 +516,11 @@ def _set_state_item(self, key: str, value: Any): state.ingest(value) self._state_proxy[key] = state._state_proxy else: - if isinstance(value, StateProxy): - value.apply_mutation_marker(recursive=True) - - self._state_proxy[key] = value + if isinstance(value, State): + value._state_proxy.apply_mutation_marker(recursive=True) + self._state_proxy[key] = value._state_proxy + else: + self._state_proxy[key] = value class StreamsyncState(State): @@ -495,11 +555,11 @@ def get_clone(self) -> 'StreamsyncState': >>> class AppSchema(StreamsyncState): >>> counter: int >>> - >>> root_state = AppSchema() + >>> root_state = AppSchema({'counter': 1}) >>> clone_state = root_state.get_clone() # instance of AppSchema """ try: - cloned_user_state = copy.deepcopy(self.user_state.state) + cloned_user_state = copy.deepcopy(self.user_state.to_raw_state()) cloned_mail = copy.deepcopy(self.mail) except BaseException: substitute_state = StreamsyncState() @@ -1251,7 +1311,7 @@ def __set__(self, instance, value): S = TypeVar("S", bound=StreamsyncState) -def new_initial_state(klass: Type[S]) -> S: +def new_initial_state(klass: Type[S], raw_state: dict) -> S: """ Initializes the initial state of the application and makes it globally accessible. @@ -1260,10 +1320,13 @@ def new_initial_state(klass: Type[S]) -> S: >>> class MyState(StreamsyncState): >>> pass >>> - >>> initial_state = new_initial_state(MyState) + >>> initial_state = new_initial_state(MyState, {}) """ global initial_state - initial_state = klass() + if raw_state is None: + raw_state = {} + + initial_state = klass(raw_state) return initial_state diff --git a/tests/test_core.py b/tests/test_core.py index ed6f65f92..5ae3cf81c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -177,6 +177,14 @@ def test_private_members(self) -> None: assert d.get("_private") is None assert d.get("_private_unserialisable") is None + def test_to_raw_state(self) -> None: + """ + Test that `to_raw_state` returns the state in its original format + """ + assert self.sp.to_raw_state() == raw_state_dict + assert self.sp_simple_dict.to_raw_state() == simple_dict + + class TestState: @@ -288,6 +296,27 @@ class ComplexSchema(State): '+app.title': 'world', } + def test_remove_then_replace_nested_dictionary_should_trigger_mutation(self): + """ + Tests that deleting a key from a substate, then replacing it, triggers the expected mutations + """ + # Assign + _state = State({"nested": {"a": 1, "b": 2, "c": {"d": 3, "e": 4}}}) + m = _state._state_proxy.get_mutations_as_dict() + + # Acts + del _state["nested"]["c"]["e"] + _state['nested']['c'] = _state['nested']['c'] + + # Assert + m = _state._state_proxy.get_mutations_as_dict() + assert m == { + '+nested.c': None, + '+nested.c.d': 3, + '-nested.c.e': None + } + assert _state.to_dict() == {"nested": {"a": 1, "b": 2, "c": {"d": 3}}} + class TestStreamsyncState: