Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failing tests - broadcaster disconnection & multiprocess #73

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions tests/broadcaster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from multiprocessing import Process

from fastapi_websocket_rpc.logger import get_logger, logging_config, LoggingModes

logging_config.set_mode(LoggingModes.LOGURU)

# Add parent path to use local src as package for tests
Expand All @@ -24,7 +25,11 @@

logger = get_logger("Test")
logger.remove()
logger.add(sys.stderr, format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>", level="INFO")
logger.add(
sys.stderr,
format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>",
level="INFO",
)

# Configurable
PORT = int(os.environ.get("PORT") or "7990")
Expand All @@ -37,26 +42,28 @@
PG_HOST_PORT = 25432
PG_SLEEP_TIME = 10


@pytest.fixture()
def postgres(request):
CONTAINER_NAME = "broadcastdb" + "".join(
[random.choice(string.ascii_letters) for _ in range(8)]
)

def rm_container():
os.system(f'docker rm -f {CONTAINER_NAME} > /dev/null 2>&1')
os.system(f"docker rm -f {CONTAINER_NAME} > /dev/null 2>&1")

rm_container() # Make sure no previous container exists
rm_container() # Make sure no previous container exists

postgres_args = ''
postgres_args = ""
timeout_marker = request.node.get_closest_marker("postgres_idle_timeout")
if timeout_marker is not None:
timeout = timeout_marker.args[0]
postgres_args = f'-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}'

postgres_args = f"-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}"

logger.info(f"running postgres on host port {PG_HOST_PORT}...")
os.system(f'docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1')
os.system(
f"docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1"
)
logger.info(f"Sleeping for {PG_SLEEP_TIME} seconds so postgres could stabilize")
time.sleep(PG_SLEEP_TIME)

Expand All @@ -65,18 +72,28 @@ def rm_container():
finally:
rm_container()


def setup_pubsub_endpoint(app: FastAPI, broadcast_url: str, path: str):
"""
sets up endpoints on the fastapi app:
- a pub/sub websocket endpoint for clients to connect to
- a trigger endpoint that causes the pub/sub server to publish a message on a predefined topic
"""
logger.info(f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'")
endpoint = PubSubEndpoint(broadcaster=broadcast_url, ignore_broadcaster_disconnected=False)
logger.info(
f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'"
)
endpoint = PubSubEndpoint(
broadcaster=broadcast_url, ignore_broadcaster_disconnected=False
)

@app.websocket(path)
async def websocket_rpc_endpoint(websocket: WebSocket):
await endpoint.main_loop(websocket)
try:
# Close connection if not already closed
await websocket.close()
except:
pass

@app.get(f"{path}/trigger")
async def trigger_events():
Expand All @@ -100,10 +117,11 @@ def setup_server(broadcast_url):
logger.info("Running server app")
uvicorn.run(app, port=PORT)


@pytest.fixture()
def server(postgres):
# Run the server as a separate process
proc = Process(target=setup_server, args=(postgres, ), daemon=True)
proc = Process(target=setup_server, args=(postgres,), daemon=True)
proc.start()
logger.info("Server started on a daemon process")
yield proc
Expand Down Expand Up @@ -132,7 +150,10 @@ async def on_event(data, topic):

async with PubSubClient() as client1:
async with PubSubClient() as client2:
for c, uri in [(client1,first_endpoint_uri), (client2,second_endpoint_uri)]:
for c, uri in [
(client1, first_endpoint_uri),
(client2, second_endpoint_uri),
]:
c.subscribe(EVENT_TOPIC, on_event)
c.start_client(uri)
await c.wait_until_ready()
Expand All @@ -155,6 +176,7 @@ async def wait_for_sem():
if repeat + 1 < repeats:
await asyncio.sleep(interval)


@pytest.mark.postgres_idle_timeout(3000)
@pytest.mark.asyncio
async def test_idle_pg_broadcaster_disconnect(server):
Expand All @@ -170,4 +192,3 @@ async def test_idle_pg_broadcaster_disconnect(server):
- all servers (and clients) will get both of the messages
"""
await test_all_clients_get_a_topic_via_broadcast(server, repeats=3, interval=4)

10 changes: 5 additions & 5 deletions tests/multiprocess_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
Publishing-Client -> PubSubServer -> Subscribing->Client
"""

import os
import sys
import pytest
import uvicorn
import asyncio
from multiprocessing import Process, Event as ProcEvent

from fastapi import APIRouter, FastAPI
from fastapi import FastAPI

from fastapi_websocket_rpc.logger import get_logger

Expand All @@ -35,11 +36,9 @@

def setup_server():
app = FastAPI()
router = APIRouter()
# PubSub websocket endpoint
endpoint = PubSubEndpoint()
endpoint.register_route(router, path="/pubsub")
app.include_router(router)
endpoint.register_route(app, path="/pubsub")
uvicorn.run(app, port=PORT)


Expand All @@ -51,6 +50,7 @@ def setup_publishing_client():
async def actual():
# Wait for other client to wake up before publishing to it
CLIENT_START_SYNC.wait(5)
logger.info("Client start sync done")
# Create a client and subscribe to topics
client = PubSubClient()
client.start_client(uri)
Expand All @@ -62,7 +62,7 @@ async def actual():
assert published.result

logger.info("Starting async publishing client")
asyncio.get_event_loop().run_until_complete(actual())
asyncio.run(actual())


@pytest.fixture(scope="module")
Expand Down
Loading