diff --git a/README.md b/README.md index e5c958b..ccca964 100644 --- a/README.md +++ b/README.md @@ -217,7 +217,8 @@ from the agent. - [ ] Add support for Azure OpenAI API - [ ] Add support for OpenAI Assistant API -- [ ] Add Function call examples +- [x] Add support for new OpenAI Tool Calls vs now deprecated Function calls +- [x] Add Function call examples ## Stay Updated diff --git a/examples/completion_agent_function_example.py b/examples/completion_agent_function_example.py new file mode 100644 index 0000000..641cd25 --- /dev/null +++ b/examples/completion_agent_function_example.py @@ -0,0 +1,39 @@ +import json +import os +from typing import Literal, Dict + +from nimbusagent.agent.completion import CompletionAgent + + +# Example dummy function hard coded to return the same weather +# In production, this could be your backend API or an external API +def get_current_weather(location: str, unit: Literal["celsius", "fahrenheit"] = "fahrenheit") -> Dict: + """ + Get the current weather in a given location + + :param location: The city and state, e.g. San Francisco, CA + :param unit: The unit to return the temperature in, either celsius or fahrenheit + :return: The current weather in the given location + """ + if "tokyo" in location.lower(): + content = json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit}) + elif "san francisco" in location.lower(): + content = json.dumps({"location": "San Francisco", "temperature": "30", "unit": unit}) + elif "paris" in location.lower(): + content = json.dumps({"location": "Paris", "temperature": "22", "unit": unit}) + else: + content = json.dumps({"location": location, "temperature": "unknown"}) + + return {"content": content} + + +agent = CompletionAgent( + openai_api_key=os.getenv('OPENAI_API_KEY'), + model_name="gpt-4-1106-preview", + system_message="You are a helpful assistant.", + functions=[get_current_weather], + use_tool_calls=True # If False, will disable tool calls and force the deprecated function calls +) + +response = agent.ask("What's the weather like in San Francisco, Tokyo, and Paris?") +print(response) diff --git a/examples/streaming_agent_function_example.py b/examples/streaming_agent_function_example.py new file mode 100644 index 0000000..b1bd3a8 --- /dev/null +++ b/examples/streaming_agent_function_example.py @@ -0,0 +1,44 @@ +import json +import os +import sys +from typing import Literal, Dict + +from nimbusagent.agent.streaming import StreamingAgent + + +# Example dummy function hard coded to return the same weather +# In production, this could be your backend API or an external API +def get_current_weather(location: str, unit: Literal["celsius", "fahrenheit"] = "fahrenheit") -> Dict: + """ + Get the current weather in a given location + + :param location: The city and state, e.g. San Francisco, CA + :param unit: The unit to return the temperature in, either celsius or fahrenheit + :return: The current weather in the given location + """ + if "tokyo" in location.lower(): + content = json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit}) + elif "san francisco" in location.lower(): + content = json.dumps({"location": "San Francisco", "temperature": "30", "unit": unit}) + elif "paris" in location.lower(): + content = json.dumps({"location": "Paris", "temperature": "22", "unit": unit}) + else: + content = json.dumps({"location": location, "temperature": "unknown"}) + + return {"content": content} + + +agent = StreamingAgent( + openai_api_key=os.getenv('OPENAI_API_KEY'), + model_name="gpt-4-1106-preview", + system_message="You are a helpful assistant.", + functions=[get_current_weather], + use_tool_calls=True # If False, will disable tool calls and force the deprecated function calls +) + +response = agent.ask("What's the weather like in San Francisco, Tokyo, and Paris?") +for chunk in response: + sys.stdout.write(chunk) + +sys.stdout.write("\n\n") +sys.stdout.flush() diff --git a/nimbusagent/agent/base.py b/nimbusagent/agent/base.py index 867598c..3ea89ac 100644 --- a/nimbusagent/agent/base.py +++ b/nimbusagent/agent/base.py @@ -32,6 +32,7 @@ def __init__( functions_always_use: Optional[List[str]] = None, functions_pattern_groups: Optional[List[dict]] = None, functions_k_closest: int = 3, + use_tool_calls: bool = True, system_message: str = SYS_MSG, message_history: Optional[List[Dict[str, str]]] = None, @@ -46,7 +47,7 @@ def __init__( memory_max_tokens: int = 2000, internal_thoughts_max_entries: int = 8, - loops_max: int = 8, + loops_max: int = 10, send_events: bool = False, ): @@ -63,6 +64,9 @@ def __init__( functions_pattern_groups: The list of function pattern groups to use (default: None) functions_k_closest: The number of closest functions to use (default: 3) functions_always_use: The list of functions to always use (default: None) + use_tool_calls: True if parallel functions should be allowed (default: True). Functions are being + deprecated though tool_calls are still a bit beta, so for now this can be set to + False to continue using function calls. system_message: The message to send to the user when the agent starts (default: "You are a helpful assistant.") message_history: The message history to use (default: None) @@ -108,6 +112,7 @@ def __init__( self.function_handler = self._init_function_handler(functions, functions_embeddings, functions_k_closest, functions_always_use, functions_pattern_groups) + self.use_tool_calls = use_tool_calls def set_system_message(self, message: str) -> None: """Sets the system message. @@ -161,13 +166,22 @@ def _create_chat_completion( model_name = self.secondary_model_name if use_secondary_model else self.model_name if use_functions and self.function_handler.functions and not force_no_functions: - res = self.client.chat.completions.create( - model=model_name, - temperature=self.temperature, - messages=messages, - functions=self.function_handler.functions, - function_call=function_call, - stream=stream) + if self.use_tool_calls: + res = self.client.chat.completions.create( + model=model_name, + temperature=self.temperature, + messages=messages, + tools=self.function_handler.functions_to_tools(), + tool_choice=function_call, + stream=stream) + else: + res = self.client.chat.completions.create( + model=model_name, + temperature=self.temperature, + messages=messages, + functions=self.function_handler.functions, + function_call=function_call, + stream=stream) else: res = self.client.chat.completions.create( model=model_name, diff --git a/nimbusagent/agent/completion.py b/nimbusagent/agent/completion.py index a35dc01..1a21b74 100644 --- a/nimbusagent/agent/completion.py +++ b/nimbusagent/agent/completion.py @@ -8,9 +8,11 @@ class CompletionAgent(BaseAgent): """ Agent that can handle openai function calls and can generate responsee, without streaming. - This agent is meant to be used in a non-streaming context, where the user cannot see the response as it is generated. + This agent is meant to be used in a non-streaming context, where the user cannot see the + response as it is generated. This means it will take longer to generate a response, as we must wait for openAI to generate and respond. """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -56,24 +58,61 @@ def _generate_response(self) -> Optional[Union[openai.types.chat.ChatCompletion, res = self._create_chat_completion( [self.system_message] + self.chat_history.get_chat_history() + self.internal_thoughts ) - finish_reason = res.choices[0].finish_reason + finish_reason = res.choices[0].finish_reason if finish_reason == 'stop' or len(self.internal_thoughts) > self.internal_thoughts_max_entries: return res + elif finish_reason == 'tool_calls': + message = res.choices[0].message + self.internal_thoughts.append(message) + tool_calls = message.tool_calls + if tool_calls: + content_send_directly_to_user = [] + for tool_call in tool_calls: + if tool_call.type == 'function': + func_name = tool_call.function.name + args_str = tool_call.function.arguments + func_results = self.function_handler.handle_function_call(func_name, args_str) + + if func_results and func_results.content is not None: + self.internal_thoughts.append({ + 'tool_call_id': tool_call.id, + "role": "tool", + 'name': func_name, + 'content': func_results.content + }) + + if func_results.send_directly_to_user and func_results.content: + content_send_directly_to_user.append(func_results.content) + + if content_send_directly_to_user: + return "\n".join(content_send_directly_to_user) + elif finish_reason == 'function_call': func_name = res.choices[0].message.function_call.name args_str = res.choices[0].message.function_call.arguments func_results = self.function_handler.handle_function_call(func_name, args_str) if func_results: - if func_results.assistant_thought: - self.internal_thoughts.append(func_results.assistant_thought) - - if 'internal_thought' in func_results: - self.internal_thoughts.append(func_results['internal_thought']) - if func_results.send_directly_to_user and func_results.content: return func_results.content + + # add the function call to the internal thoughts so the AI can see it + self.internal_thoughts.append({ + "role": "assistant", + 'content': None, + 'function_call': { + 'name': func_name, + 'arguments': args_str + } + }) + + self.internal_thoughts.append({ + "role": "function", + 'content': func_results.content, + 'name': func_name + }) + else: raise ValueError(f"Unexpected finish reason: {finish_reason}") diff --git a/nimbusagent/agent/streaming.py b/nimbusagent/agent/streaming.py index cde8ffb..bfb2acb 100644 --- a/nimbusagent/agent/streaming.py +++ b/nimbusagent/agent/streaming.py @@ -60,6 +60,7 @@ def output_post_content(post_content: List[str]): post_content_items = [] use_secondary_model = False force_no_functions = False + tool_calls = [] while loops < self.loops_max: loops += 1 has_content = False @@ -86,13 +87,92 @@ def output_post_content(post_content: List[str]): continue delta = message.choices[0].delta - if delta.function_call: + if not delta: + break + + if delta.tool_calls: + tool_call = delta.tool_calls[0] + index = tool_call.index + if index == len(tool_calls): + tool_calls.append({ + "id": None, + "type": "function", + "function": { + "name": "", + "arguments": "", + } + }) + + if tool_call.id: + tool_calls[index]['id'] = tool_call.id + if tool_call.function: + if tool_call.function.name: + tool_calls[index]['function']['name'] = tool_call.function.name + if tool_call.function.arguments: + tool_calls[index]['function']['arguments'] += tool_call.function.arguments + + elif delta.function_call: if delta.function_call.name: func_call["name"] = delta.function_call.name if delta.function_call.arguments: func_call["arguments"] += delta.function_call.arguments - if message.choices[0].finish_reason == "function_call": + finish_reason = message.choices[0].finish_reason + + if finish_reason == "tool_calls": + self.internal_thoughts.append({ + "role": "assistant", + 'content': None, + 'tool_calls': tool_calls + }) + + if self.send_events: + for tool_call in tool_calls: + json_data = json.dumps(tool_call) + yield f"[[[function:{tool_call['name']}:{json_data}]]]" + + # Handle tool calls + logging.info("Handling tool calls: %s", tool_calls) + content_send_directly_to_user = [] + + for tool_call in tool_calls: + func_name = tool_call['function']["name"] + if func_name is None: + continue + + func_args = tool_call['function']["arguments"] + func_results = self.function_handler.handle_function_call(func_name, func_args) + if func_results is not None: + if func_results.stream_data and self.send_events: + for key, value in func_results.stream_data.items(): + json_value = json.dumps(value) + yield f"[[[data:{key}:{json_value}]]]" + + if func_results.send_directly_to_user and func_results.content: + content_send_directly_to_user.append(func_results.content) + continue + + if func_results.content: + self.internal_thoughts.append({ + 'tool_call_id': tool_call['id'], + "role": "tool", + 'name': func_name, + 'content': func_results.content + }) + + if func_results.use_secondary_model: + use_secondary_model = True + if func_results.force_no_functions: + force_no_functions = True + + if content_send_directly_to_user: + yield "\n".join(content_send_directly_to_user) + yield output_post_content(post_content_items) + return + + tool_calls = [] # reset tool calls + + elif finish_reason == "function_call": if self.send_events: json_data = json.dumps(self.function_handler.get_args(func_call['arguments'])) yield f"[[[function:{func_call['name']}:{json_data}]]]" @@ -112,10 +192,22 @@ def output_post_content(post_content: List[str]): yield output_post_content(post_content_items) return - if func_results.assistant_thought: - self.internal_thoughts.append(func_results.assistant_thought) - if func_results.internal_thought: - self.internal_thoughts.append(func_results.internal_thought) + # Add the function call to the internal thoughts so the AI knows it called it + self.internal_thoughts.append({ + "role": "assistant", + 'content': None, + 'function_call': { + 'name': func_call['name'], + 'arguments': func_call['arguments'] + } + }) + + self.internal_thoughts.append({ + "role": "function", + 'content': func_results.content, + 'name': func_call['name'] + }) + if func_results.post_content: post_content_items.append(func_results.post_content) if func_results.use_secondary_model: diff --git a/nimbusagent/functions/handler.py b/nimbusagent/functions/handler.py index 1c2f2e7..c0d035e 100644 --- a/nimbusagent/functions/handler.py +++ b/nimbusagent/functions/handler.py @@ -7,9 +7,10 @@ from typing import Optional, List, Dict, Any, Union, Callable, Type, Tuple import tiktoken +from openai.types.chat import ChatCompletionToolParam from nimbusagent.functions import parser -from nimbusagent.functions.responses import FuncResponse, InternalFuncResponse, DictFuncResponse +from nimbusagent.functions.responses import FuncResponse, DictFuncResponse from nimbusagent.memory.base import AgentMemory from nimbusagent.utils.helper import find_similar_embedding_list, combine_lists_unique @@ -76,6 +77,20 @@ def __init__(self, functions: list = None, self.calling_function_start_callback = calling_function_start_callback self.calling_function_stop_callback = calling_function_stop_callback + def functions_to_tools(self) -> List[ChatCompletionToolParam]: + """ + Convert the functions defs to the new OpenAI tools format. + :return: The tools. + """ + tools = [] + for func in self.functions: + tools.append({ + "type": "function", + "function": func + }) + + return tools + def _get_function_info(self, func_name: str) -> Optional[FunctionInfo]: """ Get the FunctionInfo for the given function name. @@ -171,39 +186,38 @@ def get_functions_from_query_and_history(self, query: str, history: List[Dict[st if not self.orig_functions: return None - # Step 1: Initialize with 'always_use' functions - actual_function_names = self.always_use if self.always_use else [] - # print("actual_function_names: ", actual_function_names) + if not self.pattern_groups and not self.embeddings: + actual_function_names = self.orig_functions.keys() - # step 2: Add functions based on pattern groups on query - query_group_functions = self._get_group_function(query) - if query_group_functions: - actual_function_names = combine_lists_unique(actual_function_names, query_group_functions) - # print("query_group_functions: ", query_group_functions) + else: + # Step 1: Initialize with 'always_use' functions + actual_function_names = self.always_use if self.always_use else [] # print("actual_function_names: ", actual_function_names) - # step 3: Add functions based on embeddings - recent_history_and_query = [message['content'] for message in history[-2:]] + [query] - recent_history_and_query_str = " ".join(recent_history_and_query) - - if self.embeddings: - similar_functions = find_similar_embedding_list(recent_history_and_query_str, - function_embeddings=self.embeddings, - k_nearest_neighbors=self.k_nearest) - similar_function_names = [d['name'] for d in similar_functions] - if similar_function_names: - actual_function_names = combine_lists_unique(actual_function_names, similar_function_names) - # print("similar_function_names: ", similar_function_names) - # print("actual_function_names: ", actual_function_names) - - # step 4: Add functions based on pattern groups on history - query_group_functions = self._get_group_function(recent_history_and_query_str) - if query_group_functions: - actual_function_names = combine_lists_unique(actual_function_names, query_group_functions) - # print("history_group_functions: ", query_group_functions) - - logging.info(f"Functions to use: {actual_function_names}") - # step 5: step through functions and get the function info, adding up to max_tokens + # step 2: Add functions based on pattern groups on query + query_group_functions = self._get_group_function(query) + if query_group_functions: + actual_function_names = combine_lists_unique(actual_function_names, query_group_functions) + + # step 3: Add functions based on embeddings + recent_history_and_query = [message['content'] for message in history[-2:]] + [query] + recent_history_and_query_str = " ".join(recent_history_and_query) + + if self.embeddings: + similar_functions = find_similar_embedding_list(recent_history_and_query_str, + function_embeddings=self.embeddings, + k_nearest_neighbors=self.k_nearest) + similar_function_names = [d['name'] for d in similar_functions] + if similar_function_names: + actual_function_names = combine_lists_unique(actual_function_names, similar_function_names) + + # step 4: Add functions based on pattern groups on history + query_group_functions = self._get_group_function(recent_history_and_query_str) + if query_group_functions: + actual_function_names = combine_lists_unique(actual_function_names, query_group_functions) + + logging.info(f"Functions to use: {actual_function_names}") + # step 5: step through functions and get the function info, adding up to max_tokens processed_functions = [] token_count = 0 @@ -282,7 +296,7 @@ def create_individual_func_mapping(item: Union[Callable, Type]) -> Tuple[str, Ca else: raise ValueError(f"Unsupported item {item}") - def handle_function_call(self, func_name: str, args_str: str) -> Optional[InternalFuncResponse]: + def handle_function_call(self, func_name: str, args_str: str) -> Optional[FuncResponse]: """ Handle a function call. This method will call the function and return the result. If the result is a FuncResponse, it will be returned as is. If the result is a dictionary, it will be converted to a @@ -304,7 +318,10 @@ def handle_function_call(self, func_name: str, args_str: str) -> Optional[Intern else: response_obj = DictFuncResponse(result) - return response_obj.to_internal_response(func_name, args_str) + response_obj.name = func_name + response_obj.arguments = args_str + + return response_obj @staticmethod def _execute_method(item: Any, method_name: str, args: Dict[str, Any]) -> Any: diff --git a/nimbusagent/functions/responses.py b/nimbusagent/functions/responses.py index dec68d5..8d6eb58 100644 --- a/nimbusagent/functions/responses.py +++ b/nimbusagent/functions/responses.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Dict from pydantic import BaseModel @@ -18,6 +18,8 @@ class FuncResponse(BaseModel): :param use_secondary_model: Whether to use the secondary model. :param force_no_functions: Whether to force no functions. """ + name: str = None + arguments: str = None content: str = None summarize_only: bool = False send_directly_to_user: bool = False @@ -26,79 +28,23 @@ class FuncResponse(BaseModel): use_secondary_model: bool = False force_no_functions: bool = False - def to_internal_response(self, func_name: str, args_str: str = None): - """ - Convert this response to an internal response. - :param func_name: The name of the function. - :param args_str: The arguments of the function. - :return: The internal response. - """ - if not self.content: - return None - asst_thought_content = f"#{func_name}" - if args_str: - args_str = args_str.replace("\n", " ") - asst_thought_content += f"({args_str})" - else: - asst_thought_content += "()" - internal_asst_thought = {'role': 'assistant', 'content': asst_thought_content} - internal_msg = {'role': 'function', 'name': func_name, 'content': self.content} - return InternalFuncResponse(content=self.content, send_directly_to_user=self.send_directly_to_user, - internal_thought=internal_msg, - assistant_thought=internal_asst_thought, - post_content=self.post_content, - stream_data=self.stream_data, - use_secondary_model=self.use_secondary_model, - force_no_functions=self.force_no_functions) - - -class InternalFuncResponse(FuncResponse): - """ - An internal response from a function. This is the response that is returned from a function call. It contains the - content of the response, whether to send the response directly to the user, the content to post to the chat history, - the data to stream to the user, whether to use the secondary model, and whether to force no functions. - :param content: The content of the response. - :param send_directly_to_user: Whether to send the response directly to the user. - :param post_content: The content to post to the chat history. - :param stream_data: The data to stream to the user. - :param use_secondary_model: Whether to use the secondary model. - :param force_no_functions: Whether to force no functions. - """ - internal_thought: dict = None - assistant_thought: dict = None - - -class DictFuncResponse: +class DictFuncResponse(FuncResponse): """ A response from a function. This is the response that is returned from a function call. It contains the content of the response, whether to only summarize the content, whether to send the response directly to the user, the content to post to the chat history, the data to stream to the user, whether to use the secondary model, and whether to force no functions. """ - def __init__(self, data: dict): - self.data = data - - def to_internal_response(self, func_name: str): - """ - Convert this response to an internal response. - :param func_name: The name of the function. - :return: The internal response. - """ - content, send_directly_to_user, post_content, stream_data, use_secondary_model, force_no_functions = ( - self.data.get('content', ''), - self.data.get('send_directly_to_user', False), - self.data.get('post_content', None), - self.data.get('data', None), - self.data.get('use_secondary_model', False), - self.data.get('force_no_functions', False) - ) - if not content: - return None - res_msg = {'role': 'function', 'name': func_name, 'content': content} - return InternalFuncResponse(content=content, send_directly_to_user=send_directly_to_user, - internal_thought=res_msg, - post_content=post_content, - stream_data=stream_data, - use_secondary_model=use_secondary_model, - force_no_functions=force_no_functions) + data: Dict = None + + def __init__(self, init_data: Dict): + super().__init__() + self.data = init_data + self.content = init_data.get('content', '') + self.summarize_only = init_data.get('summarize_only', False) + self.send_directly_to_user = init_data.get('send_directly_to_user', False) + self.post_content = init_data.get('post_content', None) + self.stream_data = init_data.get('stream_data', None) + self.use_secondary_model = init_data.get('use_secondary_model', False) + self.force_no_functions = init_data.get('force_no_functions', False) diff --git a/pyproject.toml b/pyproject.toml index cd87952..db8a42a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "nimbusagent" -version = "0.2.0" +version = "0.3.0" description = "An OpenAI agent with basic memory, functions, and moderation support" readme = "README.md" license = { file = "LICENSE.txt" } diff --git a/tests/test_nimbusagent_functions_handler.py b/tests/test_nimbusagent_functions_handler.py index 76eabd2..dd03c13 100644 --- a/tests/test_nimbusagent_functions_handler.py +++ b/tests/test_nimbusagent_functions_handler.py @@ -13,7 +13,6 @@ def setUp(self): # Initialize FunctionHandler or other required objects here self.handler = FunctionHandler() - def test_initialization(self): # Test if FunctionHandler initializes properly self.assertIsNotNone(self.handler) diff --git a/tests/test_nimbusagent_functions_response.py b/tests/test_nimbusagent_functions_response.py index 3b2de2e..c02bba1 100644 --- a/tests/test_nimbusagent_functions_response.py +++ b/tests/test_nimbusagent_functions_response.py @@ -1,6 +1,6 @@ import unittest -from nimbusagent.functions.responses import FuncResponse, InternalFuncResponse, DictFuncResponse +from nimbusagent.functions.responses import FuncResponse, DictFuncResponse class TestFuncResponses(unittest.TestCase): @@ -10,28 +10,10 @@ def test_func_response_initialization(self): self.assertEqual(fr.content, "hello") self.assertEqual(fr.summarize_only, True) - def test_func_response_to_internal_response(self): - fr = FuncResponse(content="hello") - internal_response = fr.to_internal_response("MyFunc") - self.assertEqual(internal_response.internal_thought['role'], 'function') - self.assertEqual(internal_response.internal_thought['name'], 'MyFunc') - self.assertEqual(internal_response.internal_thought['content'], 'hello') - - def test_internal_func_response_initialization(self): - ifr = InternalFuncResponse(content="hello", internal_thought={'role': 'function'}) - self.assertEqual(ifr.content, "hello") - self.assertEqual(ifr.internal_thought['role'], 'function') - def test_dict_func_response_initialization(self): dfr = DictFuncResponse({'content': 'hello'}) self.assertEqual(dfr.data['content'], 'hello') - - def test_dict_func_response_to_internal_response(self): - dfr = DictFuncResponse({'content': 'hello'}) - internal_response = dfr.to_internal_response("MyFunc") - self.assertEqual(internal_response.internal_thought['role'], 'function') - self.assertEqual(internal_response.internal_thought['name'], 'MyFunc') - self.assertEqual(internal_response.internal_thought['content'], 'hello') + self.assertEqual(dfr.content, 'hello') if __name__ == '__main__':