-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgiphy.py
133 lines (110 loc) · 4.59 KB
/
giphy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
from typing import Annotated, Any, Optional
from autogen import register_function
from autogen.agentchat import ConversableAgent
from fastagency import UI, FastAgency
from fastagency.api.openapi.client import OpenAPI
from fastagency.api.openapi.security import APIKeyQuery
from fastagency.messages import TextInput
from fastagency.runtimes.autogen.agents.websurfer import WebSurferAgent
from fastagency.runtimes.autogen.autogen import AutoGenWorkflows
from fastagency.ui.mesop import MesopUI
llm_config = {
"config_list": [
{
"model": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
"api_key": os.getenv("TOGETHER_API_KEY"),
"api_type": "together",
"hide_tools": "if_any_run"
}
],
"temperature": 0.8,
}
openapi_url = "https://raw.githubusercontent.com/airtai/fastagency/refs/heads/main/examples/openapi/giphy_openapi.json"
giphy_api = OpenAPI.create(openapi_url=openapi_url)
giphy_api_key = os.getenv("GIPHY_API_KEY", "")
giphy_api.set_security_params(APIKeyQuery.Parameters(value=giphy_api_key))
GIPHY_SYSTEM_MESSAGE = """You are an agent in charge to communicate with the user and Giphy API.
Always use 'present_completed_task_or_ask_question' to interact with the user.
- make sure that the 'message' parameter contains all the necessary information for the user!
Initially, the Web_Surfer_Agent will provide you with some content from the web.
You must present this content provided Web_Surfer_Agent to the user by using 'present_completed_task_or_ask_question'.
Along with the content, ask the user if he wants you to generate some gifs based on the content.
- Do NOT generate gifs BEFORE you present the web content to the user, otherwise, you will be penalized!
Once get the wanted gifs, present them to the user by using 'present_completed_task_or_ask_question' again.
Note: Use '.gif' files when presenting a gif to the user and format it as a markdown gif -> ![Title](url)
- Also, make sure to add new lines '\n\n' between headlines and gifs for better readability.
e.g.:
'''
# Here are some gifs for you:
## Title 1
![Title 1](url1)
## Title 2
![Title 2](url2)
'''
Write 'TERMINATE' to end the conversation."""
wf = AutoGenWorkflows()
@wf.register(name="giphy_and_websurfer", description="Giphy and Websurfer chat")
def giphy_workflow_with_security(
ui: UI, params: dict[str, Any]
) -> str:
def is_termination_msg(msg: dict[str, Any]) -> bool:
return msg["content"] is not None and "TERMINATE" in msg["content"]
def present_completed_task_or_ask_question(
message: Annotated[str, "Message for examiner"],
) -> Optional[str]:
try:
return ui.text_input(
sender="giphy_agent",
recipient="giphy_agent",
prompt=message,
)
except Exception as e: # pragma: no cover
return f"present_completed_task_or_ask_question() FAILED! {e}"
giphy_agent = ConversableAgent(
name="Giphy_Agent",
system_message=GIPHY_SYSTEM_MESSAGE,
llm_config=llm_config,
human_input_mode="NEVER",
is_termination_msg=is_termination_msg,
)
web_surfer = WebSurferAgent(
name="Web_Surfer_Agent",
llm_config=llm_config,
summarizer_llm_config=llm_config,
human_input_mode="NEVER",
executor=giphy_agent,
is_termination_msg=is_termination_msg,
)
register_function(
present_completed_task_or_ask_question,
caller=giphy_agent,
executor=web_surfer,
name="present_completed_task_or_ask_question",
description="""Present completed task or ask question.
If you are presenting a completed task, last message should be a question: 'Do yo need anything else?'""",
)
functions = ["random_gif", "search_gifs", "trending_gifs"]
wf.register_api(
api=giphy_api,
callers=giphy_agent,
executors=web_surfer,
functions=functions,
)
initial_message = ui.text_input(
sender="Workflow",
recipient="User",
prompt="I can help you find images related to a certain subject. What kind of images would you like to find?",
)
chat_result = giphy_agent.initiate_chat(
web_surfer,
message=f"Users initial message: {initial_message}",
summary_method="reflection_with_llm",
max_turns=10,
)
return chat_result.summary # type: ignore[no-any-return]
app = FastAgency(provider=wf, ui=MesopUI(), title="Giphy and Websurfer chat")
# install
# pip install "fastagency[autogen,mesop,openapi,fastagency,server]"
# run
# gunicorn giphy:app -b 0.0.0.0:8000 --reload