Skip to content

Commit

Permalink
fix: improve StreamWrapper async iteration and remove duplicate __ait…
Browse files Browse the repository at this point in the history
…er__

Co-Authored-By: Alex Reibman <[email protected]>
  • Loading branch information
devin-ai-integration[bot] and areibman committed Dec 19, 2024
1 parent d579339 commit 6a177cf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
70 changes: 45 additions & 25 deletions agentops/llms/providers/anthropic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from typing import Any, AsyncIterator, Dict, Iterator, Optional, Union

from anthropic import Anthropic
from anthropic import Anthropic, AsyncAnthropic

from agentops.event import LLMEvent
from agentops.helpers import get_ISO_time, check_call_stack_for_agent_id
Expand Down Expand Up @@ -80,57 +80,77 @@ def __iter__(self):

@property
def text_stream(self):
"""Get the text stream from the response."""
if self._text_stream is None and hasattr(self.response, "text_stream"):
self._text_stream = self.response.text_stream
return self._text_stream
"""Get the text stream from the response.
async def __aiter__(self):
"""Async iterate over the stream chunks."""
Returns an async iterator for async usage and a sync iterator for sync usage.
"""
if hasattr(self.response, "text_stream"):
return self.response.text_stream
return self.__stream_text__() if asyncio.iscoroutine(self.response) else self

async def __stream_text__(self):
"""Stream text content from the response."""
if asyncio.iscoroutine(self.response):
self.response = await self.response

# Iterate over stream events and yield text from text events
async for event in self.response:
if hasattr(event, "type"):
if event.type == "text":
text = event.text
self.llm_event.completion["content"] += text
yield text
elif event.type == "content_block_stop":
# Handle content block completion if needed
continue
elif hasattr(event, "delta") and hasattr(event.delta, "text"):
# Fallback for older streaming format
text = event.delta.text
# Handle Stream object from Anthropic SDK
if hasattr(self.response, "__aiter__"):
async for chunk in self.response:
if hasattr(chunk, "type"):
if chunk.type == "content_block_delta" and hasattr(chunk, "delta"):
if chunk.delta.type == "text_delta":
text = chunk.delta.text
self.llm_event.completion["content"] += text
yield text
elif chunk.type == "text":
text = chunk.text
self.llm_event.completion["content"] += text
yield text
elif hasattr(self.response, "text_stream"):
async for text in self.response.text_stream:
self.llm_event.completion["content"] += text
yield text

async def __aiter__(self):
"""Async iterate over the stream chunks."""
async for text in self.__stream_text__():
yield text


@singleton
class AnthropicProvider(InstrumentedProvider):
"""Anthropic provider for AgentOps."""
original_create = None
original_create_async = None

def __init__(self, client):
def __init__(self, client=None):
"""Initialize the Anthropic provider."""
super().__init__(client)
self._provider_name = "Anthropic"
self.session = None
self.client = client or Anthropic()
self.async_client = AsyncAnthropic(api_key=self.client.api_key)

def create_stream(self, **kwargs):
"""Create a streaming context manager for Anthropic messages."""
init_timestamp = get_ISO_time()
response = self.client.messages.create(**kwargs)
return StreamWrapper(response, self, kwargs, init_timestamp, self.session)

async def create_stream(self, **kwargs):
"""Create a streaming context manager for Anthropic messages"""
async def create_stream_async(self, **kwargs):
"""Create an async streaming context manager for Anthropic messages."""
init_timestamp = get_ISO_time()
response = await self.client.messages.create(**kwargs)
response = await self.async_client.messages.create(**kwargs)
return StreamWrapper(response, self, kwargs, init_timestamp, self.session)

async def __call__(self, messages, model="claude-3-sonnet-20240229", stream=False, **kwargs):
"""Call the Anthropic provider with messages."""
kwargs["messages"] = messages
kwargs["model"] = model
kwargs["stream"] = stream
return await self.create_stream(**kwargs)
if stream:
return await self.create_stream_async(**kwargs)
return self.client.messages.create(**kwargs)

def handle_response(self, response, kwargs, init_timestamp, session: Optional[Session] = None) -> dict:
"""Handle the response from Anthropic."""
Expand Down
8 changes: 3 additions & 5 deletions examples/anthropic_examples/anthropic-example-async.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ async def generate_message():
],
stream=True,
) as response:
async for event in response:
if event.type == "text":
text = event.text
message += text
print(text, end="", flush=True)
async for text in response.text_stream:
message += text
print(text, end="", flush=True)
return message


Expand Down

0 comments on commit 6a177cf

Please sign in to comment.