Skip to content

Commit

Permalink
pipeline: allow stopping tasks with StopTaskFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed May 14, 2024
1 parent 922cdef commit 11aa9dc
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ class ErrorFrame(SystemFrame):
def __str__(self):
return f"{self.name}(error: {self.error})"


@dataclass
class StopTaskFrame(SystemFrame):
"""Indicates that a pipeline task should be stopped. This should inform the
pipeline processors that they should stop pushing frames but that they
should be kept in a running state.
"""
pass


#
# Control frames
#
Expand Down
6 changes: 4 additions & 2 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from typing import AsyncIterable, Iterable

from pipecat.frames.frames import CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame
from pipecat.frames.frames import CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StopTaskFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id

Expand Down Expand Up @@ -74,7 +74,9 @@ async def _process_task_queue(self):
frame = await self._task_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
self._task_queue.task_done()
running = not (isinstance(frame, CancelFrame) or isinstance(frame, EndFrame))
running = not (isinstance(frame, StopTaskFrame) or
isinstance(frame, CancelFrame) or
isinstance(frame, EndFrame))
# We just enqueue None to terminate the task.
await self._up_queue.put(None)

Expand Down

0 comments on commit 11aa9dc

Please sign in to comment.