Skip to content

Commit

Permalink
fix: fastapi lifespan management with multiple apps at once
Browse files Browse the repository at this point in the history
* introduce streamsync.serve.lifespan to gather lifespan of different streamsync application
  • Loading branch information
FabienArcellier committed Feb 14, 2024
1 parent 9e001d6 commit 68ef42a
Show file tree
Hide file tree
Showing 12 changed files with 478 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/docs/custom-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import uvicorn
import streamsync.serve
from fastapi import FastAPI, Response

root_asgi_app = FastAPI()
root_asgi_app = FastAPI(lifespan=streamsync.serve.lifespan)
sub_asgi_app_1 = streamsync.serve.get_asgi_app("../app1", "run")
sub_asgi_app_2 = streamsync.serve.get_asgi_app("../app2", "run")

Expand Down
62 changes: 61 additions & 1 deletion src/streamsync/serve.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import dataclasses
import mimetypes
from contextlib import asynccontextmanager
import sys
Expand All @@ -8,6 +9,7 @@
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import ValidationError
from fastapi.routing import Mount
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from streamsync.ss_types import (AppProcessServerResponse, ComponentUpdateRequestPayload, EventResponsePayload, InitRequestBody, InitResponseBodyEdit,
InitResponseBodyRun, InitSessionRequestPayload, InitSessionResponsePayload, ServeMode, StateEnquiryResponsePayload, StreamsyncEvent, StreamsyncWebsocketIncoming, StreamsyncWebsocketOutgoing)
Expand All @@ -22,7 +24,6 @@
MAX_WEBSOCKET_MESSAGE_SIZE = 201*1024*1024
logging.getLogger().setLevel(logging.INFO)


