Strange behaviour with streaming events from nested workflow #16177
-
I'm having trouble getting a nested workflow working properly with event streaming. The workflow runs fine, but the event streaming is behaving strangely. The workflow I'm testing is a variation on the Joke example from the documentation: a `MainWorkflow` runs a nested `TopicWorkflow` or `JokeWorkflow` depending on whether a topic is present in the context, and forwards the nested workflow's streamed events to its own stream.

The first time I run the workflow without a topic, the events stream through fine. The second time, with a topic set in the context, I get no events (other than the last event from the previous run repeated). If I run the workflow a third time with no changes, the events come through, but if I run it a fourth time with no changes, the issue reappears. I've logged the context object and made sure it's the exact same object for each run. I also checked that I'm awaiting the result properly each time. Am I doing something wrong, or is there an async bug somewhere here?

My workflow code:

```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
    Event,
)


class MessageEvent(Event):
    message: str


class TopicWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StopEvent:
        print("TopicWorkflow: Asking for topic")
        ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM TOPIC WORKFLOW"))
        return StopEvent(result="Give me a topic and I'll tell you a joke!")


class JokeWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StopEvent:
        print(f"JokeWorkflow: Telling joke about {event.topic}")
        ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM JOKE WORKFLOW"))
        return StopEvent(result=f"Here is a funny joke about {event.topic}")


class MainWorkflow(Workflow):
    @step
    async def start(
        self,
        context: Context,
        event: StartEvent,
        topic_workflow: Workflow,
        joke_workflow: Workflow,
    ) -> StopEvent:
        topic = await context.get("topic", "")
        print(context)
        if topic == "":
            # Context has no topic; we need to ask for it
            print("MainWorkflow: Asking for topic")
            handler = topic_workflow.run()
            async for nested_event in handler.stream_events():
                context.write_event_to_stream(nested_event)
            res = await handler
        else:
            # We have a topic, tell a joke about it
            print(f"MainWorkflow: Already have topic {topic}")
            handler = joke_workflow.run(topic=topic)
            async for nested_event in handler.stream_events():
                context.write_event_to_stream(nested_event)
            res = await handler
        return StopEvent(result=res)
```

Main:

```python
import asyncio
from project_context_bootstrapping.workflows.test import (
    MainWorkflow,
    TopicWorkflow,
    JokeWorkflow,
)
from llama_index.core.workflow import Context


async def main():
    w = MainWorkflow(timeout=60, verbose=True)
    w.add_workflows(
        topic_workflow=TopicWorkflow(),
        joke_workflow=JokeWorkflow(),
    )
    context = Context(workflow=w)

    # First workflow run with no topic
    print("Running workflow with no topic")
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (1): {event}")
    result = await handler
    print("FIRST RESULT: ", result)
    print("")

    # Set the joke topic
    print("Running workflow second time")
    await context.set("topic", "pirates")

    # Second workflow run with a topic in the context
    # Issue: I get no events this time (except for the last event again)
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (2): {event}")
    result = await handler
    print("SECOND RESULT: ", result)

    # Third workflow run, no changes
    # This time, events come through fine
    print("")
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (3): {event}")
    result = await handler
    print("THIRD RESULT: ", result)

    # Fourth workflow run, no changes
    # Previous issue reappears :(
    print("")
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (4): {event}")
    result = await handler
    print("FOURTH RESULT: ", result)


if __name__ == "__main__":
    asyncio.run(main())
```

Output:
-
Hey there, @rohanliston! 👋 I'm here to help you with any bugs, questions, or contributions you have. Let's tackle this together!

The inconsistent behavior in your nested workflow's event streaming, where it sometimes repeats the last event from the previous run and then stops streaming further events, is likely due to the context (`Context`) not being fully cleaned up between runs.

Here's the relevant part of the code that handles context cleanup:

```python
def _start(self, stepwise: bool = False, ctx: Optional[Context] = None) -> Context:
    """Sets up the queues and tasks for each declared step.

    This method also launches each step as an async task.
    """
    if ctx is None:
        ctx = Context(self, stepwise=stepwise)
        self._contexts.add(ctx)
    else:
        # clean up the context from the previous run
        ctx._tasks = set()
        ctx._queues = {}
        ctx._step_flags = {}
        ctx._retval = None
```

Ensure that the context is fully reset before starting a new run. Additionally, make sure that the `stream_events` method is consuming events from the streaming queue and removing the context afterwards as expected:

```python
async def stream_events(self) -> AsyncGenerator[Event, None]:
    """Returns an async generator to consume any event that workflow steps decide to stream."""
    await asyncio.sleep(0)

    if len(self._contexts) > 1:
        msg = (
            "This workflow has multiple concurrent runs in progress and cannot stream events. "
            "To be able to stream events, make sure you call `run()` on this workflow only once."
        )
        raise WorkflowRuntimeError(msg)

    ctx = next(iter(self._contexts))
    while True:
        ev = await ctx.streaming_queue.get()
        if type(ev) is StopEvent:
            break
        yield ev

    self._contexts.remove(ctx)
```

To avoid the issue, ensure that the context is fully reset before each run and that only one run is in progress while you stream events.
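One possible workaround, a minimal sketch rather than a confirmed fix: instead of passing the same `Context` object into every run, snapshot its state and rebuild a fresh context per run. This assumes your llama-index version exposes `Context.to_dict()` and `Context.from_dict()` for context serialization; check the workflow docs for your version if they are not available.

```python
# Workaround sketch (assumes Context.to_dict() / Context.from_dict()
# exist in your llama-index version): carry stored state like "topic"
# across runs without handing the same Context object to run() twice.
ctx_snapshot = context.to_dict()                    # serialize stored state
fresh_context = Context.from_dict(w, ctx_snapshot)  # brand-new Context, same state
handler = w.run(ctx=fresh_context)
async for event in handler.stream_events():
    print(event)
result = await handler
```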
If the problem persists, consider adding more detailed logging to trace the state of the context and the events being processed. Additionally, you can refer to the existing tests covering sequential and resumed streams:

```python
@pytest.mark.asyncio()
async def test_multiple_sequential_streams():
    wf = StreamingWorkflow()
    r = wf.run()

    # stream 1
    async for _ in r.stream_events():
        pass
    await r

    # stream 2 -- should not raise an error
    r = wf.run()
    async for _ in r.stream_events():
        pass
    await r


@pytest.mark.asyncio()
async def test_resume_streams():
    class CounterWorkflow(Workflow):
        @step
        async def count(self, ctx: Context, ev: StartEvent) -> StopEvent:
            ctx.write_event_to_stream(Event(msg="hello!"))
            cur_count = await ctx.get("cur_count", default=0)
            await ctx.set("cur_count", cur_count + 1)
            return StopEvent(result="done")

    wf = CounterWorkflow()
    handler_1 = wf.run()
    async for _ in handler_1.stream_events():
        pass
    await handler_1

    handler_2 = wf.run(ctx=handler_1.ctx)
    async for _ in handler_2.stream_events():
        pass
    await handler_2
    assert await handler_2.ctx.get("cur_count") == 2
```

These tests can help you ensure that your workflow handles event streaming correctly across multiple runs [1][2][3].
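Note the pattern in `test_resume_streams`: the second run resumes from `handler_1.ctx`, the context attached to the previous run's handler, rather than from a `Context` constructed up front. Applied to the script from the question, a minimal sketch of that pattern (an assumption to try, not a confirmed fix) would be:

```python
# Sketch of the resume pattern from test_resume_streams above,
# applied to the question's MainWorkflow; not verified against this bug.
handler = w.run()  # first run: let the workflow create a fresh Context
async for event in handler.stream_events():
    print(f">>>>> EVENT (1): {event}")
await handler

await handler.ctx.set("topic", "pirates")  # state lives on the handler's ctx

handler = w.run(ctx=handler.ctx)  # later runs resume that context
async for event in handler.stream_events():
    print(f">>>>> EVENT (2): {event}")
await handler
```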
-
@dosu I've discussed this with a maintainer on Discord and it looks like there may be a bug, so I'll close this discussion.
Hey @rohanliston! It sounds like you've got it covered with the maintainer. I'm a bot that helps with bugs, technical questions, and engineering knowledge, so this one's a bit out of my scope. If you think I should have answered this request, feel free to reach out to the Dosu Team.
To continue talking to Dosu, mention @dosu.