Skip to content

Commit

Permalink
Merge pull request #229 from mmikita95/async-handlers
Browse files Browse the repository at this point in the history
Asynchronous handlers support
  • Loading branch information
ramedina86 authored Feb 12, 2024
2 parents e10ed00 + 70242a3 commit 45d0ee0
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 8 deletions.
31 changes: 27 additions & 4 deletions src/streamsync/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import contextlib
import copy
import datetime
import inspect
import logging
import secrets
import sys
import threading
import time
import traceback
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union
Expand Down Expand Up @@ -978,6 +980,7 @@ def __init__(self, session: StreamsyncSession) -> None:
self.deser = EventDeserialiser(self.session_state)
self.evaluator = Evaluator(self.session_state)


def _handle_binding(self, event_type, target_component, instance_path, payload) -> None:
if not target_component.binding:
return
Expand All @@ -986,6 +989,22 @@ def _handle_binding(self, event_type, target_component, instance_path, payload)
return
self.evaluator.set_state(binding["stateRef"], instance_path, payload)

def _async_handler_executor(self, callable_handler, arg_values):
async_callable = self._async_handler_executor_internal(callable_handler, arg_values)
return asyncio.run(async_callable)

async def _async_handler_executor_internal(self, callable_handler, arg_values):
with contextlib.redirect_stdout(io.StringIO()) as f:
result = await callable_handler(*arg_values)
captured_stdout = f.getvalue()
return result, captured_stdout

def _sync_handler_executor(self, callable_handler, arg_values):
with contextlib.redirect_stdout(io.StringIO()) as f:
result = callable_handler(*arg_values)
captured_stdout = f.getvalue()
return result, captured_stdout

def _call_handler_callable(self, event_type, target_component, instance_path, payload) -> Any:
streamsyncuserapp = sys.modules.get("streamsyncuserapp")
if streamsyncuserapp is None:
Expand All @@ -1001,8 +1020,10 @@ def _call_handler_callable(self, event_type, target_component, instance_path, pa
raise ValueError(
f"""Invalid handler. Couldn't find the handler "{ handler }".""")
callable_handler = getattr(streamsyncuserapp, handler)
is_async_handler = inspect.iscoroutinefunction(callable_handler)

if not callable(callable_handler):
if (not callable(callable_handler)
and not is_async_handler):
raise ValueError(
"Invalid handler. The handler isn't a callable object.")

Expand All @@ -1025,9 +1046,11 @@ def _call_handler_callable(self, event_type, target_component, instance_path, pa
arg_values.append(session_info)

result = None
with contextlib.redirect_stdout(io.StringIO()) as f:
result = callable_handler(*arg_values)
captured_stdout = f.getvalue()
if is_async_handler:
result, captured_stdout = self._async_handler_executor(callable_handler, arg_values)
else:
result, captured_stdout = self._sync_handler_executor(callable_handler, arg_values)

if captured_stdout:
self.session_state.add_log_entry(
"info",
Expand Down
43 changes: 43 additions & 0 deletions tests/test_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ class TestAppRunner:
{"componentId": "6010765e-9ac3-4570-84bf-913ae404e03a", "instanceNumber": 0},
{"componentId": "c282ad31-2487-4296-a944-508c167c43be", "instanceNumber": 0}
]

async_handler_click_path = [
{"componentId": "root", "instanceNumber": 0},
{"componentId": "bb4d0e86-619e-4367-a180-be28ab6059f4", "instanceNumber": 0},
{"componentId": "92a2c0c8-7ab4-4865-b7eb-ed437408c8f5", "instanceNumber": 0},
{"componentId": "d1e01ce1-fab1-4a6e-91a1-1f45f9e57aa5", "instanceNumber": 0},
{"componentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76", "instanceNumber": 0},
{"componentId": "nyo5vc79sb031yz8", "instanceNumber": 0}
]
proposed_session_id = "c13a280fe17ec663047ec14de15cd93ad686fecf5f9a4dbf262d3a86de8cb577"

def test_init_wrong_path(self) -> None:
Expand Down Expand Up @@ -103,6 +112,40 @@ async def test_valid_event(self) -> None:
"+b.pet_count") == 129673
ar.shut_down()

@pytest.mark.asyncio
async def test_async_handler(self) -> None:
ar = AppRunner(test_app_dir, "run")
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id

# Firing an event to bypass "initial" state mutations
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-number-change",
instancePath=self.numberinput_instance_path,
payload="129673"
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)

ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-click",
instancePath=self.async_handler_click_path
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == True
assert "+counter" in ev_res.payload.mutations
ar.shut_down()

@pytest.mark.asyncio
async def test_bad_event_handler(self) -> None:
ar = AppRunner(test_app_dir, "run")
Expand Down
6 changes: 6 additions & 0 deletions tests/testapp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import streamsync.core
import altair as alt
import asyncio


@ss.session_verifier
Expand Down Expand Up @@ -93,6 +94,11 @@ def handle_form_submit(state):
def handle_add_to_list(state, context):
state["order_list"] += [context["itemId"]]


async def test_async_handler(state):
await asyncio.sleep(3)
state["counter"] += 1*my_var

# Filters data and triggers updates.


Expand Down
32 changes: 28 additions & 4 deletions tests/testapp/ui.json
Original file line number Diff line number Diff line change
Expand Up @@ -2050,7 +2050,7 @@
"text": "Increment"
},
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"position": 8,
"position": 9,
"handlers": {
"click": "increment"
},
Expand Down Expand Up @@ -2752,7 +2752,7 @@
"options": "{\n \"ar\": \"Argentina\",\n \"uk\": \"United Kingdom\"\n}"
},
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"position": 9,
"position": 11,
"handlers": {
"ss-option-change": "update_cities"
},
Expand All @@ -2766,7 +2766,7 @@
"options": "@{cities}"
},
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"position": 10,
"position": 12,
"handlers": {},
"visible": true
},
Expand Down Expand Up @@ -2802,7 +2802,7 @@
"passwordMode": "yes"
},
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"position": 11,
"position": 13,
"handlers": {},
"visible": true
},
Expand Down Expand Up @@ -2836,6 +2836,30 @@
"stateRef": "b.language"
},
"visible": true
},
"nyo5vc79sb031yz8": {
"id": "nyo5vc79sb031yz8",
"type": "button",
"content": {
"text": "Increment asynchronically"
},
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"position": 10,
"handlers": {
"ss-click": "test_async_handler"
},
"visible": true
},
"3apnnxxg7pubdeqp": {
"id": "3apnnxxg7pubdeqp",
"type": "text",
"parentId": "9c30af6d-4ee5-4782-9169-0f361d67fa76",
"content": {
"text": "@{counter}"
},
"handlers": {},
"position": 8,
"visible": true
}
}
}

0 comments on commit 45d0ee0

Please sign in to comment.