From a1f3c58061202fa72e2fb50331bacfd44c6f4d39 Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Tue, 23 Jan 2024 19:39:04 +0330 Subject: [PATCH 1/7] Minor improvement in `ModelSerializer` --- panther/serializer.py | 39 ++++++++++++++++++++++------------ tests/test_model_serializer.py | 13 +++++------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/panther/serializer.py b/panther/serializer.py index 7d56ea5..9948e0b 100644 --- a/panther/serializer.py +++ b/panther/serializer.py @@ -3,31 +3,44 @@ class ModelSerializer: - def __new__(cls, *args, **kwargs): + def __new__(cls, *args, model=None, **kwargs): + # Check `metaclass` if len(args) == 0: - msg = f"you should not inherit the 'ModelSerializer', you should use it as 'metaclass' -> {cls.__name__}" + address = f'{cls.__module__}.{cls.__name__}' + msg = f"you should not inherit the 'ModelSerializer', you should use it as 'metaclass' -> {address}" raise TypeError(msg) + model_name = args[0] - if 'model' not in kwargs: - msg = f"'model' required while using 'ModelSerializer' metaclass -> {model_name}" + data = args[2] + address = f'{data["__module__"]}.{model_name}' + + # Check `model` + if model is None: + msg = f"'model' required while using 'ModelSerializer' metaclass -> {address}" raise AttributeError(msg) + # Check `fields` + if 'fields' not in data: + msg = f"'fields' required while using 'ModelSerializer' metaclass. -> {address}" + raise AttributeError(msg) from None - model_fields = kwargs['model'].model_fields + model_fields = model.model_fields field_definitions = {} - if 'fields' not in args[2]: - msg = f"'fields' required while using 'ModelSerializer' metaclass. -> {model_name}" - raise AttributeError(msg) from None - for field_name in args[2]['fields']: + + # Collect `fields` + for field_name in data['fields']: if field_name not in model_fields: - msg = f"'{field_name}' is not in '{kwargs['model'].__name__}' -> {model_name}" + msg = f"'{field_name}' is not in '{model.__name__}' -> {address}" raise AttributeError(msg) from None - field_definitions[field_name] = (model_fields[field_name].annotation, model_fields[field_name]) - for required in args[2].get('required_fields', []): + + # Change `required_fields + for required in data.get('required_fields', []): if required not in field_definitions: - msg = f"'{required}' is in 'required_fields' but not in 'fields' -> {model_name}" + msg = f"'{required}' is in 'required_fields' but not in 'fields' -> {address}" raise AttributeError(msg) from None field_definitions[required][1].default = PydanticUndefined + + # Create Model return create_model( __model_name=model_name, **field_definitions diff --git a/tests/test_model_serializer.py b/tests/test_model_serializer.py index 59bcf16..194c360 100644 --- a/tests/test_model_serializer.py +++ b/tests/test_model_serializer.py @@ -1,4 +1,3 @@ -import asyncio from pathlib import Path from unittest import TestCase @@ -6,7 +5,6 @@ from panther import Panther from panther.app import API -from panther.configs import config from panther.db import Model from panther.request import Request from panther.serializer import ModelSerializer @@ -122,7 +120,7 @@ class Serializer1(metaclass=ModelSerializer, model=Book): pass except Exception as e: assert isinstance(e, AttributeError) - assert e.args[0] == "'fields' required while using 'ModelSerializer' metaclass. -> Serializer1" + assert e.args[0] == "'fields' required while using 'ModelSerializer' metaclass. -> tests.test_model_serializer.Serializer1" else: assert False @@ -132,7 +130,7 @@ class Serializer2(metaclass=ModelSerializer, model=Book): fields = ['ok', 'no'] except Exception as e: assert isinstance(e, AttributeError) - assert e.args[0] == "'ok' is not in 'Book' -> Serializer2" + assert e.args[0] == "'ok' is not in 'Book' -> tests.test_model_serializer.Serializer2" else: assert False @@ -143,7 +141,7 @@ class Serializer3(metaclass=ModelSerializer, model=Book): required_fields = ['pages_count'] except Exception as e: assert isinstance(e, AttributeError) - assert e.args[0] == "'pages_count' is in 'required_fields' but not in 'fields' -> Serializer3" + assert e.args[0] == "'pages_count' is in 'required_fields' but not in 'fields' -> tests.test_model_serializer.Serializer3" else: assert False @@ -154,7 +152,7 @@ class Serializer4(metaclass=ModelSerializer): required_fields = ['pages_count'] except Exception as e: assert isinstance(e, AttributeError) - assert e.args[0] == "'model' required while using 'ModelSerializer' metaclass -> Serializer4" + assert e.args[0] == "'model' required while using 'ModelSerializer' metaclass -> tests.test_model_serializer.Serializer4" else: assert False @@ -167,7 +165,6 @@ class Serializer5(ModelSerializer): Serializer5(name='alice', author='bob') except Exception as e: assert isinstance(e, TypeError) - assert e.args[0] == ("you should not inherit the 'ModelSerializer', " - "you should use it as 'metaclass' -> Serializer5") + assert e.args[0] == "you should not inherit the 'ModelSerializer', you should use it as 'metaclass' -> tests.test_model_serializer.Serializer5" else: assert False From 1346241247b75bb5eef5d10bf22eec10fa6eed98 Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 03:04:02 +0330 Subject: [PATCH 2/7] Add multiprocessing Manger + Create a PubSub with Queue --- example/core/configs.py | 2 +- example/model_serializer_example.py | 2 +- panther/base_websocket.py | 53 +++++++++++++++++++++++++++-- panther/main.py | 26 +++++++++----- panther/response.py | 3 +- panther/websocket.py | 24 ++++++------- 6 files changed, 84 insertions(+), 26 deletions(-) diff --git a/example/core/configs.py b/example/core/configs.py index d344554..e0fae08 100644 --- a/example/core/configs.py +++ b/example/core/configs.py @@ -28,7 +28,7 @@ {'url': f'pantherdb://{BASE_DIR}/{DB_NAME}.pdb'}, ), # ('panther.middlewares.db.DatabaseMiddleware', {'url': f'mongodb://{DB_HOST}:27017/{DB_NAME}'}), - ('panther.middlewares.redis.RedisMiddleware', {'host': '127.0.0.1', 'port': 6379}), + # ('panther.middlewares.redis.RedisMiddleware', {'host': '127.0.0.1', 'port': 6379}), ] """ mongodb://[Username:Password(optional)]@HostName:Port/?aruguments diff --git a/example/model_serializer_example.py b/example/model_serializer_example.py index 384a1ad..ece1ff0 100644 --- a/example/model_serializer_example.py +++ b/example/model_serializer_example.py @@ -1,6 +1,6 @@ from pydantic import Field -from panther import status, Panther +from panther import Panther, status from panther.app import API from panther.db import Model from panther.request import Request diff --git a/panther/base_websocket.py b/panther/base_websocket.py index 255495a..e6dca54 100644 --- a/panther/base_websocket.py +++ b/panther/base_websocket.py @@ -3,6 +3,7 @@ import asyncio import contextlib import logging +from multiprocessing import Manager from typing import TYPE_CHECKING import orjson as json @@ -16,10 +17,27 @@ if TYPE_CHECKING: from redis import Redis - logger = logging.getLogger('panther') +class PubSub: + def __init__(self): + self._subscribers = MANAGER.list() + + def subscribe(self): + queue = MANAGER.Queue() + self._subscribers.append(queue) + return queue + + def publish(self, msg): + for queue in self._subscribers: + queue.put(msg) + + +MANAGER = Manager() +PUBSUB = PubSub() + + class WebsocketConnections(Singleton): def __init__(self): self.connections = {} @@ -44,7 +62,7 @@ def __call__(self, r: Redis | None): and (data := loaded_data.get('data')) and (action := loaded_data.get('action')) and (connection := self.connections.get(connection_id)) - ): + ): # Check Action of WS match action: case 'send': @@ -64,6 +82,37 @@ def __call__(self, r: Redis | None): logger.debug(f'Unknown Message Action: {action}') case _: logger.debug(f'Unknown Channel Type: {channel_data["type"]}') + else: + queue = PUBSUB.subscribe() + logger.info("Subscribed to 'websocket_connections' queue") + while True: + msg = queue.get() + if msg is None: + break + if ( + (connection_id := msg.get('connection_id')) + and (data := msg.get('data')) + and (action := msg.get('action')) + and (connection := self.connections.get(connection_id)) + ): + + # Check Action of WS + match action: + case 'send': + logger.debug(f'Sending Message to {connection_id}') + asyncio.run(connection.send(data=data)) + case 'close': + with contextlib.suppress(RuntimeError): + asyncio.run(connection.close(code=data['code'], reason=data['reason'])) + # We are trying to disconnect the connection between a thread and a user + # from another thread, it's working, but we have to find another solution it + # + # Error: + # Task > got Future + # > + # attached to a different loop + case _: + logger.debug(f'Unknown Message Action: {action}') async def new_connection(self, connection: Websocket) -> None: await connection.connect(**connection.path_variables) diff --git a/panther/main.py b/panther/main.py index 545b45f..26af3aa 100644 --- a/panther/main.py +++ b/panther/main.py @@ -52,14 +52,6 @@ def __init__(self, name: str, configs=None, urls: dict | None = None, startup: C # Print Info print_info(config) - # Start Websocket Listener (Redis Required) - if config['has_ws']: - Thread( - target=config['websocket_connections'], - daemon=True, - args=(self.ws_redis_connection,), - ).start() - def load_configs(self) -> None: # Check & Read The Configs File @@ -138,6 +130,7 @@ async def __call__(self, scope: dict, receive: Callable, send: Callable) -> None if scope['type'] == 'lifespan': message = await receive() if message["type"] == "lifespan.startup": + await self.handle_ws_listener() await self.handle_startup() return @@ -262,6 +255,15 @@ async def handle_http(self, scope: dict, receive: Callable, send: Callable) -> N body=response.body, ) + async def handle_ws_listener(self): + # Start Websocket Listener (Redis/ Queue) + if config['has_ws']: + Thread( + target=config['websocket_connections'], + daemon=True, + args=(self.ws_redis_connection,), + ).start() + async def handle_startup(self): if startup := config['startup'] or self._startup: if is_function_async(startup): @@ -272,7 +274,13 @@ async def handle_startup(self): def handle_shutdown(self): if shutdown := config['shutdown'] or self._shutdown: if is_function_async(shutdown): - asyncio.run(shutdown()) + try: + asyncio.run(shutdown()) + except ModuleNotFoundError: + # Error: import of asyncio halted; None in sys.modules + # And as I figured it out, it only happens when we running with + # gunicorn and Uvicorn workers (-k uvicorn.workers.UvicornWorker) + pass else: shutdown() diff --git a/panther/response.py b/panther/response.py index 3fbe2bf..d7bbabb 100644 --- a/panther/response.py +++ b/panther/response.py @@ -35,9 +35,10 @@ def body(self) -> bytes: @property def headers(self) -> dict: + content_length = 0 if self.body == b'null' else len(self.body) return { 'content-type': self.content_type, - 'content-length': len(self.body), + 'content-length': content_length, 'access-control-allow-origin': '*', } | (self._headers or {}) diff --git a/panther/websocket.py b/panther/websocket.py index 2cd316d..5da45d0 100644 --- a/panther/websocket.py +++ b/panther/websocket.py @@ -5,8 +5,7 @@ import orjson as json from panther import status -from panther.base_websocket import Websocket, WebsocketConnections -from panther.configs import config +from panther.base_websocket import Websocket, PUBSUB from panther.db.connection import redis @@ -34,22 +33,18 @@ async def send_message_to_websocket(connection_id: str, data: any): if redis.is_connected: _publish_to_ws_channel(connection_id=connection_id, action='send', data=data) else: - websocket_connections: WebsocketConnections = config['websocket_connections'] - if connection := websocket_connections.connections.get(connection_id): - await connection.send(data=data) + _publish_to_ws_queue(connection_id=connection_id, action='send', data=data) async def close_websocket_connection(connection_id: str, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = ''): + data = { + 'code': code, + 'reason': reason, + } if redis.is_connected: - data = { - 'code': code, - 'reason': reason, - } _publish_to_ws_channel(connection_id=connection_id, action='close', data=data) else: - websocket_connections: WebsocketConnections = config['websocket_connections'] - if connection := websocket_connections.connections.get(connection_id): - await connection.close(code=code, reason=reason) + _publish_to_ws_queue(connection_id=connection_id, action='close', data=data) def _publish_to_ws_channel(connection_id: str, action: Literal['send', 'close'], data: any): @@ -59,3 +54,8 @@ def _publish_to_ws_channel(connection_id: str, action: Literal['send', 'close'], p_data = json.dumps({'connection_id': connection_id, 'action': action, 'data': data}) redis.publish('websocket_connections', p_data) + + +def _publish_to_ws_queue(connection_id: str, action: Literal['send', 'close'], data: any): + p_data = {'connection_id': connection_id, 'action': action, 'data': data} + PUBSUB.publish(p_data) From 77c22e6ffa8857962119f895ecef0f3622eeca53 Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 03:36:39 +0330 Subject: [PATCH 3/7] Minor Improvement in WebsocketConnections() --- panther/base_websocket.py | 92 ++++++++++++++++----------------------- panther/websocket.py | 32 ++++---------- 2 files changed, 47 insertions(+), 77 deletions(-) diff --git a/panther/base_websocket.py b/panther/base_websocket.py index e6dca54..d79a84f 100644 --- a/panther/base_websocket.py +++ b/panther/base_websocket.py @@ -49,70 +49,52 @@ def __call__(self, r: Redis | None): subscriber.subscribe('websocket_connections') logger.info("Subscribed to 'websocket_connections' channel") for channel_data in subscriber.listen(): - # Check Type of PubSub Message match channel_data['type']: + # Subscribed case 'subscribe': continue + # Message Received case 'message': loaded_data = json.loads(channel_data['data'].decode()) - if ( - isinstance(loaded_data, dict) - and (connection_id := loaded_data.get('connection_id')) - and (data := loaded_data.get('data')) - and (action := loaded_data.get('action')) - and (connection := self.connections.get(connection_id)) - ): - # Check Action of WS - match action: - case 'send': - logger.debug(f'Sending Message to {connection_id}') - asyncio.run(connection.send(data=data)) - case 'close': - with contextlib.suppress(RuntimeError): - asyncio.run(connection.close(code=data['code'], reason=data['reason'])) - # We are trying to disconnect the connection between a thread and a user - # from another thread, it's working, but we have to find another solution it - # - # Error: - # Task > got Future - # > - # attached to a different loop - case _: - logger.debug(f'Unknown Message Action: {action}') - case _: - logger.debug(f'Unknown Channel Type: {channel_data["type"]}') + self._handle_received_message(received_message=loaded_data) + + case unknown_type: + logger.debug(f'Unknown Channel Type: {unknown_type}') else: queue = PUBSUB.subscribe() logger.info("Subscribed to 'websocket_connections' queue") while True: - msg = queue.get() - if msg is None: - break - if ( - (connection_id := msg.get('connection_id')) - and (data := msg.get('data')) - and (action := msg.get('action')) - and (connection := self.connections.get(connection_id)) - ): - - # Check Action of WS - match action: - case 'send': - logger.debug(f'Sending Message to {connection_id}') - asyncio.run(connection.send(data=data)) - case 'close': - with contextlib.suppress(RuntimeError): - asyncio.run(connection.close(code=data['code'], reason=data['reason'])) - # We are trying to disconnect the connection between a thread and a user - # from another thread, it's working, but we have to find another solution it - # - # Error: - # Task > got Future - # > - # attached to a different loop - case _: - logger.debug(f'Unknown Message Action: {action}') + received_message = queue.get() + self._handle_received_message(received_message=received_message) + + def _handle_received_message(self, received_message): + if ( + isinstance(received_message, dict) + and (connection_id := received_message.get('connection_id')) + and connection_id in self.connections + and 'action' in received_message + and 'data' in received_message + ): + # Check Action of WS + match received_message['action']: + case 'send': + asyncio.run(self.connections[connection_id].send(data=received_message['data'])) + case 'close': + with contextlib.suppress(RuntimeError): + asyncio.run(self.connections[connection_id].close( + code=received_message['data']['code'], + reason=received_message['data']['reason'] + )) + # We are trying to disconnect the connection between a thread and a user + # from another thread, it's working, but we have to find another solution it + # + # Error: + # Task > got Future + # > + # attached to a different loop + case unknown_action: + logger.debug(f'Unknown Message Action: {unknown_action}') async def new_connection(self, connection: Websocket) -> None: await connection.connect(**connection.path_variables) @@ -155,6 +137,7 @@ async def receive(self, data: str | bytes) -> None: pass async def send(self, data: any = None) -> None: + logger.debug(f'Sending WS Message to {self.connection_id}') if data: if isinstance(data, bytes): await self.send_bytes(bytes_data=data) @@ -170,6 +153,7 @@ async def send_bytes(self, bytes_data: bytes) -> None: await self.asgi_send({'type': 'websocket.send', 'bytes': bytes_data}) async def close(self, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = '') -> None: + logger.debug(f'Closing WS Connection {self.connection_id}') self.is_connected = False config['websocket_connections'].remove_connection(self) await self.asgi_send({'type': 'websocket.close', 'code': code, 'reason': reason}) diff --git a/panther/websocket.py b/panther/websocket.py index 5da45d0..eb8813f 100644 --- a/panther/websocket.py +++ b/panther/websocket.py @@ -30,32 +30,18 @@ async def send(self, data: any = None): async def send_message_to_websocket(connection_id: str, data: any): - if redis.is_connected: - _publish_to_ws_channel(connection_id=connection_id, action='send', data=data) - else: - _publish_to_ws_queue(connection_id=connection_id, action='send', data=data) + _publish_to_websocket(connection_id=connection_id, action='send', data=data) async def close_websocket_connection(connection_id: str, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = ''): - data = { - 'code': code, - 'reason': reason, - } - if redis.is_connected: - _publish_to_ws_channel(connection_id=connection_id, action='close', data=data) - else: - _publish_to_ws_queue(connection_id=connection_id, action='close', data=data) - + data = {'code': code, 'reason': reason} + _publish_to_websocket(connection_id=connection_id, action='close', data=data) -def _publish_to_ws_channel(connection_id: str, action: Literal['send', 'close'], data: any): - from panther.db.connection import redis - assert redis.is_connected, 'Redis Is Not Connected.' +def _publish_to_websocket(connection_id: str, action: Literal['send', 'close'], data: any): + publish_data = {'connection_id': connection_id, 'action': action, 'data': data} - p_data = json.dumps({'connection_id': connection_id, 'action': action, 'data': data}) - redis.publish('websocket_connections', p_data) - - -def _publish_to_ws_queue(connection_id: str, action: Literal['send', 'close'], data: any): - p_data = {'connection_id': connection_id, 'action': action, 'data': data} - PUBSUB.publish(p_data) + if redis.is_connected: + redis.publish('websocket_connections', json.dumps(publish_data)) + else: + PUBSUB.publish(publish_data) From fab0390c03ebc6836dcac1f292b7508a8105fefd Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 03:47:44 +0330 Subject: [PATCH 4/7] Update ws doc --- docs/docs/websocket.md | 16 +++++++++++----- panther/base_websocket.py | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/docs/websocket.md b/docs/docs/websocket.md index d8f649e..2949a71 100644 --- a/docs/docs/websocket.md +++ b/docs/docs/websocket.md @@ -50,9 +50,15 @@ urls = { from panther.websocket import send_message_to_websocket await send_message_to_websocket(connection_id='7e82d57c9ec0478787b01916910a9f45', data='New Message From WS') ``` -8. If you want to use `webscoket` in `multi-tread` or `multi-instance` backend, you should add `RedisMiddleware` in your `configs` or it won't work well. +8. If you want to use `webscoket` in a backend with `multiple workers`, we recommend you to add `RedisMiddleware` in your `configs` [[Adding Redis Middleware]](https://pantherpy.github.io/middlewares/#redis-middleware) -9. If you want to close a connection: +9. If you don't want to add `RedisMiddleware` and you still want to use `websocket` in `multi-thread`, +you have to use `--preload` option while running the project like below: + ```bash + gunicorn -w 10 -k uvicorn.workers.UvicornWorker main:app --preload + ``` + +10. If you want to close a connection: - In websocket class scope: You can close connection with `self.close()` method which takes 2 args, `code` and `reason`: ```python from panther import status @@ -65,7 +71,7 @@ urls = { await close_websocket_connection(connection_id='7e82d57c9ec0478787b01916910a9f45', code=status.WS_1008_POLICY_VIOLATION, reason='') ``` -10. `Path Variables` will be passed to `connect()`: +11. `Path Variables` will be passed to `connect()`: ```python from panther.websocket import GenericWebsocket @@ -77,6 +83,6 @@ urls = { '/ws///': UserWebsocket } ``` -11. WebSocket Echo Example -> [Https://GitHub.com/PantherPy/echo_websocket](https://github.com/PantherPy/echo_websocket) -12. Enjoy. +12. WebSocket Echo Example -> [Https://GitHub.com/PantherPy/echo_websocket](https://github.com/PantherPy/echo_websocket) +13. Enjoy. diff --git a/panther/base_websocket.py b/panther/base_websocket.py index d79a84f..7f75a59 100644 --- a/panther/base_websocket.py +++ b/panther/base_websocket.py @@ -87,7 +87,7 @@ def _handle_received_message(self, received_message): reason=received_message['data']['reason'] )) # We are trying to disconnect the connection between a thread and a user - # from another thread, it's working, but we have to find another solution it + # from another thread, it's working, but we have to find another solution for it # # Error: # Task > got Future From 280d910e7776ddf75cd50acdf99555842a6561e9 Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 04:00:15 +0330 Subject: [PATCH 5/7] Fix an issue --- panther/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/panther/main.py b/panther/main.py index 26af3aa..cfdf5cf 100644 --- a/panther/main.py +++ b/panther/main.py @@ -90,8 +90,7 @@ def load_configs(self) -> None: self._create_ws_connections_instance() def _create_ws_connections_instance(self): - from panther.base_websocket import Websocket - from panther.websocket import WebsocketConnections + from panther.base_websocket import Websocket, WebsocketConnections # Check do we have ws endpoint for endpoint in config['flat_urls'].values(): From 52ad77a76ab5f9da761855a8dc4f52199aa86122 Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 04:15:22 +0330 Subject: [PATCH 6/7] Create `Manager()` only when `config.has_ws` is `True` --- docs/docs/release_notes.md | 3 +++ panther/__init__.py | 2 +- panther/base_websocket.py | 28 ++++++++++++++++++---------- panther/main.py | 3 ++- panther/websocket.py | 21 ++++----------------- 5 files changed, 28 insertions(+), 29 deletions(-) diff --git a/docs/docs/release_notes.md b/docs/docs/release_notes.md index 87e48e8..8df6cd9 100644 --- a/docs/docs/release_notes.md +++ b/docs/docs/release_notes.md @@ -1,3 +1,6 @@ +### 3.8.0 +- Handle WebSocket connections when we have multiple workers with `multiprocessing.Manager` + ### 3.7.0 - Add `ModelSerializer` diff --git a/panther/__init__.py b/panther/__init__.py index 1bf3b96..ada7e42 100644 --- a/panther/__init__.py +++ b/panther/__init__.py @@ -1,6 +1,6 @@ from panther.main import Panther # noqa: F401 -__version__ = '3.7.0' +__version__ = '3.8.0' def version(): diff --git a/panther/base_websocket.py b/panther/base_websocket.py index 7f75a59..250a833 100644 --- a/panther/base_websocket.py +++ b/panther/base_websocket.py @@ -4,7 +4,7 @@ import contextlib import logging from multiprocessing import Manager -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal import orjson as json @@ -12,6 +12,7 @@ from panther._utils import generate_ws_connection_id from panther.base_request import BaseRequest from panther.configs import config +from panther.db.connection import redis from panther.utils import Singleton if TYPE_CHECKING: @@ -21,11 +22,12 @@ class PubSub: - def __init__(self): - self._subscribers = MANAGER.list() + def __init__(self, manager): + self._manager = manager + self._subscribers = self._manager.list() def subscribe(self): - queue = MANAGER.Queue() + queue = self._manager.Queue() self._subscribers.append(queue) return queue @@ -34,14 +36,11 @@ def publish(self, msg): queue.put(msg) -MANAGER = Manager() -PUBSUB = PubSub() - - class WebsocketConnections(Singleton): - def __init__(self): + def __init__(self, manager: Manager): self.connections = {} self.connections_count = 0 + self.manager = manager def __call__(self, r: Redis | None): if r: @@ -62,7 +61,8 @@ def __call__(self, r: Redis | None): case unknown_type: logger.debug(f'Unknown Channel Type: {unknown_type}') else: - queue = PUBSUB.subscribe() + self.pubsub = PubSub(manager=self.manager) + queue = self.pubsub.subscribe() logger.info("Subscribed to 'websocket_connections' queue") while True: received_message = queue.get() @@ -96,6 +96,14 @@ def _handle_received_message(self, received_message): case unknown_action: logger.debug(f'Unknown Message Action: {unknown_action}') + def publish(self, connection_id: str, action: Literal['send', 'close'], data: any): + publish_data = {'connection_id': connection_id, 'action': action, 'data': data} + + if redis.is_connected: + redis.publish('websocket_connections', json.dumps(publish_data)) + else: + self.pubsub.publish(publish_data) + async def new_connection(self, connection: Websocket) -> None: await connection.connect(**connection.path_variables) if not hasattr(connection, '_connection_id'): diff --git a/panther/main.py b/panther/main.py index cfdf5cf..1be7c50 100644 --- a/panther/main.py +++ b/panther/main.py @@ -5,6 +5,7 @@ import types from collections.abc import Callable from logging.config import dictConfig +from multiprocessing import Manager from pathlib import Path from threading import Thread @@ -102,7 +103,7 @@ def _create_ws_connections_instance(self): # Create websocket connections instance if config['has_ws']: - config['websocket_connections'] = WebsocketConnections() + config['websocket_connections'] = WebsocketConnections(manager=Manager()) # Websocket Redis Connection for middleware in config['http_middlewares']: if middleware.__class__.__name__ == 'RedisMiddleware': diff --git a/panther/websocket.py b/panther/websocket.py index eb8813f..3541a53 100644 --- a/panther/websocket.py +++ b/panther/websocket.py @@ -1,12 +1,8 @@ from __future__ import annotations -from typing import Literal - -import orjson as json - from panther import status -from panther.base_websocket import Websocket, PUBSUB -from panther.db.connection import redis +from panther.base_websocket import Websocket +from panther.configs import config class GenericWebsocket(Websocket): @@ -30,18 +26,9 @@ async def send(self, data: any = None): async def send_message_to_websocket(connection_id: str, data: any): - _publish_to_websocket(connection_id=connection_id, action='send', data=data) + config.websocket_connections.publish(connection_id=connection_id, action='send', data=data) async def close_websocket_connection(connection_id: str, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = ''): data = {'code': code, 'reason': reason} - _publish_to_websocket(connection_id=connection_id, action='close', data=data) - - -def _publish_to_websocket(connection_id: str, action: Literal['send', 'close'], data: any): - publish_data = {'connection_id': connection_id, 'action': action, 'data': data} - - if redis.is_connected: - redis.publish('websocket_connections', json.dumps(publish_data)) - else: - PUBSUB.publish(publish_data) + config.websocket_connections.publish(connection_id=connection_id, action='close', data=data) From 192f8a9a2249ed361203ff66c823f29f5722ba9f Mon Sep 17 00:00:00 2001 From: Ali RajabNezhad Date: Wed, 24 Jan 2024 04:24:49 +0330 Subject: [PATCH 7/7] Only create Manager() if we are not going to use Redis for PubSub --- example/core/configs.py | 2 +- panther/base_websocket.py | 2 +- panther/main.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/example/core/configs.py b/example/core/configs.py index e0fae08..d344554 100644 --- a/example/core/configs.py +++ b/example/core/configs.py @@ -28,7 +28,7 @@ {'url': f'pantherdb://{BASE_DIR}/{DB_NAME}.pdb'}, ), # ('panther.middlewares.db.DatabaseMiddleware', {'url': f'mongodb://{DB_HOST}:27017/{DB_NAME}'}), - # ('panther.middlewares.redis.RedisMiddleware', {'host': '127.0.0.1', 'port': 6379}), + ('panther.middlewares.redis.RedisMiddleware', {'host': '127.0.0.1', 'port': 6379}), ] """ mongodb://[Username:Password(optional)]@HostName:Port/?aruguments diff --git a/panther/base_websocket.py b/panther/base_websocket.py index 250a833..074e268 100644 --- a/panther/base_websocket.py +++ b/panther/base_websocket.py @@ -37,7 +37,7 @@ def publish(self, msg): class WebsocketConnections(Singleton): - def __init__(self, manager: Manager): + def __init__(self, manager: Manager = None): self.connections = {} self.connections_count = 0 self.manager = manager diff --git a/panther/main.py b/panther/main.py index 1be7c50..11af4d5 100644 --- a/panther/main.py +++ b/panther/main.py @@ -103,7 +103,6 @@ def _create_ws_connections_instance(self): # Create websocket connections instance if config['has_ws']: - config['websocket_connections'] = WebsocketConnections(manager=Manager()) # Websocket Redis Connection for middleware in config['http_middlewares']: if middleware.__class__.__name__ == 'RedisMiddleware': @@ -112,6 +111,10 @@ def _create_ws_connections_instance(self): else: self.ws_redis_connection = None + # Don't create Manager() if we are going to use Redis for PubSub + manager = None if self.ws_redis_connection else Manager() + config['websocket_connections'] = WebsocketConnections(manager=manager) + async def __call__(self, scope: dict, receive: Callable, send: Callable) -> None: """ 1.