Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes from Mr. Williams #69

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name="switchio",
version='0.1.0.alpha1',
version='0.1.0.a1~mgwilliams-1',
description='asyncio powered FreeSWITCH cluster control',
long_description=readme,
license='Mozilla',
Expand Down
8 changes: 6 additions & 2 deletions switchio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class Client(object):
app_id_header = utils.xheaderify('switchio_app')

def __init__(self, host='127.0.0.1', port='8021', auth='ClueCon',
call_tracking_header=None, listener=None, logger=None):
call_tracking_header=None, listener=None, logger=None,
autorecon=30, reconnect_delay=0.1):
self.host = host
self.port = port
self.auth = auth
Expand All @@ -50,7 +51,10 @@ def __init__(self, host='127.0.0.1', port='8021', auth='ClueCon',

# WARNING: order of these next steps matters!
# create a local connection for sending commands
self._con = get_connection(self.host, self.port, self.auth)
self.log.debug(f'api.__init__: {autorecon}')
self._con = get_connection(self.host, self.port, self.auth,
autorecon=autorecon,
reconnect_delay=reconnect_delay)
# if the listener is provided it is expected that the
# user will run the set up methods (i.e. connect, start, etc..)
self.listener = listener
Expand Down
2 changes: 1 addition & 1 deletion switchio/apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def load(packages=(), imp_excs=('pandas',)):
imp_excs=imp_excs,
):
if isinstance(app, ImportError):
utils.log_to_stderr().warn("'{}' failed to load - {}\n".format(
utils.log_to_stderr().warning("'{}' failed to load - {}\n".format(
path, app.message))
else:
apps_map[path] = app
Expand Down
2 changes: 1 addition & 1 deletion switchio/apps/measure/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
try:
import pandas as pd
except ImportError as ie:
utils.log_to_stderr().warn(str(ie))
utils.log_to_stderr().warning(str(ie))
pd = None
else:
from . import shmarray
Expand Down
56 changes: 37 additions & 19 deletions switchio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,38 @@ async def connect_and_auth(host, port, password, prot, loop, log, timeout=0.5):

async def async_reconnect(host, port, password, prot, loop, log):
log.info("Attempting to reconnect to {}:{}".format(host, port))
log.info(f'async_reconnect: {prot.autorecon}')
if not prot.autorecon:
log.debug("Autorecon had been disabled")
return
elif prot.autorecon is True:
while True:
try:
await connect_and_auth(
host, port, password, prot, loop, log, timeout=1)
break
except (ConnectionError, ConnectionRefusedError):
log.warning(
"Failed reconnection attempt... will retry in "
f"{prot.reconnect_delay} seconds...")
await asyncio.sleep(prot.reconnect_delay)
else:
count = prot.autorecon

for i in range(count):
try:
await connect_and_auth(
host, port, password, prot, loop, log, timeout=1)
break
except ConnectionError:
for i in range(count):
try:
await connect_and_auth(
host, port, password, prot, loop, log, timeout=1)
break
except (ConnectionError, ConnectoinRefusedError):
log.warning(
"Failed reconnection attempt...retries"
" left {}".format(count - i))
await asyncio.sleep(prot.reconnect_delay)
else:
log.warning(
"Failed reconnection attempt...retries"
" left {}".format(count - i))
await asyncio.sleep(0.1)
else:
log.warning(
"Reconnection attempts to '{}' failed. Please call"
" 'connect' manually when server is ready "
.format(host))
"Reconnection attempts to '{}' failed. Please call"
" 'connect' manually when server is ready "
.format(host))

if prot.connected():
log.info("Successfully reconnected to '{}:{}'"
Expand All @@ -121,7 +132,7 @@ class Connection(object):
:type autorecon: int or bool
"""
def __init__(self, host, port='8021', password='ClueCon', loop=None,
autorecon=30):
autorecon=30, reconnect_delay=0.1):
"""
Parameters
-----------
Expand All @@ -139,7 +150,9 @@ def __init__(self, host, port='8021', password='ClueCon', loop=None,
self._sub = () # events subscription
self.loop = loop
self.autorecon = autorecon
self.reconnect_delay = reconnect_delay
self.protocol = None
self.log.debug(f'Connection.__init__: {self.autorecon}')

def __enter__(self, **kwargs):
self.connect(**kwargs)
Expand All @@ -163,6 +176,8 @@ def connect(
self.loop = loop if loop else self.loop
loop = self.loop

self.log.debug(f'Connection.connect: {self.autorecon}')

if not self.connected() or not block:

def reconnect(prot):
Expand All @@ -173,9 +188,10 @@ def reconnect(prot):
host, port, password, prot,
loop, self.log), loop=loop)

self.log.debug(f'Connection.connect:protocol: {self.autorecon}')
prot = self.protocol = InboundProtocol(
self.host, password, loop, autorecon=self.autorecon,
on_disconnect=reconnect)
on_disconnect=reconnect, reconnect_delay=self.reconnect_delay)

coro = connect_and_auth(
host, port, password, prot, self.loop, self.log)
Expand Down Expand Up @@ -304,9 +320,11 @@ def _handle_socket_data(event):
return True, body


def get_connection(host, port=8021, password='ClueCon', loop=None):
def get_connection(host, port=8021, password='ClueCon', loop=None,
autorecon=30, reconnect_delay=0.1):
"""ESL connection factory.
"""
loop = loop or asyncio.get_event_loop()
loop._tid = get_ident()
return Connection(host, port=port, password=password, loop=loop)
return Connection(host, port=port, password=password,
loop=loop, autorecon=autorecon, reconnect_delay=reconnect_delay)
8 changes: 5 additions & 3 deletions switchio/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def _handle_destroy(self, e):
call.sessions.remove(sess)
else:
# session was somehow tracked by the wrong call
self.log.err("session '{}' mismatched with call '{}'?"
self.log.error("session '{}' mismatched with call '{}'?"
.format(sess.uuid, call.uuid))

# all sessions hungup
Expand Down Expand Up @@ -396,10 +396,12 @@ def unsubscribe(self, evname):


def get_listener(
host, port=8021, password='ClueCon', app_id_headers=None,
host, port=8021, password='ClueCon', app_id_headers=None, autorecon=False,
reconnect_delay=None,
call_tracking_header='variable_call_uuid', max_limit=float('inf'),
):
el = get_event_loop(
host, port, password, app_id_headers=app_id_headers or {})
host, port, password, app_id_headers=app_id_headers or {},
autorecon=autorecon, reconnect_delay=reconnect_delay)
return EventListener(
el, call_tracking_header=call_tracking_header, max_limit=max_limit)
8 changes: 4 additions & 4 deletions switchio/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def new_event_loop():
import uvloop
return uvloop.new_event_loop()
except ImportError as err:
utils.log_to_stderr().warn(str(err))
utils.log_to_stderr().warning(str(err))
return asyncio.new_event_loop()


Expand All @@ -61,7 +61,7 @@ class EventLoop(object):
AUTH = 'ClueCon'

def __init__(self, host=HOST, port=PORT, auth=AUTH, app_id_headers=None,
loop=None):
loop=None, autorecon=False, reconnect_delay=None):
'''
:param str host: Hostname or IP addr of the FS server
:param str port: Port on which the FS process is listening for ESL
Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(self, host=HOST, port=PORT, auth=AUTH, app_id_headers=None,

# set up contained connections
self._con = get_connection(self.host, self.port, self.auth,
loop=loop)
loop=loop, autorecon=autorecon, reconnect_delay=reconnect_delay)

self.coroutines = {} # coroutine chains, one for each event type
self._entry_fut = None
Expand Down Expand Up @@ -223,7 +223,7 @@ async def _listen_forever(self):
if evname:
consumed = await self._process_event(e, evname)
if not consumed:
self.log.warn("unconsumed event '{}'?".format(e))
self.log.debug("unconsumed event '{}'?".format(e))
else:
self.log.warn("received unnamed event '{}'?".format(e))

Expand Down
9 changes: 8 additions & 1 deletion switchio/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ class InboundProtocol(asyncio.Protocol):
``asyncio.Queue``.
"""
def __init__(self, host, password, loop, autorecon=False,
on_disconnect=None):
on_disconnect=None, reconnect_delay=None):
self.host = host
self.password = password
self.loop = loop
self.on_disconnect = on_disconnect
self.autorecon = autorecon
self.reconnect_delay = reconnect_delay
self.event_queue = asyncio.Queue(loop=loop)
self.log = utils.get_logger(utils.pstr(self))
self.log.debug(f'Protocol.__init__(1): {autorecon}')
self.transport = None
self._previous = None, None
# segment data in the form (event, size, data)
Expand All @@ -47,6 +49,8 @@ def __init__(self, host, password, loop, autorecon=False,
for ctype in ['command/reply', 'auth/request', 'api/response']:
self._futures_map.get(ctype)

self.log.debug(f'Protocol.__init__: {self.autorecon}')

def connected(self):
return bool(self.transport) and not self.transport.is_closing()

Expand All @@ -68,6 +72,9 @@ def connection_lost(self, exc):
self._auth_resp = None
self.log.debug('The connection closed @ {}'.format(self.host))
self._disconnected.set_result(True)

self.log.debug(f'connection_lost: {self.autorecon}')

if self.autorecon:
self.on_disconnect(self)

Expand Down