Commit

together model
manthanguptaa committed Oct 4, 2024
1 parent abc7711 commit ba29d2a
Showing 1 changed file with 74 additions and 59 deletions.
133 changes: 74 additions & 59 deletions phi/model/together/together.py
@@ -10,11 +10,22 @@
from phi.utils.log import logger
from phi.utils.tools import get_function_call_for_tool_call

try:
from openai.types.completion_usage import CompletionUsage
from openai.types.chat.chat_completion_chunk import (
ChatCompletionChunk,
ChoiceDelta,
ChoiceDeltaToolCall,
)
from openai.types.chat.chat_completion_message import ChatCompletionMessage
except ImportError:
logger.error("`openai` not installed")
raise
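
The chunk types imported above drive the streaming parser changed in this commit. For orientation only, a minimal sketch of how an OpenAI-compatible stream chunk is typically read with these types; the helper name is hypothetical and not part of this file:

    from typing import List, Optional, Tuple

    from openai.types.chat.chat_completion_chunk import (
        ChatCompletionChunk,
        ChoiceDelta,
        ChoiceDeltaToolCall,
    )


    def read_chunk(chunk: ChatCompletionChunk) -> Tuple[Optional[str], Optional[List[ChoiceDeltaToolCall]]]:
        # Hypothetical helper: pull the text delta and tool-call deltas out of one streamed chunk.
        if not chunk.choices:
            return None, None
        delta: ChoiceDelta = chunk.choices[0].delta
        return delta.content, delta.tool_calls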


class Together(OpenAILike):
"""
Together model class.
Args:
id (str): The id of the Together model to use. Default is "mistralai/Mixtral-8x7B-Instruct-v0.1".
name (str): The name of this chat model instance. Default is "Together"
@@ -45,54 +56,52 @@ def response_stream(self, messages: List[Message]) -> Iterator[ModelResponse]:
return

logger.debug("---------- Together Response Start ----------")
logger.debug("stream response")
# -*- Log messages for debugging
self._log_messages(messages)

stream_data: StreamData = StreamData()
stream_data.response_timer.start()

assistant_message_content = ""
response_is_tool_call = False

stream_data.response_content = ""
stream_data.response_tool_calls = []

stream_data.completion_tokens = 0
stream_data.response_timer.start()
for response in self.invoke_stream(messages=messages):
# logger.debug(f"Together response type: {type(response)}")
logger.debug(f"Together response: {response}")
stream_data.completion_tokens += 1
try:
stream_data.response_token = response.token # type: ignore
logger.debug(f"Together response: {stream_data.response_token}")
logger.debug(f"Together response type: {type(stream_data.response_token)}")
response_content = stream_data.response_token.get("text")
stream_data.response_tool_call = stream_data.response_token.get("tool_call")
if stream_data.response_tool_call:
response_is_tool_call = True
logger.debug(f"Together response content: {response_content }")
logger.debug(f"Together response_is_tool_call: {stream_data.response_tool_call}")
except Exception:
response_content = response.choices[0].delta.content

# -*- Add response content to agent message
if response_content is not None:
stream_data.response_content += response_content
# -*- Yield content if not a tool call
if not response_is_tool_call:
yield response_content
if len(response.choices) > 0:
response_delta: ChoiceDelta = response.choices[0].delta
response_content: Optional[str] = response_delta.content
response_tool_calls: Optional[List[ChoiceDeltaToolCall]] = response_delta.tool_calls

if response_content is not None:
stream_data.response_content += response_content
stream_data.completion_tokens += 1
if stream_data.completion_tokens == 1:
stream_data.time_to_first_token = stream_data.response_timer.elapsed
logger.debug(f"Time to first token: {stream_data.time_to_first_token:.4f}s")
yield ModelResponse(content=response_content)

if response_tool_calls is not None:
if stream_data.response_tool_calls is None:
stream_data.response_tool_calls = []
stream_data.response_tool_calls.extend(response_tool_calls)

if response.usage:
response_usage: Optional[CompletionUsage] = response.usage
if response_usage:
stream_data.response_prompt_tokens = response_usage.prompt_tokens
stream_data.response_completion_tokens = response_usage.completion_tokens
stream_data.response_total_tokens = response_usage.total_tokens

stream_data.response_timer.stop()
logger.debug(f"Time to generate response: {stream_data.response_timer.elapsed:.4f}s")

# -*- Create agent message
agent_message = Message(
role="agent",
content=stream_data.response_content,
# -*- Create assistant message
assistant_message = Message(
role="assistant",
content=assistant_message_content,
)
# -*- Check if the response is a tool call
try:
if response_is_tool_call and stream_data.response_content != "":
_tool_call_content = stream_data.response_content.strip()
if response_is_tool_call and assistant_message_content != "":
_tool_call_content = assistant_message_content.strip()
_tool_call_list = json.loads(_tool_call_content)
if isinstance(_tool_call_list, list):
# Build tool calls
@@ -110,63 +119,69 @@ def response_stream(self, messages: List[Message]) -> Iterator[ModelResponse]:
"function": _function_def,
}
)
agent_message.tool_calls = _tool_calls
assistant_message.tool_calls = _tool_calls
except Exception:
logger.warning(f"Could not parse tool calls from response: {stream_data.response_content}")
logger.warning(f"Could not parse tool calls from response: {assistant_message_content}")
pass

# -*- Update usage metrics
# Add response time to metrics
agent_message.metrics["time"] = stream_data.response_timer.elapsed
assistant_message.metrics["time"] = stream_data.response_timer.elapsed
if "response_times" not in self.metrics:
self.metrics["response_times"] = []
self.metrics["response_times"].append(stream_data.response_timer.elapsed)

# Add token usage to metrics
logger.debug(f"Estimated completion tokens: {stream_data.completion_tokens}")
agent_message.metrics["completion_tokens"] = stream_data.completion_tokens
assistant_message.metrics["completion_tokens"] = stream_data.completion_tokens
if "completion_tokens" not in self.metrics:
self.metrics["completion_tokens"] = stream_data.completion_tokens
else:
self.metrics["completion_tokens"] += stream_data.completion_tokens

# -*- Add agent message to messages
self._update_stream_metrics(stream_data=stream_data, assistant_message=agent_message)
messages.append(agent_message)
agent_message.log()
logger.debug(f"Agent Message: {agent_message}")
# -*- Add assistant message to messages
messages.append(assistant_message)
assistant_message.log()

# -*- Parse and run tool calls
if agent_message.tool_calls is not None:
if assistant_message.tool_calls is not None and len(assistant_message.tool_calls) > 0 and self.run_tools:
tool_role: str = "tool"
function_calls_to_run: List[FunctionCall] = []
for tool_call in agent_message.tool_calls:
function_call_results: List[Message] = []
for tool_call in assistant_message.tool_calls:
_tool_call_id = tool_call.get("id")
_function_call = get_function_call_for_tool_call(tool_call, self.functions)
if _function_call is None:
messages.append(
Message(role="tool", tool_call_id=_tool_call_id, content="Could not find function to call.")
Message(
role=tool_role,
tool_call_id=_tool_call_id,
content="Could not find function to call.",
)
)
continue
if _function_call.error is not None:
messages.append(
Message(
role="tool", tool_call_id=_tool_call_id, tool_call_error=True, content=_function_call.error
role=tool_role,
tool_call_id=_tool_call_id,
content=_function_call.error,
)
)
continue
function_calls_to_run.append(_function_call)

if self.show_tool_calls:
if len(function_calls_to_run) == 1:
yield f"\n - Running: {function_calls_to_run[0].get_call_str()}\n\n"
elif len(function_calls_to_run) > 1:
yield "\nRunning:"
for _f in function_calls_to_run:
yield f"\n - {_f.get_call_str()}"
yield "\n\n"

function_call_results = self.run_function_calls(function_calls_to_run)
# Add results of the function calls to the messages
yield ModelResponse(content="\nRunning:")
for _f in function_calls_to_run:
yield ModelResponse(content=f"\n - {_f.get_call_str()}")
yield ModelResponse(content="\n\n")

for intermediate_model_response in self.run_function_calls(
function_calls=function_calls_to_run, function_call_results=function_call_results, tool_role=tool_role
):
yield intermediate_model_response

if len(function_call_results) > 0:
messages.extend(function_call_results)
# -*- Yield new response using results of tool calls
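For context, a minimal usage sketch of the streaming path this file implements. It assumes phidata-style import paths (phi.model.together.Together, phi.model.message.Message) and a TOGETHER_API_KEY in the environment; these assumptions are not shown in the diff above:

    from phi.model.message import Message
    from phi.model.together import Together

    # Assumed import paths; the model id matches the docstring default above.
    model = Together(id="mistralai/Mixtral-8x7B-Instruct-v0.1")
    messages = [Message(role="user", content="Say hello in one sentence.")]

    # response_stream yields ModelResponse objects; print streamed text as it arrives.
    for model_response in model.response_stream(messages=messages):
        if model_response.content is not None:
            print(model_response.content, end="")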
