Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initializes Inboundprotcol - it wasn't clear how this was supposed to happen and the library was broken on master. #78

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions switchio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
#
# Copyright (c) 2017 Tyler Goodlet <[email protected]>
"""
Asyncio ESL connection abstactions
Asyncio ESL connection abstractions
"""
import asyncio
from functools import partial
from concurrent import futures
from threading import get_ident
import traceback
from . import utils
from .protocol import InboundProtocol


class ConnectionError(utils.ESLError):
"Failed to connect to ESL"


async def await_in_order(awaitables, loop, timeout=None):
awaitables = map(partial(asyncio.ensure_future, loop=loop), awaitables)
for awaitable in awaitables:
Expand All @@ -30,7 +29,6 @@ async def await_in_order(awaitables, loop, timeout=None):

return res


def run_in_order_threadsafe(awaitables, loop, timeout=0.5, block=True):
""""Given a sequence of awaitables, schedule each threadsafe in order
optionally blocking until completion.
Expand All @@ -54,30 +52,32 @@ def run_in_order_threadsafe(awaitables, loop, timeout=0.5, block=True):

return future


async def connect_and_auth(host, port, password, prot, loop, log, timeout=0.5):
"""Try to create a connection and authenticate to the
target FS ESL.
"""
target FS ESL."""
msg = ("Failed to connect to server at '{}:{}'\n"
"Please check that FreeSWITCH is running and "
"accepting ESL connections.".format(host, port))
try:
log.debug("Attempting to create connection to {}:{}".format(host, port))
await asyncio.wait_for(
loop.create_connection(lambda: prot, host, port),
timeout=timeout)
log.debug("Connection to {}:{} created".format(host, port))
except (
ConnectionRefusedError, asyncio.TimeoutError, OSError,
futures.TimeoutError,
) as err:
raise ConnectionError(msg.format(host, port))
log.error(f"Connection attempt failed: {traceback.format_exc()}")
raise ConnectionError(msg.format(host, port)) from err

# TODO: consider using the asyncio_timeout lib here
try:
log.debug("Attempting to authenticate to {}:{}".format(host, port))
await asyncio.wait_for(prot.authenticate(), timeout)
except asyncio.TimeoutError:
raise ConnectionRefusedError(msg.format(host, port))

log.debug("Authentication to {}:{} succeeded".format(host, port))
except asyncio.TimeoutError as err:
log.error(f"Authentication attempt failed: {traceback.format_exc()}")
raise ConnectionRefusedError(msg.format(host, port)) from err

async def async_reconnect(host, port, password, prot, loop, log):
log.info("Attempting to reconnect to {}:{}".format(host, port))
Expand Down Expand Up @@ -107,7 +107,6 @@ async def async_reconnect(host, port, password, prot, loop, log):
log.info("Successfully reconnected to '{}:{}'"
.format(host, port))


class Connection(object):
"""An ESL connection implemented using an ``asyncio`` TCP protocol.

Expand Down Expand Up @@ -140,6 +139,19 @@ def __init__(self, host, port='8021', password='ClueCon', loop=None,
self.loop = loop
self.autorecon = autorecon
self.protocol = None
self.initialize_protocol()

def initialize_protocol(self):
self.protocol = InboundProtocol(
self.host, self.password, self.loop, autorecon=self.autorecon,
on_disconnect=self.reconnect)

def reconnect(self, prot):
"""Schedule a reconnection task."""
self.log.debug("Scheduling a reconnection task")
asyncio.ensure_future(async_reconnect(
self.host, self.port, self.password, prot,
self.loop, self.log), loop=self.loop)

def __enter__(self, **kwargs):
self.connect(**kwargs)
Expand All @@ -166,8 +178,7 @@ def connect(
if not self.connected() or not block:

def reconnect(prot):
"""Schedule a reconnection task.
"""
"""Schedule a reconnection task."""
self.log.debug("Scheduling a reconnection task")
asyncio.ensure_future(async_reconnect(
host, port, password, prot,
Expand Down Expand Up @@ -210,22 +221,14 @@ def disconnect(self, block=True, loop=None):
).result()

async def recv_event(self):
"""Retreive the latest queued event.
"""
"""Retrieve the latest queued event."""
queue = self.protocol.event_queue
event = await queue.get()
queue.task_done()
return event

def execute(self, uuid, app, arg='', params='', loops=1):
"""Execute a dialplan ``app`` with argument ``arg``.
"""
return self.protocol.sendmsg(uuid, 'execute', app, arg, params,
loops=loops)

def api(self, cmd, errcheck=True, block=False, timeout=0.5):
'''Invoke api command (with error checking by default).
'''
'''Invoke api command (with error checking by default).'''
if not self.connected():
raise ConnectionError("Call ``connect()`` first")
self.log.debug("api cmd '{}'".format(cmd))
Expand All @@ -247,8 +250,7 @@ def api(self, cmd, errcheck=True, block=False, timeout=0.5):
return future.result(0.005)

def cmd(self, cmd):
'''Return the string-body output from invoking a command.
'''
'''Return the string-body output from invoking a command.'''
event = self.api(cmd, block=True)
_, body = self._handle_socket_data(event)
return body
Expand Down Expand Up @@ -303,10 +305,8 @@ def _handle_socket_data(event):
raise utils.APIError(body)
return True, body


def get_connection(host, port=8021, password='ClueCon', loop=None):
"""ESL connection factory.
"""
"""ESL connection factory."""
loop = loop or asyncio.get_event_loop()
loop._tid = get_ident()
return Connection(host, port=port, password=password, loop=loop)