def get_asgi_app(
user_app_path: str,
serve_mode: ServeMode,
Expand Down Expand Up @@ -57,6 +58,11 @@ async def lifespan(app: FastAPI):
on_shutdown()

asgi_app = FastAPI(lifespan=lifespan)
"""
Reuse the same pattern to give variable to FastAPI application
than `asgi_app.state.is_server_static_mounted` already use in streamsync.
"""
asgi_app.state.streamsync_app = True

def _get_extension_paths() -> List[str]:
extensions_path = pathlib.Path(user_app_path) / "extensions"
Expand Down Expand Up @@ -421,6 +427,60 @@ def on_load():
uvicorn.run(asgi_app, host=host,
port=port, log_level=log_level, ws_max_size=MAX_WEBSOCKET_MESSAGE_SIZE)

@asynccontextmanager
async def lifespan(app: FastAPI):
"""
This feature supports launching multiple streamsync applications simultaneously.
>>> import uvicorn
>>> import streamsync.serve
>>> from fastapi import FastAPI, Response
>>>
>>> root_asgi_app = FastAPI(lifespan=streamsync.serve.lifespan)
>>>
>>> sub_asgi_app_1 = streamsync.serve.get_asgi_app("../app1", "run")
>>> sub_asgi_app_2 = streamsync.serve.get_asgi_app("../app2", "run")
>>>
>>> uvicorn.run(root_asgi_app, ws_max_size=streamsync.serve.MAX_WEBSOCKET_MESSAGE_SIZE)
Streamsync uses lifespan to start an application server (app_runner) per
application.
"""
streamsync_lifespans = []
for route in app.routes:
if isinstance(route, Mount) and isinstance(route.app, FastAPI):
if hasattr(route.app.state, "streamsync_app"):
ctx = route.app.router.lifespan_context
streamsync_lifespans.append(ctx)

async with _lifespan_invoke(streamsync_lifespans, app):
yield


@asynccontextmanager
async def _lifespan_invoke(context: list, app: FastAPI):
"""
Helper to run multiple lifespans in cascade.
Running
>>> _lifespan_invoke([app1.router.lifespan_context, app2.router.lifespan_context], app)
is equivalent to
>>> @asynccontextmanager
>>> async def lifespan_context(app: FastAPI):
>>> async with app1.router.lifespan_context(app):
>>> async with app2.router.lifespan_context(app):
>>> yield
"""
ctx = context.pop(0)
async with ctx(app):
if len(context) > 0:
async with _lifespan_invoke(context, app):
yield
else:
yield

def _fix_mimetype():
"""
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path

test_app_dir = Path(__file__).resolve().parent / 'testapp'
test_multiapp_dir = Path(__file__).resolve().parent / 'testmultiapp'
39 changes: 37 additions & 2 deletions tests/test_serve.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import json
import mimetypes

import fastapi
from fastapi import FastAPI

import streamsync.serve
import fastapi.testclient
import pytest

from tests import test_app_dir
from tests import test_app_dir, test_multiapp_dir


class TestServe:
Expand Down Expand Up @@ -112,3 +113,37 @@ def test_serve_javascript_file_with_a_valid_content_type(self) -> None:
# Assert
assert res.status_code == 200
assert res.headers["Content-Type"].startswith("text/javascript")

def test_multiapp_should_run_the_lifespan_of_all_streamsync_app(self):
"""
This test check that multiple streamsync applications embedded
in FastAPI start completely and answer websocket request.
"""
asgi_app: fastapi.FastAPI = FastAPI(lifespan=streamsync.serve.lifespan)
asgi_app.mount("/app1", streamsync.serve.get_asgi_app(test_multiapp_dir / 'app1', "run"))
asgi_app.mount("/app2", streamsync.serve.get_asgi_app(test_multiapp_dir / 'app2', "run"))

with fastapi.testclient.TestClient(asgi_app) as client:
# test websocket connection on app1
with client.websocket_connect("/app1/api/stream") as websocket:
websocket.send_json({
"type": "streamInit",
"trackingId": 0,
"payload": {
"sessionId": "bad_session"
}
})
with pytest.raises(fastapi.WebSocketDisconnect):
websocket.receive_json()

# test websocket connection on app2
with client.websocket_connect("/app2/api/stream") as websocket:
websocket.send_json({
"type": "streamInit",
"trackingId": 0,
"payload": {
"sessionId": "bad_session"
}
})
with pytest.raises(fastapi.WebSocketDisconnect):
websocket.receive_json()
Empty file.
40 changes: 40 additions & 0 deletions tests/testmultiapp/app1/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import streamsync as ss

# This is a placeholder to get you started or refresh your memory.
# Delete it or adapt it as necessary.
# Documentation is available at https://streamsync.cloud

# Shows in the log when the app starts
print("Hello world!")

# Its name starts with _, so this function won't be exposed
def _update_message(state):
is_even = state["counter"] % 2 == 0
message = ("+Even" if is_even else "-Odd")
state["message"] = message

def decrement(state):
state["counter"] -= 1
_update_message(state)

def increment(state):
state["counter"] += 1
# Shows in the log when the event handler is run
print(f"The counter has been incremented.")
_update_message(state)

# Initialise the state

# "_my_private_element" won't be serialised or sent to the frontend,
# because it starts with an underscore

initial_state = ss.init_state({
"my_app": {
"title": "My App 1"
},
"_my_private_element": 1337,
"message": None,
"counter": 26,
})

_update_message(initial_state)
Binary file added tests/testmultiapp/app1/static/favicon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
149 changes: 149 additions & 0 deletions tests/testmultiapp/app1/ui.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
{
"metadata": {
"streamsync_version": "0.1.4"
},
"components": {
"root": {
"id": "root",
"type": "root",
"content": {
"appName": "My App 1"
},
"parentId": null,
"position": 0,
"handlers": {},
"visible": true
},
"c0f99a9e-5004-4e75-a6c6-36f17490b134": {
"id": "c0f99a9e-5004-4e75-a6c6-36f17490b134",
"type": "page",
"content": {
"pageMode": "compact",
"emptinessColor": "#e9eef1"
},
"parentId": "root",
"position": 0,
"handlers": {},
"visible": true
},
"bebc5fe9-63a7-46a7-b0fa-62303555cfaf": {
"id": "bebc5fe9-63a7-46a7-b0fa-62303555cfaf",
"type": "header",
"content": {
"text": "@{my_app.title}"
},
"parentId": "c0f99a9e-5004-4e75-a6c6-36f17490b134",
"position": 0,
"handlers": {},
"visible": true
},
"28d3885b-0fb8-4d41-97c6-978540015431": {
"id": "28d3885b-0fb8-4d41-97c6-978540015431",
"type": "section",
"content": {
"title": "",
"snapMode": "no",
"containerShadow": "0px 4px 11px -12px #000000"
},
"parentId": "c0f99a9e-5004-4e75-a6c6-36f17490b134",
"position": 1,
"handlers": {},
"visible": true
},
"9556c0e3-8584-4ac9-903f-908a775a33ec": {
"id": "9556c0e3-8584-4ac9-903f-908a775a33ec",
"type": "button",
"content": {
"text": " Increment",
"icon": "arrow-up"
},
"parentId": "0d05bc9f-1655-4d0b-bc9b-c2f4c71a5117",
"position": 1,
"handlers": {
"click": "increment"
},
"visible": true
},
"51d1554e-1b88-461c-9353-1419cba0053a": {
"id": "51d1554e-1b88-461c-9353-1419cba0053a",
"type": "button",
"content": {
"text": "Decrement",
"icon": "arrow-down"
},
"parentId": "0d05bc9f-1655-4d0b-bc9b-c2f4c71a5117",
"position": 0,
"handlers": {
"click": "decrement"
},
"visible": true
},
"0d05bc9f-1655-4d0b-bc9b-c2f4c71a5117": {
"id": "0d05bc9f-1655-4d0b-bc9b-c2f4c71a5117",
"type": "horizontalstack",
"content": {
"alignment": "left"
},
"parentId": "f3777e75-3659-4d44-8ef7-aeec0d06855b",
"position": 0,
"handlers": {},
"visible": true
},
"172a14df-f73a-44fa-8fb1-e8648e7d32d2": {
"id": "172a14df-f73a-44fa-8fb1-e8648e7d32d2",
"type": "metric",
"content": {
"metricValue": "@{counter}",
"note": "@{message}",
"name": "Counter"
},
"parentId": "c2519671-9ce7-44e7-ba4e-b0efda9cb20e",
"position": 0,
"handlers": {},
"visible": true
},
"d4a5e62c-c6fe-49c4-80d4-33862af8727d": {
"id": "d4a5e62c-c6fe-49c4-80d4-33862af8727d",
"type": "columns",
"content": {},
"parentId": "28d3885b-0fb8-4d41-97c6-978540015431",
"position": 0,
"handlers": {},
"visible": true
},
"f3777e75-3659-4d44-8ef7-aeec0d06855b": {
"id": "f3777e75-3659-4d44-8ef7-aeec0d06855b",
"type": "column",
"content": {
"title": "",
"width": "1",
"verticalAlignment": "center",
"horizontalAlignment": "center"
},
"parentId": "d4a5e62c-c6fe-49c4-80d4-33862af8727d",
"position": 2,
"handlers": {},
"visible": true
},
"c2519671-9ce7-44e7-ba4e-b0efda9cb20e": {
"id": "c2519671-9ce7-44e7-ba4e-b0efda9cb20e",
"type": "column",
"content": {
"width": "1"
},
"parentId": "d4a5e62c-c6fe-49c4-80d4-33862af8727d",
"position": 0,
"handlers": {},
"visible": true
},
"d4a71819-7444-4083-a1c7-7995452a7abf": {
"id": "d4a71819-7444-4083-a1c7-7995452a7abf",
"type": "separator",
"content": {},
"parentId": "d4a5e62c-c6fe-49c4-80d4-33862af8727d",
"position": 1,
"handlers": {},
"visible": true
}
}
}
Empty file.
40 changes: 40 additions & 0 deletions tests/testmultiapp/app2/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import streamsync as ss

# This is a placeholder to get you started or refresh your memory.
# Delete it or adapt it as necessary.
# Documentation is available at https://streamsync.cloud

# Shows in the log when the app starts
print("Hello world!")

# Its name starts with _, so this function won't be exposed
def _update_message(state):
is_even = state["counter"] % 2 == 0
message = ("+Even" if is_even else "-Odd")
state["message"] = message

def decrement(state):
state["counter"] -= 1
_update_message(state)

def increment(state):
state["counter"] += 1
# Shows in the log when the event handler is run
print(f"The counter has been incremented.")
_update_message(state)

# Initialise the state

# "_my_private_element" won't be serialised or sent to the frontend,
# because it starts with an underscore

initial_state = ss.init_state({
"my_app": {
"title": "My App 2"
},
"_my_private_element": 1337,
"message": None,
"counter": 26,
})

_update_message(initial_state)
Binary file added tests/testmultiapp/app2/static/favicon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 68ef42a

Please sign in to comment.