Skip to content

Commit

Permalink
generic_websocket: set event_emmiter thread to daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
JacobPlaster committed Apr 13, 2020
1 parent e0d58c5 commit 5c618e9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
4 changes: 4 additions & 0 deletions bfxapi/utils/custom_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def __init__(self, name, logLevel='DEBUG'):
self.addHandler(console)
logging.addLevelName(self.TRADE, "TRADE")
return

def set_level(self, level):
logging.Logger.setLevel(self, level)


def trade(self, message, *args, **kws):
"""
Expand Down
16 changes: 7 additions & 9 deletions bfxapi/websockets/generic_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
import concurrent.futures
import websockets
import socket
import json
Expand Down Expand Up @@ -56,30 +57,26 @@ async def send(self, data):
await self.ws.send(data)

def _start_event_worker():
async def event_sleep_process():
"""
sleeping process for event emitter to schedule on
"""
while True:
await asyncio.sleep(0)
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_until_complete(event_sleep_process())
loop.run_forever()
event_loop = asyncio.new_event_loop()
ee = EventEmitter(scheduler=asyncio.ensure_future)
worker = Thread(target=start_loop, args=(event_loop,))
worker.daemon = True
worker.start()
ee = EventEmitter(scheduler=asyncio.ensure_future, loop=event_loop)
return ee

class GenericWebsocket:
"""
Websocket object used to contain the base functionality of a websocket.
Inlcudes an event emitter and a standard websocket client.
"""
logger = CustomLogger('BfxWebsocket', logLevel="DEBUG")

def __init__(self, host, logLevel='INFO', max_retries=5, create_event_emitter=None):
self.host = host
self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel)
self.logger.set_level(logLevel)
# overide 'error' event to stop it raising an exception
# self.events.on('error', self.on_error)
self.ws = None
Expand Down Expand Up @@ -139,6 +136,7 @@ async def _run_socket(self):
sId = len(self.sockets)
s = Socket(sId)
self.sockets[sId] = s
loop = asyncio.get_event_loop()
while retries < self.max_retries and self.attempt_retry:
try:
async with websockets.connect(self.host) as websocket:
Expand Down

0 comments on commit 5c618e9

Please sign in to comment.