Skip to content

Commit

Permalink
add: support for openai tool calls (functions only)
Browse files Browse the repository at this point in the history
add: two examples related to function calls
fix: Not saving a proper function call system message
chore: remove the internal response with assist/response messages. best to leave this in the agent code
chore: Update version to 0.3.0
  • Loading branch information
Lee Huffman committed Dec 26, 2023
1 parent 4f744d9 commit 6de742e
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 148 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ from the agent.

- [ ] Add support for Azure OpenAI API
- [ ] Add support for OpenAI Assistant API
- [ ] Add Function call examples
- [x] Add support for new OpenAI Tool Calls vs now deprecated Function calls
- [x] Add Function call examples

## Stay Updated

Expand Down
39 changes: 39 additions & 0 deletions examples/completion_agent_function_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import json
import os
from typing import Literal, Dict

from nimbusagent.agent.completion import CompletionAgent


# Example dummy function hard coded to return the same weather
# In production, this could be your backend API or an external API
def get_current_weather(location: str, unit: Literal["celsius", "fahrenheit"] = "fahrenheit") -> Dict:
    """
    Get the current weather in a given location.

    :param location: The city and state, e.g. San Francisco, CA
    :param unit: The unit to return the temperature in, either celsius or fahrenheit
    :return: The current weather in the given location
    """
    # Hard-coded demo temperatures, keyed by a case-insensitive substring of the location.
    # Insertion order matters: it preserves the original match precedence.
    known_cities = {
        "tokyo": ("Tokyo", "10"),
        "san francisco": ("San Francisco", "30"),
        "paris": ("Paris", "22"),
    }
    needle = location.lower()
    for key, (city, temperature) in known_cities.items():
        if key in needle:
            return {"content": json.dumps({"location": city, "temperature": temperature, "unit": unit})}

    # Unknown city: echo the location back without a unit, matching the demo contract.
    return {"content": json.dumps({"location": location, "temperature": "unknown"})}


# Build a non-streaming agent that can invoke get_current_weather via OpenAI tool calls.
# Requires OPENAI_API_KEY to be set in the environment; os.getenv returns None otherwise.
agent = CompletionAgent(
    openai_api_key=os.getenv('OPENAI_API_KEY'),
    model_name="gpt-4-1106-preview",  # model must support parallel tool calls
    system_message="You are a helpful assistant.",
    functions=[get_current_weather],  # callables the model may ask the agent to run
    use_tool_calls=True  # If False, will disable tool calls and force the deprecated function calls
)

# CompletionAgent.ask blocks until the full response is generated, then returns it whole.
response = agent.ask("What's the weather like in San Francisco, Tokyo, and Paris?")
print(response)
44 changes: 44 additions & 0 deletions examples/streaming_agent_function_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
import os
import sys
from typing import Literal, Dict

from nimbusagent.agent.streaming import StreamingAgent


# Example dummy function hard coded to return the same weather
# In production, this could be your backend API or an external API
def get_current_weather(location: str, unit: Literal["celsius", "fahrenheit"] = "fahrenheit") -> Dict:
    """
    Get the current weather in a given location.

    :param location: The city and state, e.g. San Francisco, CA
    :param unit: The unit to return the temperature in, either celsius or fahrenheit
    :return: The current weather in the given location
    """
    # Canned demo data; matched case-insensitively against the requested location.
    # Tuple order preserves the original if/elif match precedence.
    cities = (
        ("tokyo", "Tokyo", "10"),
        ("san francisco", "San Francisco", "30"),
        ("paris", "Paris", "22"),
    )
    wanted = location.lower()
    for fragment, name, temp in cities:
        if fragment in wanted:
            payload = {"location": name, "temperature": temp, "unit": unit}
            break
    else:
        # No match: report the raw location with an unknown temperature (and no unit key).
        payload = {"location": location, "temperature": "unknown"}

    return {"content": json.dumps(payload)}


# Build a streaming agent that can invoke get_current_weather via OpenAI tool calls.
# Requires OPENAI_API_KEY to be set in the environment; os.getenv returns None otherwise.
agent = StreamingAgent(
    openai_api_key=os.getenv('OPENAI_API_KEY'),
    model_name="gpt-4-1106-preview",  # model must support parallel tool calls
    system_message="You are a helpful assistant.",
    functions=[get_current_weather],  # callables the model may ask the agent to run
    use_tool_calls=True  # If False, will disable tool calls and force the deprecated function calls
)

