From 11aa9dc803fc02766c4fe9c6bb1b57f82c297881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 14 May 2024 00:30:32 -0700 Subject: [PATCH] pipeline: allow stopping tasks with StopTaskFrame --- src/pipecat/frames/frames.py | 11 +++++++++++ src/pipecat/pipeline/task.py | 6 ++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 7712f40be..27f85a182 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -205,6 +205,17 @@ class ErrorFrame(SystemFrame): def __str__(self): return f"{self.name}(error: {self.error})" + +@dataclass +class StopTaskFrame(SystemFrame): + """Indicates that a pipeline task should be stopped. This should inform the + pipeline processors that they should stop pushing frames but that they + should be kept in a running state. + + """ + pass + + # # Control frames # diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index f13f82620..70504d2ca 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -8,7 +8,7 @@ from typing import AsyncIterable, Iterable -from pipecat.frames.frames import CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame +from pipecat.frames.frames import CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StopTaskFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id @@ -74,7 +74,9 @@ async def _process_task_queue(self): frame = await self._task_queue.get() await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) self._task_queue.task_done() - running = not (isinstance(frame, CancelFrame) or isinstance(frame, EndFrame)) + running = not (isinstance(frame, StopTaskFrame) or + isinstance(frame, CancelFrame) or + isinstance(frame, EndFrame)) # We just enqueue None to terminate the task. await self._up_queue.put(None)