Skip to content

Commit

Permalink
fix: update AnthropicProvider streaming implementation
Browse files Browse the repository at this point in the history
Co-Authored-By: Alex Reibman <[email protected]>
  • Loading branch information
devin-ai-integration[bot] and areibman committed Dec 20, 2024
1 parent 9dd3ae4 commit cc945a8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 28 deletions.
40 changes: 26 additions & 14 deletions agentops/llms/providers/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,14 @@ async def atext_stream(self):

async def __stream_text__(self):
"""Stream text content from the response."""
async with self.response as stream:
async for chunk in stream:
if hasattr(chunk, "type"):
try:
async for chunk in self.response:
if hasattr(chunk, "delta") and hasattr(chunk.delta, "text"):
text = chunk.delta.text
if text:
self._accumulate_event(text)
yield text
elif hasattr(chunk, "type"):
if chunk.type == "content_block_delta":
text = chunk.delta.text if hasattr(chunk.delta, "text") else ""
elif chunk.type == "message_delta":
Expand All @@ -223,12 +228,12 @@ async def __stream_text__(self):
self._final_message_snapshot = chunk.message
else:
text = ""
else:
text = chunk.text if hasattr(chunk, "text") else ""

if text: # Only accumulate non-empty text
self.completion += text
yield text
if text:
self._accumulate_event(text)
yield text
except Exception as e:
print(f"Error in stream: {e}")
raise

async def __aiter__(self):
"""Return self as an async iterator."""
Expand All @@ -250,9 +255,12 @@ def __init__(self, client=None, async_client=None):
super().__init__(client)
self._provider_name = "Anthropic"
# Initialize sync client
self.client = client or Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
if client is None:
self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
else:
self.client = client
# Ensure async client uses the same API key
api_key = self.client.api_key or os.getenv("ANTHROPIC_API_KEY")
api_key = getattr(self.client, 'api_key', None) or os.getenv("ANTHROPIC_API_KEY")
self.async_client = async_client if async_client is not None else AsyncAnthropic(api_key=api_key)
# Get session from either client, prioritizing the sync client
self.session = getattr(client, 'session', None) or getattr(async_client, 'session', None)
Expand All @@ -261,14 +269,18 @@ def __init__(self, client=None, async_client=None):
def create_stream(self, **kwargs):
"""Create a streaming context manager for Anthropic messages."""
init_timestamp = get_ISO_time()
# Ensure stream=True is set
kwargs['stream'] = True
# Use messages API
response = self.client.messages.create(**kwargs)
return StreamWrapper(response, self, kwargs, init_timestamp, self.session)

async def create_stream_async(self, **kwargs):
"""Create an async streaming context."""
"""Create an async streaming context manager for Anthropic messages."""
init_timestamp = get_ISO_time()
kwargs["stream"] = True # Ensure streaming is enabled
response = self.async_client.messages.create(**kwargs)
# Ensure stream=True is set
kwargs['stream'] = True
response = await self.async_client.messages.create(**kwargs)
return StreamWrapper(response, self, kwargs, init_timestamp, self.session)

def __call__(self, messages, model="claude-3-sonnet-20240229", stream=False, **kwargs):
Expand Down
12 changes: 5 additions & 7 deletions examples/anthropic_examples/anthropic-example-async.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ async def generate_message(provider, personality, health):
async with provider.create_stream_async(
max_tokens=1024,
model="claude-3-sonnet-20240229",
messages=[{"role": "user", "content": prompt}],
stream=True
messages=[{"role": "user", "content": prompt}]
) as stream:
message = ""
async for text in stream.text_stream:
async for text in stream:
print(text, end="", flush=True)
message += text
print() # Add newline after message
Expand All @@ -94,9 +93,8 @@ async def main():
session = ao_client.start_session()

try:
# Initialize Anthropic client and provider
anthropic_client = anthropic.Client(api_key=os.getenv("ANTHROPIC_API_KEY"))
provider = AnthropicProvider(client=anthropic_client, session=session)
# Initialize Anthropic provider
provider = AnthropicProvider(session=session)

# Define Titan personality and health status
personality = "Ronin is a swift and aggressive melee specialist who thrives on close-quarters hit-and-run tactics. He talks like a Samurai might."
Expand All @@ -115,7 +113,7 @@ async def main():

except Exception as e:
print(f"Error in Titan Support Protocol: {e}")
session.end_session(end_state=EndState.ERROR)
session.end_session(end_state=EndState.FAIL)


if __name__ == "__main__":
Expand Down
12 changes: 5 additions & 7 deletions examples/anthropic_examples/anthropic-example-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ def generate_story():
session = ao_client.start_session()

try:
# Initialize Anthropic client and provider
anthropic_client = anthropic.Client(api_key=os.getenv("ANTHROPIC_API_KEY"))
provider = AnthropicProvider(client=anthropic_client, session=session)
# Initialize Anthropic provider
provider = AnthropicProvider(session=session)

# Generate a random prompt
prompt = f"A {random.choice(first)} {random.choice(second)} {random.choice(third)}."
Expand All @@ -89,18 +88,17 @@ def generate_story():
"content": "Create a story based on the following prompt. Make it dark and atmospheric, similar to NieR:Automata's style.",
},
{"role": "assistant", "content": prompt},
],
stream=True
]
) as stream:
for text in stream.text_stream:
for text in stream:
print(text, end="", flush=True)
print("\nStory generation complete!")

# End session with success status
session.end_session(end_state=EndState.SUCCESS)
except Exception as e:
print(f"Error generating story: {e}")
session.end_session(end_state=EndState.ERROR)
session.end_session(end_state=EndState.FAIL)

if __name__ == "__main__":
generate_story()
Expand Down

0 comments on commit cc945a8

Please sign in to comment.