# StreamingAgent.ask yields the response incrementally; write each chunk as it arrives
# so the user sees output while generation is still in progress.
response = agent.ask("What's the weather like in San Francisco, Tokyo, and Paris?")
for chunk in response:
    sys.stdout.write(chunk)

sys.stdout.write("\n\n")
sys.stdout.flush()
30 changes: 22 additions & 8 deletions nimbusagent/agent/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
functions_always_use: Optional[List[str]] = None,
functions_pattern_groups: Optional[List[dict]] = None,
functions_k_closest: int = 3,
use_tool_calls: bool = True,

system_message: str = SYS_MSG,
message_history: Optional[List[Dict[str, str]]] = None,
Expand All @@ -46,7 +47,7 @@ def __init__(
memory_max_tokens: int = 2000,

internal_thoughts_max_entries: int = 8,
loops_max: int = 8,
loops_max: int = 10,

send_events: bool = False,
):
Expand All @@ -63,6 +64,9 @@ def __init__(
functions_pattern_groups: The list of function pattern groups to use (default: None)
functions_k_closest: The number of closest functions to use (default: 3)
functions_always_use: The list of functions to always use (default: None)
use_tool_calls: True if parallel functions should be allowed (default: True). Functions are being
deprecated though tool_calls are still a bit beta, so for now this can be set to
False to continue using function calls.
system_message: The message to send to the user when the agent starts
(default: "You are a helpful assistant.")
message_history: The message history to use (default: None)
Expand Down Expand Up @@ -108,6 +112,7 @@ def __init__(

self.function_handler = self._init_function_handler(functions, functions_embeddings, functions_k_closest,
functions_always_use, functions_pattern_groups)
self.use_tool_calls = use_tool_calls

def set_system_message(self, message: str) -> None:
"""Sets the system message.
Expand Down Expand Up @@ -161,13 +166,22 @@ def _create_chat_completion(
model_name = self.secondary_model_name if use_secondary_model else self.model_name

if use_functions and self.function_handler.functions and not force_no_functions:
res = self.client.chat.completions.create(
model=model_name,
temperature=self.temperature,
messages=messages,
functions=self.function_handler.functions,
function_call=function_call,
stream=stream)
if self.use_tool_calls:
res = self.client.chat.completions.create(
model=model_name,
temperature=self.temperature,
messages=messages,
tools=self.function_handler.functions_to_tools(),
tool_choice=function_call,
stream=stream)
else:
res = self.client.chat.completions.create(
model=model_name,
temperature=self.temperature,
messages=messages,
functions=self.function_handler.functions,
function_call=function_call,
stream=stream)
else:
res = self.client.chat.completions.create(
model=model_name,
Expand Down
55 changes: 47 additions & 8 deletions nimbusagent/agent/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
class CompletionAgent(BaseAgent):
"""
Agent that can handle openai function calls and can generate a response, without streaming.
This agent is meant to be used in a non-streaming context, where the user cannot see the response as it is generated.
This agent is meant to be used in a non-streaming context, where the user cannot see the
response as it is generated.
This means it will take longer to generate a response, as we must wait for openAI to generate and respond.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -56,24 +58,61 @@ def _generate_response(self) -> Optional[Union[openai.types.chat.ChatCompletion,
res = self._create_chat_completion(
[self.system_message] + self.chat_history.get_chat_history() + self.internal_thoughts
)
finish_reason = res.choices[0].finish_reason

finish_reason = res.choices[0].finish_reason
if finish_reason == 'stop' or len(self.internal_thoughts) > self.internal_thoughts_max_entries:
return res
elif finish_reason == 'tool_calls':
message = res.choices[0].message
self.internal_thoughts.append(message)
tool_calls = message.tool_calls
if tool_calls:
content_send_directly_to_user = []
for tool_call in tool_calls:
if tool_call.type == 'function':
func_name = tool_call.function.name
args_str = tool_call.function.arguments
func_results = self.function_handler.handle_function_call(func_name, args_str)

if func_results and func_results.content is not None:
self.internal_thoughts.append({
'tool_call_id': tool_call.id,
"role": "tool",
'name': func_name,
'content': func_results.content
})

if func_results.send_directly_to_user and func_results.content:
content_send_directly_to_user.append(func_results.content)

if content_send_directly_to_user:
return "\n".join(content_send_directly_to_user)

elif finish_reason == 'function_call':
func_name = res.choices[0].message.function_call.name
args_str = res.choices[0].message.function_call.arguments
func_results = self.function_handler.handle_function_call(func_name, args_str)

if func_results:
if func_results.assistant_thought:
self.internal_thoughts.append(func_results.assistant_thought)

if 'internal_thought' in func_results:
self.internal_thoughts.append(func_results['internal_thought'])

if func_results.send_directly_to_user and func_results.content:
return func_results.content

# add the function call to the internal thoughts so the AI can see it
self.internal_thoughts.append({
"role": "assistant",
'content': None,
'function_call': {
'name': func_name,
'arguments': args_str
}
})

self.internal_thoughts.append({
"role": "function",
'content': func_results.content,
'name': func_name
})

else:
raise ValueError(f"Unexpected finish reason: {finish_reason}")

Expand Down
104 changes: 98 additions & 6 deletions nimbusagent/agent/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def output_post_content(post_content: List[str]):
post_content_items = []
use_secondary_model = False
force_no_functions = False
tool_calls = []
while loops < self.loops_max:
loops += 1
has_content = False
Expand All @@ -86,13 +87,92 @@ def output_post_content(post_content: List[str]):
continue

delta = message.choices[0].delta
if delta.function_call:
if not delta:
break

if delta.tool_calls:
tool_call = delta.tool_calls[0]
index = tool_call.index
if index == len(tool_calls):
tool_calls.append({
"id": None,
"type": "function",
"function": {
"name": "",
"arguments": "",
}
})

if tool_call.id:
tool_calls[index]['id'] = tool_call.id
if tool_call.function:
if tool_call.function.name:
tool_calls[index]['function']['name'] = tool_call.function.name
if tool_call.function.arguments:
tool_calls[index]['function']['arguments'] += tool_call.function.arguments

elif delta.function_call:
if delta.function_call.name:
func_call["name"] = delta.function_call.name
if delta.function_call.arguments:
func_call["arguments"] += delta.function_call.arguments

if message.choices[0].finish_reason == "function_call":
finish_reason = message.choices[0].finish_reason

if finish_reason == "tool_calls":
self.internal_thoughts.append({
"role": "assistant",
'content': None,
'tool_calls': tool_calls
})

if self.send_events:
for tool_call in tool_calls:
json_data = json.dumps(tool_call)
yield f"[[[function:{tool_call['name']}:{json_data}]]]"

# Handle tool calls
logging.info("Handling tool calls: %s", tool_calls)
content_send_directly_to_user = []

for tool_call in tool_calls:
func_name = tool_call['function']["name"]
if func_name is None:
continue

func_args = tool_call['function']["arguments"]
func_results = self.function_handler.handle_function_call(func_name, func_args)
if func_results is not None:
if func_results.stream_data and self.send_events:
for key, value in func_results.stream_data.items():
json_value = json.dumps(value)
yield f"[[[data:{key}:{json_value}]]]"

if func_results.send_directly_to_user and func_results.content:
content_send_directly_to_user.append(func_results.content)
continue

if func_results.content:
self.internal_thoughts.append({
'tool_call_id': tool_call['id'],
"role": "tool",
'name': func_name,
'content': func_results.content
})

if func_results.use_secondary_model:
use_secondary_model = True
if func_results.force_no_functions:
force_no_functions = True

if content_send_directly_to_user:
yield "\n".join(content_send_directly_to_user)
yield output_post_content(post_content_items)
return

tool_calls = [] # reset tool calls

elif finish_reason == "function_call":
if self.send_events:
json_data = json.dumps(self.function_handler.get_args(func_call['arguments']))
yield f"[[[function:{func_call['name']}:{json_data}]]]"
Expand All @@ -112,10 +192,22 @@ def output_post_content(post_content: List[str]):
yield output_post_content(post_content_items)
return

if func_results.assistant_thought:
self.internal_thoughts.append(func_results.assistant_thought)
if func_results.internal_thought:
self.internal_thoughts.append(func_results.internal_thought)
# Add the function call to the internal thoughts so the AI knows it called it
self.internal_thoughts.append({
"role": "assistant",
'content': None,
'function_call': {
'name': func_call['name'],
'arguments': func_call['arguments']
}
})

self.internal_thoughts.append({
"role": "function",
'content': func_results.content,
'name': func_call['name']
})

if func_results.post_content:
post_content_items.append(func_results.post_content)
if func_results.use_secondary_model:
Expand Down
Loading

0 comments on commit 6de742e

Please sign in to comment.