Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:declare optional schema on streamsync state #278

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export default {
{ text: "Page routes", link: "/page-routes" },
{ text: "Sessions", link: "/sessions" },
{ text: "Custom server", link: "/custom-server" },
{ text: "State schema", link: "/state-schema" },
],
},
{
Expand Down
123 changes: 123 additions & 0 deletions docs/docs/state-schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# State schema

Schema declaration on the [Application state](./application-state) allows Streamsync to handle complex serialization
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some minor comments about style, e.g. British spelling, capitalisation of some words, etc, but I'll take care of them. Nice docs though!

scenario and empower your IDE and toolchains to provide autocomplete and type checking.

## Schema declaration

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

initial_state = ss.init_state({
"counter": 0
}, schema=AppSchema)

# Event handler
# It receives the session state as an argument and mutates it
def increment(state: AppSchema):
state.counter += 1
```

Access to an attribute by its key is always possible.

```python
def increment(state: AppSchema):
state['counter'] += 1
```

Attributes missing from the schema remain accessible by their key.

```python
initial_state = ss.init_state({
"counter": 0,
"message": None
}, schema=AppSchema)

def increment(state: AppSchema):
state['message'] = "Hello pigeon"
```

## Schema composition

Schema composition allows you to model a complex Application state.

```python
class MyappSchema(ss.State):
title: str

class AppSchema(ss.StreamsyncState):
my_app: MyappSchema
counter: int

initial_state = ss.init_state({
"counter": 0,
"my_app": {
"title": "Nested value"
}
}, schema=AppSchema)
```

## Multi-level dictionary

Some components like Vega require specifying a graph in the form of a multi-level dictionary.

A schema allows you to specify to streamsync that an attribute which contains a dictionary
must be treated as a dictionary and not as a group of state.

```python
class AppSchema(ss.StreamsyncState):
vegas_graph: dict

# Without schema, this handler is execute only once
def handle_vega_graph(state: AppSchema):
graph = state.vega_graph
graph['data']['values'][0]['b'] += 1000
state.vega_graph = graph

initial_state = ss.init_state({
"vegas_graph": {
"data": {
"values": [
{"a": "C", "b": 2}, {"a": "C", "b": 7}, {"a": "C", "b": 4},
{"a": "D", "b": 1}, {"a": "D", "b": 2}, {"a": "D", "b": 6},
{"a": "E", "b": 8}, {"a": "E", "b": 4}, {"a": "E", "b": 7}
]
},
"mark": "bar",
"encoding": {
"x": {"field": "a", "type": "nominal"},
"y": {"aggregate": "average", "field": "b", "type": "quantitative"}
}
},
}, schema=AppSchema)
```

## Type checking

A schema allows you to check the integrity of your backend using the type system.
The code below will raise an error with mypy.

```bash
$ mypy apps/myapp/main.py
apps/myapp/main.py:7: error: "AppSchema" has no attribute "countr"; maybe "counter"? [attr-defined]
```

Here is the code, can you spot the error ?

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

def increment(state: AppSchema):
state.countr += 1

initial_state = ss.init_state({
"counter": 26,
}, schema=AppSchema)
```

43 changes: 31 additions & 12 deletions src/streamsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import importlib.metadata
from typing import Any, Dict, Optional, Union
from typing import Union, Optional, Dict, Any, Type, TypeVar, cast

from streamsync.core import (BytesWrapper, Config, FileWrapper, Readable,
base_component_tree, initial_state,
session_manager, session_verifier)
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config, StreamsyncState
from streamsync.core import new_initial_state, base_component_tree, session_manager, session_verifier
from streamsync.ui import StreamsyncUIManager

VERSION = importlib.metadata.version("streamsync")
Expand Down Expand Up @@ -31,17 +33,7 @@ def pack_bytes(raw_data, mime_type: Optional[str] = None):

return BytesWrapper(raw_data, mime_type)


def init_state(state_dict: Dict[str, Any]):
"""
Sets the initial state, which will be used as the starting point for
every session.
"""

initial_state.user_state.state = {}
initial_state.user_state.ingest(state_dict)
return initial_state

S = TypeVar('S', bound=StreamsyncState)

def init_ui() -> StreamsyncUIManager:
"""Initializes and returns an instance of StreamsyncUIManager.
Expand All @@ -58,8 +50,35 @@ def init_ui() -> StreamsyncUIManager:

**Example**::

>>> import streamsync as ss
>>>
>>> with ss.init_ui() as ui:
>>> with ui.Page({"key": "hello"}):
>>> ui.Text({"text": "Hello pigeons"})
"""
return StreamsyncUIManager()


def init_state(raw_state: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]:
"""
Sets the initial state, which will be used as the starting point for
every session.

initial_state.user_state.state = {}
initial_state.user_state.ingest(state_dict)
return initial_state



>>> import streamsync as ss

>>> initial_state = ss.init_state({
>>> "counter": 0,
>>> }, schema=AppSchema)
"""
concrete_schema = cast(Type[S], StreamsyncState if schema is None else schema)
if not issubclass(concrete_schema, StreamsyncState):
raise ValueError("Root schema must inherit from StreamsyncState")

_initial_state: S = new_initial_state(concrete_schema, raw_state)
return _initial_state
14 changes: 11 additions & 3 deletions src/streamsync/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def _execute_user_code(self) -> None:
captured_stdout = f.getvalue()

if captured_stdout:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"info", "Stdout message during initialisation", captured_stdout)

def _apply_configuration(self) -> None:
Expand Down Expand Up @@ -326,7 +326,7 @@ def _main(self) -> None:
try:
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True
Expand All @@ -336,14 +336,22 @@ def _main(self) -> None:
except BaseException:
# Initialisation errors will be sent to all sessions via mail during session initialisation

streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "Code Error", "Couldn't execute code. An exception was raised.", tb.format_exc())

# Exit if in run mode

if self.mode == "run":
terminate_early = True

try:
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.core.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True

if terminate_early:
self._terminate_early()
return
Expand Down
Loading
Loading