Skip to content

Commit

Permalink
Merge pull request #338 from dvonthenen/make-async-like-sync-client
Browse files Browse the repository at this point in the history
Make AsyncLiveClient Similar to LiveClient
  • Loading branch information
davidvonthenen authored Mar 11, 2024
2 parents 2a7eaeb + d02b9ba commit 745aebe
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 125 deletions.
36 changes: 19 additions & 17 deletions deepgram/audio/microphone/microphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,10 @@ def __init__(
self.format = pyaudio.paInt16
self.channels = channels
self.input_device_index = input_device_index
self.push_callback_org = push_callback

self.asyncio_loop = None
self.asyncio_thread = None

if inspect.iscoroutinefunction(push_callback):
self.logger.verbose("async/await callback - wrapping")
# Run our own asyncio loop.
self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self.asyncio_thread.start()

self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(
push_callback(data), self.asyncio_loop
).result()
else:
self.logger.verbose("regular threaded callback")
self.push_callback = push_callback

self.stream = None

def _start_asyncio_loop(self) -> None:
Expand Down Expand Up @@ -105,6 +93,19 @@ def start(self) -> bool:
stream_callback=self._callback,
)

if inspect.iscoroutinefunction(self.push_callback_org):
self.logger.verbose("async/await callback - wrapping")
# Run our own asyncio loop.
self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self.asyncio_thread.start()

self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(
self.push_callback_org(data), self.asyncio_loop
).result()
else:
self.logger.verbose("regular threaded callback")
self.push_callback = self.push_callback_org

self.exit.clear()
self.stream.start_stream()

Expand Down Expand Up @@ -150,16 +151,17 @@ def finish(self) -> bool:
self.logger.notice("signal exit")
self.exit.set()

# Stop the stream.
if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.stream = None

# clean up the thread
if self.asyncio_thread is not None:
self.asyncio_loop.call_soon_threadsafe(self.asyncio_loop.stop)
self.asyncio_thread.join() # Clean up.
self.asyncio_thread = None

self.asyncio_thread.join()
self.asyncio_thread = None
self.logger.notice("stream/recv thread joined")

self.logger.notice("finish succeeded")
Expand Down
177 changes: 104 additions & 73 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, config: DeepgramClientOptions):
self._socket = None
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)
self.exit_event = None

# starts the WebSocket connection for live transcription
async def start(
Expand All @@ -61,10 +62,10 @@ async def start(
**kwargs,
) -> bool:
self.logger.debug("AsyncLiveClient.start ENTER")
self.logger.info("kwargs: %s", options)
self.logger.info("options: %s", options)
self.logger.info("addons: %s", addons)
self.logger.info("members: %s", members)
self.logger.info("options: %s", kwargs)
self.logger.info("kwargs: %s", kwargs)

if isinstance(options, LiveOptions) and not options.check():
self.logger.error("options.check failed")
Expand All @@ -83,7 +84,7 @@ async def start(
if members is not None:
self.__dict__.update(members)

# add "kwargs" as members of the class
# set kwargs as members of the class
if kwargs is not None:
self.kwargs = kwargs
else:
Expand All @@ -101,6 +102,8 @@ async def start(
self.logger.debug("combined_options: %s", combined_options)

url_with_params = append_query_params(self.websocket_url, combined_options)
self.exit_event = asyncio.Event()

try:
self._socket = await _socket_connect(url_with_params, self.config.headers)

Expand Down Expand Up @@ -135,8 +138,24 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
async def _listening(self) -> None:
self.logger.debug("AsyncLiveClient._listening ENTER")

try:
async for message in self._socket:
while True:
try:
if self.exit_event.is_set():
self.logger.notice("_listening exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

if self._socket is None:
self.logger.warning("socket is empty")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

message = await self._socket.recv()

if message is None:
self.logger.spam("message is None")
continue

data = json.loads(message)
response_type = data.get("type")
self.logger.debug("response_type: %s, data: %s", response_type, data)
Expand Down Expand Up @@ -206,48 +225,48 @@ async def _listening(self) -> None:
)
await self._emit(LiveTranscriptionEvents.Error, error=error)

except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "WebSocketException in _listening",
"message": f"{e}",
"variant": "",
}
self.logger.notice(
f"WebSocket exception in _listening with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

except Exception as e:
error: ErrorResponse = {
"type": "Exception",
"description": "Exception in _listening",
"message": f"{e}",
"variant": "",
}
self.logger.error("Exception in _listening: %s", str(e))
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise
except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "WebSocketException in AsyncLiveClient._listening",
"message": f"{e}",
"variant": "",
}
self.logger.notice(
f"WebSocket exception in AsyncLiveClient._listening with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

except Exception as e:
error: ErrorResponse = {
"type": "Exception",
"description": "Exception in AsyncLiveClient._listening",
"message": f"{e}",
"variant": "",
}
self.logger.error("Exception in AsyncLiveClient._listening: %s", str(e))
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

# keep the connection alive by sending keepalive messages
async def _keep_alive(self) -> None:
Expand All @@ -259,6 +278,11 @@ async def _keep_alive(self) -> None:
counter += 1
await asyncio.sleep(ONE_SECOND)

if self.exit_event.is_set():
self.logger.notice("_keep_alive exiting gracefully")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
return

if self._socket is None:
self.logger.notice("socket is None, exiting keep_alive")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
Expand All @@ -270,25 +294,22 @@ async def _keep_alive(self) -> None:
and self.config.options.get("keepalive") == "true"
):
self.logger.verbose("Sending KeepAlive...")
try:
await self.send(json.dumps({"type": "KeepAlive"}))
except websockets.exceptions.WebSocketException as e:
self.logger.error("KeepAlive failed: %s", e)
await self.send(json.dumps({"type": "KeepAlive"}))

except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_keep_alive({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
return

except websockets.exceptions.ConnectionClosedError as e:
except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "ConnectionClosedError in _keep_alive",
"description": "WebSocketException in AsyncLiveClient._keep_alive",
"message": f"{e}",
"variant": "",
}
self.logger.error(
f"WebSocket connection closed in _keep_alive with code {e.code}: {e.reason}"
f"WebSocket connection closed in AsyncLiveClient._keep_alive with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

Expand All @@ -308,8 +329,10 @@ async def _keep_alive(self) -> None:
"message": f"{e}",
"variant": "",
}
self.logger.error(
"Exception in AsyncLiveClient._keep_alive: %s", str(e)
)
await self._emit(LiveTranscriptionEvents.Error, error)
self.logger.error("Exception in _keep_alive: %s", str(e))

self.logger.debug("AsyncLiveClient._keep_alive LEAVE")

Expand All @@ -323,31 +346,45 @@ async def _keep_alive(self) -> None:
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")

# sends data over the WebSocket connection
async def send(self, data: Union[str, bytes]) -> int:
async def send(self, data: Union[str, bytes]) -> bool:
"""
Sends data over the WebSocket connection.
"""
self.logger.spam("AsyncLiveClient.send ENTER")
self.logger.spam("data: %s", data)

if self._socket is not None:
cnt = await self._socket.send(data)
self.logger.spam(f"send() succeeded. bytes: {cnt}")
try:
await self._socket.send(data)
except websockets.exceptions.WebSocketException as e:
self.logger.error("send() failed - WebSocketException: %s", str(e))
self.logger.spam("AsyncLiveClient.send LEAVE")
return False
except Exception as e:
self.logger.error("send() failed - Exception: %s", str(e))
self.logger.spam("AsyncLiveClient.send LEAVE")
return False

self.logger.spam(f"send() succeeded")
self.logger.spam("AsyncLiveClient.send LEAVE")
return cnt
return True

self.logger.error("send() failed. socket is None")
self.logger.spam("AsyncLiveClient.send LEAVE")
return 0
return False

async def finish(self) -> bool:
"""
Closes the WebSocket connection gracefully.
"""
self.logger.debug("AsyncLiveClient.finish ENTER")

if self._socket:
self.logger.notice("send CloseStream...")
# signal exit
self.exit_event.set()

# close the stream
self.logger.verbose("closing socket...")
if self._socket is not None:
self.logger.verbose("send CloseStream...")
await self._socket.send(json.dumps({"type": "CloseStream"}))

await asyncio.sleep(0.5)
Expand All @@ -358,14 +395,14 @@ async def finish(self) -> bool:
CloseResponse(type=LiveTranscriptionEvents.Close.value),
)

self.logger.notice("socket.wait_closed...")
self.logger.verbose("socket.wait_closed...")
try:
await self._socket.wait_closed()
except websockets.exceptions.WebSocketException as e:
self.logger.error("socket.wait_closed failed: %s", e)
self.logger.notice("socket.wait_closed succeeded")
self._socket = None

self.logger.notice("cancelling tasks...")
self.logger.verbose("cancelling tasks...")
try:
# Before cancelling, check if the tasks were created
if self._listen_thread is not None:
Expand All @@ -380,13 +417,7 @@ async def finish(self) -> bool:
except asyncio.CancelledError as e:
self.logger.error("tasks cancelled error: %s", e)

if self._socket is not None:
self.logger.notice("closing socket...")
await self._socket.close()

self._socket = None

self.logger.notice("finish succeeded")
self.logger.info("finish succeeded")
self.logger.debug("AsyncLiveClient.finish LEAVE")
return True

Expand Down
Loading

0 comments on commit 745aebe

Please sign in to comment.