From 9777083119bb7abb1fdaebeb61fa4a60afac9893 Mon Sep 17 00:00:00 2001 From: Robert Dunmire III Date: Sun, 13 Aug 2023 14:51:48 -0400 Subject: [PATCH 1/2] Update voice_pipeline.py --- voice_pipeline.py | 71 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/voice_pipeline.py b/voice_pipeline.py index 4162a3d..b0d00e2 100644 --- a/voice_pipeline.py +++ b/voice_pipeline.py @@ -35,6 +35,8 @@ _LOGGER = logging.getLogger(__name__) RESULT = "result" +ERROR = "error" +MESSAGE = "message" EVENT = "event" NAME = "name" DATA = "data" @@ -67,15 +69,16 @@ class PorcupinePipeline: _message_id = 1 _last_ping = 0 _devices = {} + _conversation_id = None _followup = False ########################################## def __init__(self, args: argparse.Namespace): """Setup Websocket client and audio pipeline""" - + signal.signal(signal.SIGINT, self.stop) signal.signal(signal.SIGTERM, self.stop) - + self._state = State(args=args) self._state.running = False @@ -135,22 +138,24 @@ def start(self) -> None: self._event_loop.run_until_complete(self._start_audio_pipeline()) ########################################## - def stop(self) -> None: + def stop(self, **args) -> None: """Stop audio thread and loop""" _LOGGER.info("Stopping") + if args: + _LOGGER.debug(args) self._state.recording = False self._state.running = False self._websocket = None - + self._audio_thread.join(1) - + if hasattr(self._porcupine, "delete"): self._porcupine.delete() - + self._porcupine = None - + ########################################## async def _ping(self): """Send Ping to HA""" @@ -163,10 +168,14 @@ async def _ping(self): await asyncio.sleep(0.3) return - await self._send_ws({TYPE: "ping"}) - response = await self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT) + response = await self._send_ws({TYPE: "ping"}) + if response.get(TYPE) == "pong": + self._state.connected = True + + else: + self._state.connected = False + _LOGGER.error(response) - assert response[TYPE] == "pong", response self._last_ping = int(time.time()) ########################################## @@ -193,6 +202,10 @@ async def _send_ws(self, message: dict) -> None: await self._websocket.send_json(message) self._message_id += 1 + response = await self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT) + _LOGGER.debug("send_ws() response=%s", response) + return response + ########################################## async def _start_audio_pipeline(self): """Start HA audio pipeline""" @@ -244,16 +257,15 @@ async def get_audio_pipeline(self) -> None: ) # Get list of available pipelines and resolve name - await self._send_ws( + msg = await self._send_ws( { TYPE: "assist_pipeline/pipeline/list", } ) - msg = await self._websocket.receive_json() _LOGGER.debug(msg) if RESULT not in msg: _LOGGER.error("FAiled to get audio pipeline from HA") - _LOGGER.error("response=%s", msg) + _LOGGER.error(msg) return pipelines = msg[RESULT]["pipelines"] @@ -295,9 +307,12 @@ async def _process_loop(self) -> None: pipeline_args["pipeline"] = self._pipeline_id # Send audio pipeline args to HA - await self._send_ws(pipeline_args) - msg = await self._websocket.receive_json() - assert msg["success"], "Pipeline failed to start" + msg = await self._send_ws(pipeline_args) + if not msg.get("success"): + _LOGGER.error( + msg.get(ERROR, {}).get(MESSAGE, "Pipeline failed to start") + ) + return _LOGGER.info( "Listening and sending audio to voice pipeline %s", self._pipeline_id @@ -358,6 +373,10 @@ async def stt_task(self) -> None: event_data.get("message"), ) break + + elif event_type == "intent-end": + intent = event_data.get("intent_output",{}) + self._conversation_id = intent.get("conversation_id") elif event_type == "stt-end": # HA finished processing speech to text with result @@ -377,7 +396,8 @@ async def stt_task(self) -> None: else: _LOGGER.debug("event_type=%s", event_type) - _LOGGER.debug("event_data=%s", event_data) + _LOGGER.debug("event=%s", event) + #_LOGGER.debug("event_data=%s", event_data) receive_event_task = asyncio.create_task(self._websocket.receive_json()) @@ -450,13 +470,21 @@ def read_audio(self) -> None: async def _play_response(self, url: str) -> None: """Play response wav file from HA""" - request = requests.get(url, timeout=(10, 30)) - if request.status_code > 299: + try: + audio_data = None + request = requests.get(url, timeout=(10, 15)) + if request.status_code < 300: + audio_data = request.content + + except (TimeoutError, ConnectionError) as err: + _LOGGER.error("Exception: %s", err) + + if not audio_data: _LOGGER.error("Failed to get audio file at %s", url) return audio = simpleaudio.play_buffer( - request.content, + audio_data, self._state.args.channels, self._state.args.width, self._state.args.rate, @@ -541,7 +569,6 @@ def get_porcupine(state: State) -> Porcupine: ########################################## if __name__ == "__main__": - args = get_cli_args() _LOGGER.setLevel(level=logging.DEBUG if args.debug else logging.INFO) if args.debug: @@ -558,6 +585,6 @@ def get_porcupine(state: State) -> Porcupine: audio_pipeline = PorcupinePipeline(args) with suppress(KeyboardInterrupt): audio_pipeline.start() - + audio_pipeline.stop() sys.exit(0) From 8f096839c3d72fcd4c89c79f7a58d52c03734b5b Mon Sep 17 00:00:00 2001 From: Robert Dunmire III Date: Sun, 13 Aug 2023 15:08:39 -0400 Subject: [PATCH 2/2] Update for stop() --- .vscode/settings.json | 2 ++ voice_pipeline.py | 47 +++++++++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index d2fc008..c16a0d0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,6 +9,8 @@ "pvporcupine", "pvrecorder", "ratecv", + "signame", + "signum", "SIGTERM", "simpleaudio", "tomono", diff --git a/voice_pipeline.py b/voice_pipeline.py index b0d00e2..f9ce702 100644 --- a/voice_pipeline.py +++ b/voice_pipeline.py @@ -68,6 +68,7 @@ class PorcupinePipeline: _sslcontext = None _message_id = 1 _last_ping = 0 + _recorder = None _devices = {} _conversation_id = None _followup = False @@ -100,6 +101,11 @@ def __init__(self, args: argparse.Namespace): sys.exit(0) self._porcupine = get_porcupine(self._state) + self._recorder = PvRecorder( + device_index=args.audio_device, + frame_length=self._porcupine.frame_length, + ) + self._audio_thread = threading.Thread( target=self.read_audio, daemon=True, @@ -138,16 +144,17 @@ def start(self) -> None: self._event_loop.run_until_complete(self._start_audio_pipeline()) ########################################## - def stop(self, **args) -> None: + def stop(self, signum=0, frame=None) -> None: """Stop audio thread and loop""" _LOGGER.info("Stopping") - if args: - _LOGGER.debug(args) + if signum: + signame = signal.Signals(signum).name + _LOGGER.debug(signame) - self._state.recording = False self._state.running = False - self._websocket = None + self._state.recording = False + self._recorder.stop() self._audio_thread.join(1) @@ -155,6 +162,8 @@ def stop(self, **args) -> None: self._porcupine.delete() self._porcupine = None + self._websocket = None + sys.exit(0) ########################################## async def _ping(self): @@ -335,11 +344,14 @@ async def stt_task(self) -> None: [msg[EVENT][DATA]["runner_data"].get("stt_binary_handler_id")] ) - receive_event_task = asyncio.create_task(self._websocket.receive_json()) + receive_event_task = asyncio.create_task( + self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT) + ) + while self._state.connected: audio_chunk = await self._state.audio_queue.get() if not audio_chunk: - _LOGGER.error("No audio chunk in queue") + break # Prefix binary message with handler id send_audio_task = asyncio.create_task( @@ -373,9 +385,9 @@ async def stt_task(self) -> None: event_data.get("message"), ) break - + elif event_type == "intent-end": - intent = event_data.get("intent_output",{}) + intent = event_data.get("intent_output", {}) self._conversation_id = intent.get("conversation_id") elif event_type == "stt-end": @@ -397,10 +409,13 @@ async def stt_task(self) -> None: else: _LOGGER.debug("event_type=%s", event_type) _LOGGER.debug("event=%s", event) - #_LOGGER.debug("event_data=%s", event_data) + # _LOGGER.debug("event_data=%s", event_data) receive_event_task = asyncio.create_task(self._websocket.receive_json()) + if not self._state.running: + break + if send_audio_task not in done: await send_audio_task @@ -413,16 +428,11 @@ def read_audio(self) -> None: ratecv_state = None _LOGGER.debug("Reading audio") - recorder = PvRecorder( - device_index=args.audio_device, - frame_length=self._porcupine.frame_length, - ) - - recorder.start() + self._recorder.start() self._state.recording = False while self._state.running: try: - pcm = recorder.read() + pcm = self._recorder.read() except OSError as err: _LOGGER.error("Exception: %s", err) @@ -585,6 +595,3 @@ def get_porcupine(state: State) -> Porcupine: audio_pipeline = PorcupinePipeline(args) with suppress(KeyboardInterrupt): audio_pipeline.start() - - audio_pipeline.stop() - sys.exit(0)