From 5c618e930a9ec8aa9693ced4b0b87823705fdbd0 Mon Sep 17 00:00:00 2001 From: JacobPlaster Date: Mon, 13 Apr 2020 14:38:41 +0100 Subject: [PATCH] generic_websocket: set event_emmiter thread to daemon --- bfxapi/utils/custom_logger.py | 4 ++++ bfxapi/websockets/generic_websocket.py | 16 +++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/bfxapi/utils/custom_logger.py b/bfxapi/utils/custom_logger.py index 52669cb5..6ae38c36 100644 --- a/bfxapi/utils/custom_logger.py +++ b/bfxapi/utils/custom_logger.py @@ -82,6 +82,10 @@ def __init__(self, name, logLevel='DEBUG'): self.addHandler(console) logging.addLevelName(self.TRADE, "TRADE") return + + def set_level(self, level): + logging.Logger.setLevel(self, level) + def trade(self, message, *args, **kws): """ diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index c1cab8df..826d4742 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -3,6 +3,7 @@ """ import asyncio +import concurrent.futures import websockets import socket import json @@ -56,19 +57,14 @@ async def send(self, data): await self.ws.send(data) def _start_event_worker(): - async def event_sleep_process(): - """ - sleeping process for event emitter to schedule on - """ - while True: - await asyncio.sleep(0) def start_loop(loop): asyncio.set_event_loop(loop) - loop.run_until_complete(event_sleep_process()) + loop.run_forever() event_loop = asyncio.new_event_loop() + ee = EventEmitter(scheduler=asyncio.ensure_future) worker = Thread(target=start_loop, args=(event_loop,)) + worker.daemon = True worker.start() - ee = EventEmitter(scheduler=asyncio.ensure_future, loop=event_loop) return ee class GenericWebsocket: @@ -76,10 +72,11 @@ class GenericWebsocket: Websocket object used to contain the base functionality of a websocket. Inlcudes an event emitter and a standard websocket client. """ + logger = CustomLogger('BfxWebsocket', logLevel="DEBUG") def __init__(self, host, logLevel='INFO', max_retries=5, create_event_emitter=None): self.host = host - self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) + self.logger.set_level(logLevel) # overide 'error' event to stop it raising an exception # self.events.on('error', self.on_error) self.ws = None @@ -139,6 +136,7 @@ async def _run_socket(self): sId = len(self.sockets) s = Socket(sId) self.sockets[sId] = s + loop = asyncio.get_event_loop() while retries < self.max_retries and self.attempt_retry: try: async with websockets.connect(self.host) as websocket: